开发手册 欢迎您!
软件开发者资料库

MapReduce - 组合器

MapReduce组合器 - 从简介,算法,安装,API,Hadoop中的实现,分区器,组合器,Hadoop管理开始,简单易学地学习MapReduce。

组合器,也称为半减速器,是一个可选类,它通过接受Map类的输入,然后将输出键值对传递给Reducer类来操作./p>

组合器的主要功能是用相同的键来汇总地图输出记录.组合器的输出(键值集合)将通过网络发送到实际的Reducer任务作为输入.

组合器

组合器在Map类和Reduce类之间使用class来减少Map和Reduce之间的数据传输量.通常,map任务的输出很大,传输到reduce任务的数据很高.

以下MapReduce任务图显示了COMBINER PHASE.

Combiner

Combiner如何工作?

这是一个简要介绍关于MapReduce Combiner如何工作和减去的总结;

  • 组合器没有预定义的接口,它必须实现Reducer接口的减少()方法.

  • 组合器对每个地图输出键进行操作.它必须具有与Reducer类相同的输出键值类型.

  • 组合器可以从大型数据集生成摘要信息,因为它替换了原始Map输出.

虽然,Combiner是可选的,但它有助于将数据分成多个组以进行Reduce阶段,这使得处理更容易.

MapReduce组合器实现

以下示例提供了有关组合器的理论概念.假设我们有一个名为 input.txt 的输入文本文件用于MapReduce.

What do you mean by ObjectWhat do you know about JavaWhat is Java Virtual MachineHow Java enabled High Performance

重要阶段下面讨论使用Combiner的MapReduce程序.

记录阅读器

这是MapReduce的第一个阶段,其中记录阅读器从中读取每一行将文本文件作为文本输入,并将输出作为键值对.

输入 : 输入文件中的逐行文字.

输出 : 形成键值对.以下是预期键值对的集合.

<1, What do you mean by Object><2, What do you know about Java><3, What is Java Virtual Machine><4, How Java enabled High Performance>

地图阶段

地图阶段从记录阅读器获取输入,处理它,并产生另一个输出一组键值对.

输入 : 以下键值对是从记录阅读器中获取的输入.

<1, What do you mean by Object><2, What do you know about Java><3, What is Java Virtual Machine><4, How Java enabled High Performance>

Map阶段读取每个键值对,使用StringTokenizer将每个单词与值分开,将每个单词视为键,将该单词的计数视为值.以下代码片段显示了Mapper类和map函数.

public static class TokenizerMapper extends Mapper{   private final static IntWritable one = new IntWritable(1);   private Text word = new Text();      public void map(Object key, Text value, Context context) throws IOException, InterruptedException    {      StringTokenizer itr = new StringTokenizer(value.toString());      while (itr.hasMoreTokens())       {         word.set(itr.nextToken());         context.write(word, one);      }   }}

输出 : 预期输出如下&

                  

组合器阶段

组合器阶段从Map阶段获取每个键值对,处理它并生成输出为键值集合对.

输入 : 以下键值对是从Map阶段获取的输入.

                  

组合器阶段读取每个键值对,将常用词组合为键,将值组合为集合.通常,Combiner的代码和操作类似于Reducer的代码和操作.以下是Mapper,Combiner和Reducer类声明的代码片段.

 job.setMapperClass(TokenizerMapper.class);  job.setCombinerClass(IntSumReducer.class);  job.setReducerClass(IntSumReducer.class);

输出 : 预期输出如下<

            

Reducer阶段

Reducer阶段从Combiner阶段获取每个键值集合对,处理它,以及将输出作为键值对传递.请注意,Combiner功能与Reducer相同.

输入 : 以下键值对是从组合器阶段获得的输入.

            

Reducer阶段读取每个键值对.以下是Combiner的代码片段.

public static class IntSumReducer extends Reducer {   private IntWritable result = new IntWritable();      public void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException    {      int sum = 0;      for (IntWritable val : values)       {         sum += val.get();      }      result.set(sum);      context.write(key, result);   }}

输出 :  Reducer阶段的预期输出如下:<

            

Record Writer

这是MapReduce的最后一个阶段,其中Record Writer从中写入每个键值对Reducer阶段并以文本形式发送输出.

输入 :  Reducer阶段的每个键值对以及输出格式.

输出 : 它为您提供文本格式的键值对.以下是预期的输出.

What           3do             2you            2mean           1by             1Object         1know           1about          1Java           3is             1Virtual        1Machine        1How            1enabled        1High           1Performance    1

示例程序

以下代码块计算程序中的单词数.

import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount {   public static class TokenizerMapper extends Mapper   {      private final static IntWritable one = new IntWritable(1);      private Text word = new Text();            public void map(Object key, Text value, Context context) throws IOException, InterruptedException       {         StringTokenizer itr = new StringTokenizer(value.toString());         while (itr.hasMoreTokens())          {            word.set(itr.nextToken());            context.write(word, one);         }      }   }      public static class IntSumReducer extends Reducer    {      private IntWritable result = new IntWritable();      public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException       {         int sum = 0;         for (IntWritable val : values)          {            sum += val.get();         }         result.set(sum);         context.write(key, result);      }   }      public static void main(String[] args) throws Exception    {      Configuration conf = new Configuration();      Job job = Job.getInstance(conf, "word count");      job.setJarByClass(WordCount.class);      job.setMapperClass(TokenizerMapper.class);      job.setCombinerClass(IntSumReducer.class);      job.setReducerClass(IntSumReducer.class);      job.setOutputKeyClass(Text.class);      job.setOutputValueClass(IntWritable.class);      FileInputFormat.addInputPath(job, new Path(args[0]));      FileOutputFormat.setOutputPath(job, new Path(args[1]));      System.exit(job.waitForCompletion(true) ? 0 : 1);   }}

将上述程序保存为 WordCount.java .下面给出了程序的编译和执行.

编译和执行

让我们假设我们在Hadoop用户的主目录中(对于例如,/home/hadoop).

按照下面给出的步骤编译并执行上述程序.

步骤1 : 使用以下命令创建一个目录来存储已编译的java类.

  $ mkdir units

第2步 : 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序.您可以从 mvnrepository.com 下载jar.

让我们假设下载的文件夹是/home/hadoop/.

第3步 : 使用以下命令编译 WordCount.java 程序并为程序创建一个jar.

  $ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java  $ jar -cvf units.jar -C units/.

第4步 : 使用以下命令在HDFS中创建输入目录.

  $ HADOOP_HOME/bin/hadoop fs -mkdir input_dir

第5步 : 使用以下命令在HDFS的输入目录中复制名为 input.txt 的输入文件.

  $ HADOOP_HOME/bin/hadoop fs -put/home/hadoop/input.txt input_dir

第6步 : 使用以下命令验证输入目录中的文件.

  $ HADOOP_HOME/bin/hadoop fs -ls input_dir/

第7步 : 使用以下命令通过从输入目录获取输入文件来运行Word计数应用程序.

  $ HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待文件执行一段时间.执行后,输出包含许多输入拆分,Map任务和Reducer任务.

步骤8 : 使用以下命令验证输出文件夹中的结果文件.

  $ HADOOP_HOME/bin/hadoop fs -ls output_dir/

第9步 : 使用以下命令查看 Part-00000 文件中的输出.此文件由HDFS生成.

  $ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是MapReduce程序生成的输出.

What           3do             2you            2mean           1by             1Object         1know           1about          1Java           3is             1Virtual        1Machine        1How            1enabled        1High           1Performance    1