Hadoop中 Map Join与计数器

it2023-02-11  66

一、概念

Map Join适用于一张表十分小、一张表很大,且两张表之间存在关联字段的场景

二、项目介绍

1、待处理文本

order.txt 订单信息表里记录着订单ID、商品ID、订单销量(模拟当大表):

1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6

pd.txt 商品信息表里记录着商品ID、商品名称(模拟当小表,小表直接加入缓存):

01 小米
02 华为
03 格力

2、需求

将商品信息表中数据根据商品pid合并到订单数据表中

3、代码展示

MapBean.java

import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class MapBean implements Writable { private String orderID; //订单ID private String shopID; //商品ID private Integer orderNum; //订单商品数量 private String shopName; //商品名称 public MapBean() { super(); } @Override public String toString() { return orderID + "\t" + shopName + "\t" + orderNum; } public String getOrderID() { return orderID; } public void setOrderID(String orderID) { this.orderID = orderID; } public String getShopID() { return shopID; } public void setShopID(String shopID) { this.shopID = shopID; } public Integer getOrderNum() { return orderNum; } public void setOrderNum(Integer orderNum) { this.orderNum = orderNum; } public String getShopName() { return shopName; } public void setShopName(String shopName) { this.shopName = shopName; } //序列化 @Override public void write(DataOutput out) throws IOException { out.writeUTF(orderID); out.writeUTF(shopID); out.writeInt(orderNum); out.writeUTF(shopName); } //反序列化 @Override public void readFields(DataInput in) throws IOException { orderID = in.readUTF(); shopID = in.readUTF(); orderNum = in.readInt(); shopName = in.readUTF(); } }

MapJoinMapper.java

import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; public class MapJoinMapper extends Mapper<LongWritable,Text,MapBean,NullWritable> { /* Map Join将小表放在缓存中,只需在driver类中设置缓存小表的路径, 在Mapper类中取缓存小表,进行读取操作 */ private HashMap<String,String> map = new HashMap<>(); private MapBean bean = new MapBean(); // 用枚举的形式统计读取到的商品的条数和订单的条数(计数器) enum MyCount{ORDERNUM,SHOPNUM} /** * 任务开始执行时调用一次 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { // 获取商品数据信息 URI[] cacheFiles = context.getCacheFiles(); // 获取文件路径 String path = cacheFiles[0].getPath(); // 读取文件内容 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(path))); String line; /* 判断文件每一行,不是空行进行切割, 并将商品ID赋值map的键,商品名称赋值map的值 */ while (StringUtils.isNotEmpty(line = bufferedReader.readLine())){ String[] split = line.split("\t"); map.put(split[0],split[1]); // 用枚举的形式统计读取到的商品的条数 context.getCounter(MyCount.SHOPNUM).increment(1); // 用计算器组名加计数器名来统计读取到的商品的条数 context.getCounter("Msg","ShopNum").increment(1); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取大表每一行 String line = value.toString(); // 分隔符切割 String[] split = line.split("\t"); // 赋值 bean.setOrderID(split[0]); bean.setShopID(split[1]); bean.setOrderNum(Integer.parseInt(split[2])); // 赋值商品名称,通过小表存在map集合中的键取出值 bean.setShopName(map.get(split[1])); // 分别用枚举和组的形式统计读取到的订单的条数 context.getCounter(MyCount.ORDERNUM).increment(1); context.getCounter("Msg","OrderNum").increment(1); context.write(bean,NullWritable.get()); } /** * 任务执行完毕调用一次 * @param 
context * @throws IOException * @throws InterruptedException */ @Override protected void cleanup(Context context) throws IOException, InterruptedException { //super.cleanup(context); } }

MapJoinDriver.java

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class MapJoinDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { //1.获取job对象和配置文件对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2.添加jar的路径 job.setJarByClass(MapJoinDriver.class); //3.设置mapper类 job.setMapperClass(MapJoinMapper.class); //4.设置mapper类输出的数据类型 job.setMapOutputKeyClass(MapBean.class); job.setMapOutputValueClass(NullWritable.class); //map join添加缓存文件 job.addCacheFile(new URI("file:///C:/Users/ASUS/Desktop/datas/mapJoinInput2/pd.txt")); //map join没有reducetask job.setNumReduceTasks(0); //设置文件的输入出路径 FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //提交任务 boolean result = job.waitForCompletion(true); //成功返回0,失败返回1 System.exit(result ? 0:1); } }

4、输出结果为

最新回复(0)