The Deadly Hadoop Review Tour, Part 2
- Inverted index example
- Data deduplication
- TopN sorting
Inverted Index Example
- Map: split the words in each line of text on spaces and join each word to its document name with a colon, so that "word:document" becomes the key and the word count (a constant "1" per occurrence) becomes the value; both are passed on to the Combine stage as text (a small plain-Java illustration of the emitted pairs follows the mapper code)
- Code
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
private static Text keyInfo = new Text(); // stores the "word:document" combination
private static final Text valueInfo = new Text("1"); // stores the word frequency, initialized to "1"
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
String[] fields = StringUtils.split(line, " "); // split the line into an array of words
FileSplit fileSplit = (FileSplit)context.getInputSplit(); // get the file split this line belongs to
String fileName = fileSplit.getPath().getName(); // get the file name from the split path
for (String field:fields){
// the key combines the word and the file name, e.g. "MapReduce:file1"
keyInfo.set(field+":"+fileName);
context.write(keyInfo, valueInfo);
}
}
}
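To make the key/value format concrete, here is a minimal plain-Java sketch (not MapReduce code; the class name, the sample line and the file name "file1" are made up for illustration) of what map() above emits for a single line of input:
import org.apache.commons.lang.StringUtils;
public class MapperOutputDemo {
    public static void main(String[] args) {
        String line = "MapReduce is simple";  // hypothetical line of the file
        String fileName = "file1";            // hypothetical file name
        for (String field : StringUtils.split(line, " ")) {
            // same "word:document" key and "1" value as the mapper above
            System.out.println("<" + field + ":" + fileName + ", 1>");
        }
        // prints: <MapReduce:file1, 1>  <is:file1, 1>  <simple:file1, 1>
    }
}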
- Combine: based on the output format of the Map stage, define the Combine-stage class InvertedIndexCombiner in the cn.com.sise.mapreduce.invertindex package; it extends Reducer and counts the word frequency within each document (a small simulation of its key handling follows the combiner code)
- Code
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
private static Text info = new Text();
// Input: <MapReduce:file3, {1, 1, ...}>
// Output: <MapReduce, file3:2>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int sum = 0; // accumulate the word frequency
for (Text value:values){
sum+=Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
// rebuild the value from the document name and the frequency, e.g. "file3:2"
info.set(key.toString().substring(splitIndex+1)+":"+sum);
// reset the key to the word alone
key.set(key.toString().substring(0,splitIndex));
context.write(key, info);
}
}
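The key surgery in the combiner (splitting "word:document" at the colon, summing the counts, and recombining them as "document:count") can be simulated outside Hadoop. A minimal sketch, assuming a hypothetical input key "MapReduce:file3" with two counts; it mirrors the reduce() logic above:
import java.util.Arrays;
import java.util.List;
public class CombinerLogicDemo {
    public static void main(String[] args) {
        String key = "MapReduce:file3";                 // hypothetical combiner input key
        List<String> values = Arrays.asList("1", "1");  // two occurrences in file3
        int sum = 0;
        for (String value : values) {
            sum += Integer.parseInt(value);
        }
        int splitIndex = key.indexOf(":");
        String newValue = key.substring(splitIndex + 1) + ":" + sum;  // "file3:2"
        String newKey = key.substring(0, splitIndex);                 // "MapReduce"
        System.out.println("<" + newKey + ", " + newValue + ">");     // <MapReduce, file3:2>
    }
}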
- Reduce: receives the data output by the Combine stage and, following the layout required of the final inverted-index file, uses the word as the key and the concatenation of document names and frequencies as the value, writing the result to the target directory
- Code
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import org.apache.hadoop.io.Text;
public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
private static Text result = new Text();
// Input: <MapReduce, {file1:1, file2:1, file3:2}>
// Output: <MapReduce, file1:1;file2:1;file3:2;>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// build the document list, e.g. "file1:1;file2:1;file3:2;"
StringBuilder fileList = new StringBuilder();
for (Text value : values) {
fileList.append(value.toString()).append(";");
}
result.set(fileList.toString());
context.write(key, result);
}
}
- Runner: sets the parameters of the MapReduce job. This case runs in cluster mode, so the source directory on the remote HDFS (hdfs://localhost:9000/user/hadoop/inputdata) and the result output directory (hdfs://localhost:9000/user/hadoop/outputdata) must be specified (a parameterized variant of the driver is sketched after its code)
- Code
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndexDriver{
public static void main(String[] args)throws ClassNotFoundException,IOException, InterruptedException{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(InvertedIndexDriver.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata"));
// specify where the results are saved once processing finishes
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata"));
// submit this job to the YARN cluster
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
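The driver above hard-codes the HDFS paths. A common variant reads them from the command line so the same jar can be pointed at any directories; the sketch below (the class name InvertedIndexDriverArgs is made up, and it assumes the input and output paths are passed as the first two program arguments) keeps the rest of the configuration unchanged:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndexDriverArgs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "inverted index");
        job.setJarByClass(InvertedIndexDriverArgs.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));   // input directory from the command line
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory from the command line
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
It would then be submitted in the usual way, for example hadoop jar invertedindex.jar cn.com.sise.mapreduce.invertindex.InvertedIndexDriverArgs <input> <output> (the jar name here is just a placeholder).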
Data Deduplication
- Map: read the data set file and turn the <0, 2020-9-1 a>-style key-value pairs parsed by the default TextInputFormat component into <2020-9-1 a, null>
- Code
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private static Text field = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
field.set(value); // copy the whole line into the reusable output key
context.write(field, NullWritable.get());
}
}
- Reduce: simply receives the data passed on from the Map stage; because of how Shuffle works, records with identical keys are merged, so no duplicates appear in the output (a plain-Java illustration of this grouping follows the reducer code)
- Code
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
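The duplicates disappear because the shuffle phase groups identical keys, so each distinct line reaches reduce() exactly once. A plain-Java sketch of that grouping effect (the class name and sample records are made up); it is essentially building a set of the lines:
import java.util.TreeSet;
public class ShuffleGroupingDemo {
    public static void main(String[] args) {
        String[] records = {"2020-9-1 a", "2020-9-2 b", "2020-9-1 a"};  // hypothetical input lines
        TreeSet<String> grouped = new TreeSet<String>();  // identical keys collapse, like shuffle grouping
        for (String record : records) {
            grouped.add(record);
        }
        for (String key : grouped) {
            System.out.println(key);  // 2020-9-1 a, 2020-9-2 b -- each distinct line once
        }
    }
}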
- Runner: sets the parameters of the MapReduce job. This case runs in cluster mode, so the source directory on the remote HDFS (hdfs://localhost:9000/user/hadoop/inputdata1) and the result output directory (hdfs://localhost:9000/user/hadoop/outputdata1) must be specified
- Code
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DedupRunner {
public static void main(String[] args)throws ClassNotFoundException,IOException, InterruptedException{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DedupRunner.class);
job.setMapperClass(DedupMapper.class);
job.setReducerClass(DedupReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata1"));
// specify where the results are saved once processing finishes
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata1"));
job.waitForCompletion(true);
}
}
TopN Sorting
- Map: split each line of the file and store the numbers in a TreeMap, removing the smallest entry whenever the TreeMap grows beyond 5. Because the data is read line by line, writing output at that point would emit the largest 5 numbers of each individual line; instead, context.write() is placed in the cleanup() method, which guarantees that the current MapTask's TreeMap holds the 5 largest values of its whole input before they are passed to the Reduce stage (a standalone sketch of this TreeMap trick follows the mapper code)
- Code
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
private TreeMap<Integer, String>repToRecordMap = new TreeMap<Integer, String>();
@Override
public void map(LongWritable key, Text value, Context context){
String line = value.toString();
String[]nums = line.split(" ");
for(String num:nums){
repToRecordMap.put(Integer.parseInt(num)," "); // the number itself is the TreeMap key, so duplicate values collapse onto one entry
if(repToRecordMap.size()>5){
repToRecordMap.remove(repToRecordMap.firstKey()); // default ordering is ascending, so firstKey() is the smallest and gets evicted
}
}
}
@Override
protected void cleanup(Context context){
for(Integer i:repToRecordMap.keySet()){
try{
context.write(NullWritable.get(), new IntWritable(i));
}catch(Exception e){
e.printStackTrace();
}
}
}
}
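The "keep only the 5 largest" TreeMap trick used in map() and cleanup() can be tried on its own. A standalone sketch with made-up numbers (note that because the number itself is the key, duplicate values collapse onto a single entry):
import java.util.TreeMap;
public class Top5Demo {
    public static void main(String[] args) {
        int[] nums = {10, 3, 8, 7, 6, 11, 5, 4, 0, 5};  // made-up data
        TreeMap<Integer, String> top5 = new TreeMap<Integer, String>();  // ascending order by default
        for (int n : nums) {
            top5.put(n, " ");
            if (top5.size() > 5) {
                top5.remove(top5.firstKey());  // firstKey() is the smallest, so it is evicted
            }
        }
        System.out.println(top5.descendingKeySet());  // [11, 10, 8, 7, 6]
    }
}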
- Reduce: first give the TreeMap a custom ordering; since we want the largest values, compare() returns b - a, which is positive whenever b > a and therefore sorts the keys in descending order. reduce() must likewise keep the TreeMap at no more than 5 entries at all times, and finally iterates over it to output the 5 largest numbers (a small standalone check of this descending ordering follows the reducer code)
- Code
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
// The Comparator decides how the TreeMap orders its keys:
// a negative return value puts a before b, 0 means they are equal, and a positive value puts a after b.
// Returning b - a is positive whenever b > a, so larger numbers come first (descending order).
private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(new Comparator<Integer>(){
public int compare(Integer a, Integer b){
return b - a;
}
});
@Override
public void reduce(NullWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
for(IntWritable value:values){
repToRecordMap.put(value.get()," ");
if(repToRecordMap.size()>5){
repToRecordMap.remove(repToRecordMap.lastKey()); // under the descending comparator, lastKey() is the smallest and gets evicted
}
}
for(Integer i:repToRecordMap.keySet()){ // keys iterate in descending order, largest first
context.write(NullWritable.get(),new IntWritable(i));
}
}
}
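Because the reducer's TreeMap is built with the b - a comparator, its ordering is reversed: firstKey() is now the largest value and lastKey() the smallest, which is why lastKey() is the entry to evict. A quick standalone check (made-up numbers, hypothetical class name):
import java.util.Comparator;
import java.util.TreeMap;
public class DescendingTreeMapDemo {
    public static void main(String[] args) {
        TreeMap<Integer, String> top5 = new TreeMap<Integer, String>(new Comparator<Integer>() {
            public int compare(Integer a, Integer b) {
                return b - a;  // descending order
            }
        });
        int[] nums = {10, 3, 8, 7, 6, 11, 5};  // made-up data
        for (int n : nums) {
            top5.put(n, " ");
            if (top5.size() > 5) {
                top5.remove(top5.lastKey());  // lastKey() is the smallest under this ordering
            }
        }
        System.out.println(top5.keySet());  // [11, 10, 8, 7, 6]
    }
}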
- Runner: sets the MapReduce job parameters for the TopN case; a single reduce task is used so that one global top 5 is produced
- Code
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TopNRunner {
public static void main(String[] args)throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(TopNRunner.class);
job.setMapperClass(TopNMapper.class);
job.setReducerClass(TopNReducer.class);
job.setNumReduceTasks(1); // a single reducer so one global top 5 is computed
job.setMapOutputKeyClass(NullWritable.class); // key type of the map output
job.setMapOutputValueClass(IntWritable.class); // value type of the map output
job.setOutputKeyClass(NullWritable.class); // key type of the reduce output
job.setOutputValueClass(IntWritable.class); // value type of the reduce output
FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata"));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}