
Hadoop Programming: An Example of Implementing an Inverted Index with MR Programs


Anyone who has worked on search engine development will be familiar with inverted indexes; search engines such as Google and Baidu are built on them. This post does not go into the theory of inverted indexes in depth (interested readers can look it up online); instead, it walks through how to implement one using Hadoop MR programs.

1. Data Preparation

1.1 Input File Data

We prepare three input files, shown below.

a.txt

hello tom hello jerry hello tom 

b.txt

hello jerry hello jerry tom jerry 

c.txt

hello jerry hello tom 

1.2 Final Output File Data

The final output file contains:

hello  c.txt-->2 b.txt-->2 a.txt-->3
jerry  c.txt-->1 b.txt-->3 a.txt-->1
tom c.txt-->1 b.txt-->1 a.txt-->2

2. Analysis of the Inverted Index Process

From the input file data and the final output, we can see that this task needs two MR jobs. The overall flow can be summarized as follows:

------------- Step one Mapper output format: --------------------
context.write("hello->a.txt", "1")
context.write("hello->a.txt", "1")
context.write("hello->a.txt", "1")
context.write("hello->b.txt", "1")
context.write("hello->b.txt", "1")
context.write("hello->c.txt", "1")
context.write("hello->c.txt", "1")
------------- Step one Reducer input format: --------------------
<"hello->a.txt", {1,1,1}>
<"hello->b.txt", {1,1}>
<"hello->c.txt", {1,1}>
------------- Step one Reducer output format: -------------------
context.write("hello->a.txt", "3")
context.write("hello->b.txt", "2")
context.write("hello->c.txt", "2")
------------- Step two Mapper input format: ---------------------
context.write("hello->a.txt", "3")
context.write("hello->b.txt", "2")
context.write("hello->c.txt", "2")
------------- Step two Mapper output format: --------------------
context.write("hello", "a.txt->3")
context.write("hello", "b.txt->2")
context.write("hello", "c.txt->2")
------------- Step two Reducer input format: --------------------
<"hello", {"a.txt->3", "b.txt->2", "c.txt->2"}>
------------- Step two Reducer output format: -------------------
context.write("hello", "a.txt->3 b.txt->2 c.txt->2")

The final result is:
hello  a.txt->3 b.txt->2 c.txt->2

3. Program Development

3.1 Step One MR Program and Its Input/Output

package com.lyz.hdfs.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce program for step one of the inverted index; the Mapper, Reducer
 * and runner are all placed in one class here.
 * @author liuyazhuang
 */
public class InverseIndexStepOne {

  /**
   * Mapper for step one of the inverted index.
   * @author liuyazhuang
   */
  public static class StepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      // Read one line of input
      String line = value.toString();
      // Split the line into individual words
      String[] fields = StringUtils.split(line, " ");
      // Get the input split this record came from
      FileSplit fileSplit = (FileSplit) context.getInputSplit();
      // Derive the source file name from the split
      String fileName = fileSplit.getPath().getName();
      for (String field : fields) {
        context.write(new Text(field + "-->" + fileName), new LongWritable(1));
      }
    }
  }

  /**
   * Reducer for step one of the inverted index.
   * Its final output is:
   *   hello-->a.txt  3
   *   hello-->b.txt  2
   *   hello-->c.txt  2
   *   jerry-->a.txt  1
   *   jerry-->b.txt  3
   *   jerry-->c.txt  1
   *   tom-->a.txt 2
   *   tom-->b.txt 1
   *   tom-->c.txt 1
   * @author liuyazhuang
   */
  public static class StepOneReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
        Reducer<Text, LongWritable, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      // Sum the per-occurrence counts for this "word-->file" key
      long counter = 0;
      for (LongWritable value : values) {
        counter += value.get();
      }
      context.write(key, new LongWritable(counter));
    }
  }

  // Runner for the step one MR job
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(InverseIndexStepOne.class);
    job.setMapperClass(StepOneMapper.class);
    job.setReducerClass(StepOneReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path("D:/hadoop_data/ii"));
    FileOutputFormat.setOutputPath(job, new Path("D:/hadoop_data/ii/result"));
    job.waitForCompletion(true);
  }
}
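A small optional optimization, not part of the original program: because StepOneReducer simply sums longs, and summing is associative and commutative, the same class can also be registered as a combiner so partial counts are pre-aggregated on the map side before the shuffle. A one-line sketch for the driver above:

// Optional (our addition): reuse the summing reducer as a combiner
// so "word-->file" counts are partially summed before the shuffle.
job.setCombinerClass(StepOneReducer.class);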

3.1.1 Input Data

a.txt

hello tom hello jerry hello tom 

b.txt

hello jerry hello jerry tom jerry 

c.txt

hello jerry hello tom 

3.1.2 Output Result

hello-->a.txt  3
hello-->b.txt  2
hello-->c.txt  2
jerry-->a.txt  1
jerry-->b.txt  3
jerry-->c.txt  1
tom-->a.txt 2
tom-->b.txt 1
tom-->c.txt 1

3.2 Step Two MR Program and Its Input/Output

package com.lyz.hdfs.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce program for step two of the inverted index; the Mapper, Reducer
 * and runner are all placed in one class here.
 * @author liuyazhuang
 */
public class InverseIndexStepTwo {

  /**
   * Mapper for step two of the inverted index.
   *
   * Its input, produced by the step one MR job, is:
   *   hello-->a.txt  3
   *   hello-->b.txt  2
   *   hello-->c.txt  2
   *   jerry-->a.txt  1
   *   jerry-->b.txt  3
   *   jerry-->c.txt  1
   *   tom-->a.txt 2
   *   tom-->b.txt 1
   *   tom-->c.txt 1
   * @author liuyazhuang
   */
  public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      // Step one's default TextOutputFormat separates key and value with a tab
      String[] fields = StringUtils.split(line, "\t");
      String[] wordAndFileName = StringUtils.split(fields[0], "-->");
      String word = wordAndFileName[0];
      String fileName = wordAndFileName[1];
      long counter = Long.parseLong(fields[1]);
      context.write(new Text(word), new Text(fileName + "-->" + counter));
    }
  }

  /**
   * Reducer for step two of the inverted index.
   * Its input format is:
   *   <"hello", {"a.txt-->3", "b.txt-->2", "c.txt-->2"}>
   * and its final output is:
   *   hello  c.txt-->2 b.txt-->2 a.txt-->3
   *   jerry  c.txt-->1 b.txt-->3 a.txt-->1
   *   tom c.txt-->1 b.txt-->1 a.txt-->2
   * @author liuyazhuang
   */
  public static class StepTwoReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values,
        Reducer<Text, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
      // Concatenate all "file-->count" postings for this word
      StringBuilder result = new StringBuilder();
      for (Text value : values) {
        result.append(value).append(' ');
      }
      context.write(key, new Text(result.toString()));
    }
  }

  // Runner for the step two MR job
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(InverseIndexStepTwo.class);
    job.setMapperClass(StepTwoMapper.class);
    job.setReducerClass(StepTwoReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path("D:/hadoop_data/ii/result/part-r-00000"));
    FileOutputFormat.setOutputPath(job, new Path("D:/hadoop_data/ii/result/final"));
    job.waitForCompletion(true);
  }
}
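In the listings above, each step is submitted by hand from its own main method. As a minimal sketch of an alternative (the InverseIndexDriver class below is our addition, not part of the original programs; it assumes the two classes above and the same local paths), both jobs can be chained in one driver so that step two is only submitted after step one finishes successfully:

package com.lyz.hdfs.mr.ii;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver chaining the two jobs; not part of the original post.
public class InverseIndexDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Step one: count each word per file, keyed by "word-->file"
    Job stepOne = Job.getInstance(conf, "inverse-index-step-one");
    stepOne.setJarByClass(InverseIndexDriver.class);
    stepOne.setMapperClass(InverseIndexStepOne.StepOneMapper.class);
    stepOne.setReducerClass(InverseIndexStepOne.StepOneReducer.class);
    stepOne.setMapOutputKeyClass(Text.class);
    stepOne.setMapOutputValueClass(LongWritable.class);
    stepOne.setOutputKeyClass(Text.class);
    stepOne.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(stepOne, new Path("D:/hadoop_data/ii"));
    FileOutputFormat.setOutputPath(stepOne, new Path("D:/hadoop_data/ii/result"));

    // Only move on to step two if step one completed successfully
    if (!stepOne.waitForCompletion(true)) {
      System.exit(1);
    }

    // Step two: regroup by word, collecting "file-->count" postings
    Job stepTwo = Job.getInstance(conf, "inverse-index-step-two");
    stepTwo.setJarByClass(InverseIndexDriver.class);
    stepTwo.setMapperClass(InverseIndexStepTwo.StepTwoMapper.class);
    stepTwo.setReducerClass(InverseIndexStepTwo.StepTwoReducer.class);
    stepTwo.setOutputKeyClass(Text.class);
    stepTwo.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(stepTwo, new Path("D:/hadoop_data/ii/result/part-r-00000"));
    FileOutputFormat.setOutputPath(stepTwo, new Path("D:/hadoop_data/ii/result/final"));

    System.exit(stepTwo.waitForCompletion(true) ? 0 : 1);
  }
}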

3.2.1 Input Data

hello-->a.txt  3
hello-->b.txt  2
hello-->c.txt  2
jerry-->a.txt  1
jerry-->b.txt  3
jerry-->c.txt  1
tom-->a.txt 2
tom-->b.txt 1
tom-->c.txt 1

3.2.2 Output Result

hello  c.txt-->2 b.txt-->2 a.txt-->3
jerry  c.txt-->1 b.txt-->3 a.txt-->1
tom c.txt-->1 b.txt-->1 a.txt-->2

Summary

That is everything in this article's example of implementing an inverted index with Hadoop MR programs; I hope it is helpful. If you have questions, leave a comment and I will reply promptly. Thanks for supporting this site!

