Hadoop Cram-Review Marathon, Part 2

  • Inverted index example
  • Data deduplication
  • TopN sorting

Inverted Index Example

  • Map phase: split each line of text into words on spaces and join each word to the document name with a colon, so that "word:document name" is the key and the word count is the value; both are emitted as text and handed to the Combine phase.

    • Code

    import java.io.IOException;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;


    public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        private static Text keyInfo = new Text();             // holds the "word:document (URL)" combination
        private static final Text valueInfo = new Text("1");  // word count, initialized to 1

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, " ");            // split the line into words
            FileSplit fileSplit = (FileSplit) context.getInputSplit(); // input split this line belongs to
            String fileName = fileSplit.getPath().getName();           // file name derived from the split
            for (String field : fields) {
                // the key combines word and document, e.g. "MapReduce:file1"
                keyInfo.set(field + ":" + fileName);
                context.write(keyInfo, valueInfo);
            }
        }
    }

  • Combine: based on the output format of the Map phase, implement a custom Combine-phase class InvertedIndexCombiner under the cn.com.sise.mapreduce.invertindex package; it extends the Reducer class and totals the word count for each document (a plain-Java sketch of this key/value rewrite follows the code below).

    • Code

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;


    public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private static Text info = new Text();

        // input:  <"MapReduce:file3", {1, 1, ...}>
        // output: <"MapReduce", "file3:2">
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0; // total word count
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":");
            // rebuild the value from the URL (document name) and the count
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            // reset the key to the word alone
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }
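
    To make the Map-to-Combine handoff concrete, here is a minimal plain-Java sketch (no Hadoop involved; the line, words and file name are made up) that mimics what the mapper emits for one line and how the combiner's indexOf/substring logic rewrites the key and value:

    import java.util.HashMap;
    import java.util.Map;

    public class InvertedIndexFlowDemo {
        public static void main(String[] args) {
            // hypothetical input: one line of file1.txt
            String line = "MapReduce is simple MapReduce";
            String fileName = "file1.txt";

            // Map stage: emit <word:fileName, 1> for every word;
            // the framework groups equal keys, modeled here by summing in a map
            Map<String, Integer> combineInput = new HashMap<>();
            for (String word : line.split(" ")) {
                String mapKey = word + ":" + fileName;       // e.g. "MapReduce:file1.txt"
                combineInput.merge(mapKey, 1, Integer::sum);
            }

            // Combine stage: move the file name into the value and attach the count
            for (Map.Entry<String, Integer> e : combineInput.entrySet()) {
                int splitIndex = e.getKey().indexOf(":");
                String word = e.getKey().substring(0, splitIndex);         // new key, e.g. "MapReduce"
                String fileAndCount = e.getKey().substring(splitIndex + 1)
                        + ":" + e.getValue();                              // new value, e.g. "file1.txt:2"
                System.out.println(word + " -> " + fileAndCount);
            }
        }
    }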

  • Reduce: receive the output of the Combine phase and, following the required inverted-index layout, use the word as the key and the concatenation of document names and counts as the value, then write the result to the target directory.

    • Code

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;


    public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        private static Text result = new Text();

        // input:  <"MapReduce", {"file1:1", "file2:1", "file3:2"}>
        // output: <MapReduce file1:1;file2:1;file3:2;>
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // build the document list
            String fileList = "";
            for (Text value : values) {
                fileList += value.toString() + ";";
            }
            result.set(fileList);
            context.write(key, result);
        }
    }

  • Runner: set the parameters of the MapReduce job. This example runs in cluster mode, so the source directory on the remote HDFS (hdfs://localhost:9000/user/hadoop/inputdata) and the result output directory (hdfs://localhost:9000/user/hadoop/outputdata) must be specified (a note on clearing an existing output directory follows the code below).

    • Code

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


    public class InvertedIndexDriver {
        public static void main(String[] args)
                throws ClassNotFoundException, IOException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(InvertedIndexDriver.class);
            job.setMapperClass(InvertedIndexMapper.class);
            job.setCombinerClass(InvertedIndexCombiner.class);
            job.setReducerClass(InvertedIndexReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata"));
            // where the finished result is stored
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata"));
            // submit the job to the YARN cluster
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }
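
    One practical note: FileOutputFormat refuses to start a job whose output directory already exists. A common addition to a driver like this, shown here only as an optional sketch (the class and method names OutputCleaner / deleteIfExists are made up, not part of the original code), is to delete the old output directory before submitting the job:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OutputCleaner {
        // removes a pre-existing output directory so repeated runs do not fail
        public static void deleteIfExists(Configuration conf, String dir) throws Exception {
            Path outputPath = new Path(dir);
            FileSystem fs = outputPath.getFileSystem(conf); // resolves the file system behind the path
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true); // true = recursive delete
            }
        }
    }

    Calling something like OutputCleaner.deleteIfExists(conf, "hdfs://localhost:9000/user/hadoop/outputdata") right before waitForCompletion(true) makes repeated test runs less painful.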

Data Deduplication

  • Map: read the dataset file and turn the key/value pairs produced by the default TextInputFormat component, e.g. <0, 2020-9-1 a>, into <2020-9-1 a, null>.

    • Code

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;


    public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private static Text field = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // the whole line becomes the key; the value carries nothing
            field.set(value);
            context.write(field, NullWritable.get());
        }
    }

  • Reduce: simply receives the data handed over by the Map phase. Because of how the shuffle works, records with the same key are merged, so no duplicates appear in the output (a plain-Java illustration of this grouping follows the code below).

    • Code

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;


    public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // identical keys have already been grouped, so writing the key once is enough
            context.write(key, NullWritable.get());
        }
    }
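
    The deduplication is really done by the shuffle: every record with the same key ends up in a single reduce() call. A minimal plain-Java illustration of that grouping effect (the sample records are made up, and a sorted set merely models how identical keys collapse into one group):

    import java.util.Arrays;
    import java.util.List;
    import java.util.TreeSet;

    public class ShuffleDedupDemo {
        public static void main(String[] args) {
            // hypothetical map output keys (the value is NullWritable, i.e. carries nothing)
            List<String> mapOutputKeys = Arrays.asList(
                    "2020-9-1 a", "2020-9-2 b", "2020-9-1 a", "2020-9-3 c", "2020-9-2 b");

            // duplicates collapse into a single entry, just as reduce() sees each distinct key once
            TreeSet<String> grouped = new TreeSet<>(mapOutputKeys);
            grouped.forEach(System.out::println); // 2020-9-1 a, 2020-9-2 b, 2020-9-3 c
        }
    }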

  • Runner: set the parameters of the MapReduce job. This example runs in cluster mode, so the source directory on the remote HDFS (hdfs://localhost:9000/user/hadoop/inputdata1) and the result output directory (hdfs://localhost:9000/user/hadoop/outputdata1) must be specified.

    • Code

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


    public class DedupRunner {
        public static void main(String[] args)
                throws ClassNotFoundException, IOException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(DedupRunner.class);
            job.setMapperClass(DedupMapper.class);
            job.setReducerClass(DedupReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata1"));
            // where the finished result is stored
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata1"));
            job.waitForCompletion(true);
        }
    }

TopN Sorting

  • Map: split each line of the file, parse out the numbers, and store them in a TreeMap; whenever the TreeMap holds more than 5 entries, remove the smallest one. Because the data is read line by line, writing output at this point would only give the 5 largest numbers of each individual line, so the context.write() calls are placed in cleanup(): this guarantees that the TreeMap of the current MapTask holds the 5 largest values of its whole input before they are sent to the Reduce phase (see the standalone sketch after the code below).

    • Code

    import java.util.TreeMap;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;


    public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
        // natural (ascending) ordering: firstKey() is the smallest number seen so far
        private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();

        @Override
        public void map(LongWritable key, Text value, Context context) {
            String line = value.toString();
            String[] nums = line.split(" ");
            for (String num : nums) {
                repToRecordMap.put(Integer.parseInt(num), " ");
                if (repToRecordMap.size() > 5) {
                    repToRecordMap.remove(repToRecordMap.firstKey()); // evict the smallest value
                }
            }
        }

        @Override
        protected void cleanup(Context context) {
            // runs once per MapTask: emit the 5 largest values of this task's whole input
            for (Integer i : repToRecordMap.keySet()) {
                try {
                    context.write(NullWritable.get(), new IntWritable(i));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
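
    The mapper's top-5 bookkeeping is just a bounded TreeMap: under natural (ascending) ordering, firstKey() is the smallest entry, so evicting it whenever the map grows past 5 leaves exactly the 5 largest values. A standalone sketch with made-up numbers:

    import java.util.TreeMap;

    public class Top5MapSideDemo {
        public static void main(String[] args) {
            int[] nums = {10, 3, 8, 7, 6, 5, 1, 2, 9, 4}; // hypothetical values from one split
            TreeMap<Integer, String> top5 = new TreeMap<>(); // ascending by key

            for (int num : nums) {
                top5.put(num, " ");
                if (top5.size() > 5) {
                    top5.remove(top5.firstKey()); // drop the current smallest
                }
            }
            // keeps {6, 7, 8, 9, 10} - exactly what cleanup() writes out
            System.out.println(top5.keySet());
        }
    }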

  • Reduce: first give the TreeMap a custom comparator; to take the largest values, compare() returns b - a so the keys sort in descending order. The reduce() method still has to keep the TreeMap trimmed to at most 5 entries at all times, and finally iterates over it to output the 5 largest numbers (see the sketch after the code below).

    • Code

    import java.io.IOException;
    import java.util.Comparator;
    import java.util.TreeMap;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;


    public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
        // comparator contract: a negative return means a sorts before b, zero means equal,
        // a positive return means a sorts after b; b - a therefore puts larger numbers first
        private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(
                new Comparator<Integer>() {
                    public int compare(Integer a, Integer b) {
                        return b - a;
                    }
                });

        @Override
        public void reduce(NullWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                repToRecordMap.put(value.get(), " ");
                if (repToRecordMap.size() > 5) {
                    // with descending order the smallest entry is the LAST key,
                    // so lastKey() must be evicted to keep the 5 largest values
                    repToRecordMap.remove(repToRecordMap.lastKey());
                }
            }
            // keys iterate in descending order: the 5 largest numbers, biggest first
            for (Integer i : repToRecordMap.keySet()) {
                context.write(NullWritable.get(), new IntWritable(i));
            }
        }
    }
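
    Because the reducer's TreeMap sorts in descending order, firstKey() is now the largest value and lastKey() the smallest, which is why the eviction above removes lastKey(). A small sketch showing this (the numbers are made up; Comparator.reverseOrder() behaves like the b - a comparator for Integer keys):

    import java.util.Comparator;
    import java.util.TreeMap;

    public class Top5ReduceSideDemo {
        public static void main(String[] args) {
            // descending order: larger numbers sort first
            TreeMap<Integer, String> top5 = new TreeMap<>(Comparator.reverseOrder());

            int[] merged = {10, 9, 8, 7, 6, 22, 15, 3}; // hypothetical values from several map tasks
            for (int num : merged) {
                top5.put(num, " ");
                if (top5.size() > 5) {
                    // the smallest entry sits at the END of the map,
                    // so lastKey() is the one to evict
                    top5.remove(top5.lastKey());
                }
            }
            System.out.println(top5.keySet()); // [22, 15, 10, 9, 8]
        }
    }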

  • Runner: set the parameters of the MapReduce job; note that the number of reduce tasks is fixed to 1 so that the top 5 is computed over the output of all map tasks.

    • Code

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


    public class TopNRunner {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(TopNRunner.class);
            job.setMapperClass(TopNMapper.class);
            job.setReducerClass(TopNReducer.class);
            job.setNumReduceTasks(1);                      // a single reducer makes the top 5 global
            job.setMapOutputKeyClass(NullWritable.class);  // key type of the map output
            job.setMapOutputValueClass(IntWritable.class); // value type of the map output
            job.setOutputKeyClass(NullWritable.class);     // key type of the reduce output
            job.setOutputValueClass(IntWritable.class);    // value type of the reduce output
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata"));
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }
