开发手册 欢迎您!
软件开发者资料库

MapReduce - 分区程序

MapReduce分区程序 - 从简介,算法,安装,API,Hadoop中的实现,分区程序,组合器,Hadoop管理开始,简单易学地学习MapReduce。

分区程序的作用类似于处理输入数据集的条件.分区阶段发生在Map阶段之后和Reduce阶段之前.

分区器的数量等于reducer的数量.这意味着分区器将根据reducer的数量来划分数据.因此,从单个分区程序传递的数据由单个Reducer处理.

分区程序

分区程序对中间映射的键值对进行分区-outputs.它使用用户定义的条件对数据进行分区,该条件类似于散列函数.分区总数与作业的Reducer任务数相同.让我们举个例子来理解分区器是如何工作的.

MapReduce分区器实现

为了方便起见,我们假设我们有一个小的表名为Employee的表,包含以下数据.我们将使用此示例数据作为输入数据集来演示分区程序的工作原理.

Id姓名年龄性别薪水
1201gopal45男性50,000
1202manisha40女性50,000
1203khalil34男性30,000
1204prasanth30男性30,000
1205kiran2040,000
1206laxmi25女性35,000
1207bhavya2015,000
1208reshma19女性15,000
1209kranthi2222,000
1210Satish24男性25,000
1211Krishna2525,000
1212Arshad2820,000
1213lavanya188,000

我们必须编写一个应用程序来处理输入数据集,以便在不同年龄组中按性别查找受薪最高的员工(例如,低于20,在21到21之间) 30,ab ove 30).

输入数据

上述数据在input.txt  hadoop/hadoopPartitioner"目录并作为输入.

1201gopal45男性50000
1202manisha4051000
1203khaleel34男性30000
1204prasanth3031000
1205kiran20男性40000
1206laxmi25女性35000
1207bhavya2015000
1208reshma19女性14000
1209kranthi2222000
1210Satish24男性25000
1211Krishna25男性26000
1212Arshad28男性> 20000
1213lavanya18女性8000

根据给定的输入,以下是程序的算法说明.

地图任务

当我们在文本文件中包含文本数据时,地图任务接受键值对作为输入.此地图任务的输入如下 :

输入 : 关键是一个模式,如"任何特殊的键&加号; filename + 行号"(例如:key = @ input1),该值将是该行中的数据(例如:value = 1201 \t gopal \t 45 \t Male \t 50000).

方法 : 此地图任务的操作如下 :

  • 读取(记录数据) ),作为字符串中参数列表的输入值.

  • 使用split函数,将性别和存储分隔为字符串变量.

String[] str = value.toString().split("\t", -3);String gender=str[3];

  • 将性别信息和记录数据发送为从地图任务输出键值对到分区任务.

 context.write(new Text(gender), new Text(value));

  • 对文本文件中的所有记录重复上述所有步骤.

输出 : 您将获得性别数据和记录数据值作为键值对.

分区程序任务

分区程序任务接受键值对从地图任务作为其输入.分区意味着将数据划分为段.根据给定的分区条件标准,输入的键值配对数据可以根据年龄标准分为三个部分.

输入 : 键值对集合中的整个数据.

key =记录中的性别字段值.

value =该性别的整个记录数据值.

方法 : 分区逻辑的过程如下:

  • 从输入键值对中读取年龄字段值.

String[] str = value.toString().split("\t");int age = Integer.parseInt(str[2]);

  • 使用以下条件检查年龄值.

    • 年龄小于或等于20

    • 年龄大于20且小于或等于30.

    • 年龄大于30岁.

if(age<=20){   return 0;}else if(age>20 && age<=30){   return 1 % numReduceTasks;}else{   return 2 % numReduceTasks;}

输出 : 键值对的整个数据被分段为三个键值对集合. Reducer在每个集合上单独工作.

减少任务

分区任务的数量等于减速器任务的数量.这里我们有三个分区任务,因此我们有三个Reducer任务要执行.

输入 :  Reducer将使用不同的键值对集合执行三次.

key =记录中的性别字段值.

value =该性别的整个记录数据.

方法 : 以下逻辑将应用于每个集合.

  • 读取每条记录的Salary字段值.

String [] str = val.toString().split("\t", -3);Note: str[4] have the salary field value.

  • 使用max变量检查薪水.如果str [4]是最高工资,则将str [4]指定为max,否则跳过该步骤.

if(Integer.parseInt(str[4])>max){   max=Integer.parseInt(str[4]);}

  • 为每个密钥集合重复步骤1和2(男性和放大器;女性是关键的收藏品).执行这三个步骤后,您将从男性密钥集合中找到一个最高工资,从女性密钥集合中找到一个最高工资.

context.write(new Text(key),new IntWritable(max));

输出 : 最后,您将在三个不同年龄组的集合中获得一组键值对数据.它包含Male集合的最高工资和每个年龄组中Female集合的最高工资.

执行Map,Partitioner和Reduce任务后,三个集合键值对数据存储在三个不同的文件中作为输出.

所有这三个任务都被视为MapReduce作业.这些工作的以下要求和规格应在配置和减号中指定;

  • 工作名称

  • 键和值的输入和输出格式

  • Map,Reduce和Partitioner任务的各个类

Configuration conf = getConf();//Create JobJob job = new Job(conf, "topsal");job.setJarByClass(PartitionerExample.class);// File Input and Output pathsFileInputFormat.setInputPaths(job, new Path(arg[0]));FileOutputFormat.setOutputPath(job,new Path(arg[1]));//Set Mapper class and Output format for key-value pair.job.setMapperClass(MapClass.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//set partitioner statementjob.setPartitionerClass(CaderPartitioner.class);//Set Reducer class and Input/Output format for key-value pair.job.setReducerClass(ReduceClass.class);//Number of Reducer tasks.job.setNumReduceTasks(3);//Input and Output format for datajob.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);

示例程序

以下程序显示如何在MapReduce程序中为给定条件实现分区程序./p>

package partitionerexample;import java.io.*;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.mapreduce.lib.input.*;import org.apache.hadoop.mapreduce.lib.output.*;import org.apache.hadoop.util.*;public class PartitionerExample extends Configured implements Tool{   //Map class   public static class MapClass extends Mapper   {      public void map(LongWritable key, Text value, Context context)      {         try{            String[] str = value.toString().split("\t", -3);            String gender=str[3];            context.write(new Text(gender), new Text(value));         }         catch(Exception e)         {            System.out.println(e.getMessage());         }      }   }      //Reducer class   public static class ReduceClass extends Reducer   {      public int max = -1;      public void reduce(Text key, Iterable  values, Context context) throws IOException, InterruptedException      {         max = -1;         for (Text val : values)         {            String [] str = val.toString().split("\t", -3);            if(Integer.parseInt(str[4])>max)            max=Integer.parseInt(str[4]);         }         context.write(new Text(key), new IntWritable(max));      }   }      //Partitioner class   public static class CaderPartitioner extends   Partitioner < Text, Text >   {      @Override      public int getPartition(Text key, Text value, int numReduceTasks)      {         String[] str = value.toString().split("\t");         int age = Integer.parseInt(str[2]);                  if(numReduceTasks == 0)         {            return 0;         }                  if(age<=20)         {            return 0;         }         else if(age>20 && age<=30)         {            return 1 % numReduceTasks;         }         else         {            return 2 % numReduceTasks;         }      }   }      @Override   public int run(String[] arg) throws Exception   {      Configuration conf = getConf();      Job job = new Job(conf, "topsal");      job.setJarByClass(PartitionerExample.class);      FileInputFormat.setInputPaths(job, new Path(arg[0]));      FileOutputFormat.setOutputPath(job,new Path(arg[1]));      job.setMapperClass(MapClass.class);      job.setMapOutputKeyClass(Text.class);      job.setMapOutputValueClass(Text.class);            //set partitioner statement      job.setPartitionerClass(CaderPartitioner.class);      job.setReducerClass(ReduceClass.class);      job.setNumReduceTasks(3);      job.setInputFormatClass(TextInputFormat.class);      job.setOutputFormatClass(TextOutputFormat.class);      job.setOutputKeyClass(Text.class);      job.setOutputValueClass(Text.class);      System.exit(job.waitForCompletion(true)? 0 : 1);      return 0;   }      public static void main(String ar[]) throws Exception   {      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);      System.exit(0);   }}

将上述代码保存为"/home/hadoop/"中的 PartitionerExample.java hadoopPartitioner".下面给出了程序的编译和执行.

编译和执行

让我们假设我们在Hadoop用户的主目录中(例如,/home/hadoop).

按照下面给出的步骤编译并执行上述程序.

步骤1 : 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序.您可以从 mvnrepository.com 下载jar.

让我们假设下载的文件夹是"/home/hadoop/hadoopPartitioner"

第2步 : 以下命令用于编译程序 PartitionerExample.java 并为程序创建jar.

  $ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java  $ jar -cvf PartitionerExample.jar -C.

第3步 : 使用以下命令在HDFS中创建输入目录.

  $ HADOOP_HOME/bin/hadoop fs -mkdir input_dir

第4步 : 使用以下命令在HDFS的输入目录中复制名为 input.txt 的输入文件.

  $ HADOOP_HOME/bin/hadoop fs -put/home/hadoop/hadoopPartitioner/input.txt input_dir

第5步 : 使用以下命令验证输入目录中的文件.

  $ HADOOP_HOME/bin/hadoop fs -ls input_dir/

第6步 : 使用以下命令通过从输入目录中获取输入文件来运行Top salary应用程序.

  $ HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

等待文件执行一段时间.执行后,输出包含许多输入拆分,映射任务和Reducer任务.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49File System Counters   FILE: Number of bytes read=467   FILE: Number of bytes written=426777   FILE: Number of read operations=0   FILE: Number of large read operations=0   FILE: Number of write operations=0   HDFS: Number of bytes read=480   HDFS: Number of bytes written=72   HDFS: Number of read operations=12   HDFS: Number of large read operations=0   HDFS: Number of write operations=6Job Counters   Launched map tasks=1   Launched reduce tasks=3   Data-local map tasks=1   Total time spent by all maps in occupied slots (ms)=8212   Total time spent by all reduces in occupied slots (ms)=59858   Total time spent by all map tasks (ms)=8212   Total time spent by all reduce tasks (ms)=59858   Total vcore-seconds taken by all map tasks=8212   Total vcore-seconds taken by all reduce tasks=59858   Total megabyte-seconds taken by all map tasks=8409088   Total megabyte-seconds taken by all reduce tasks=61294592Map-Reduce Framework   Map input records=13   Map output records=13   Map output bytes=423   Map output materialized bytes=467   Input split bytes=119   Combine input records=0   Combine output records=0   Reduce input groups=6   Reduce shuffle bytes=467   Reduce input records=13   Reduce output records=6   Spilled Records=26   Shuffled Maps =3   Failed Shuffles=0   Merged Map outputs=3   GC time elapsed (ms)=224   CPU time spent (ms)=3690   Physical memory (bytes) snapshot=553816064   Virtual memory (bytes) snapshot=3441266688   Total committed heap usage (bytes)=334102528Shuffle Errors   BAD_ID=0   CONNECTION=0   IO_ERROR=0   WRONG_LENGTH=0   WRONG_MAP=0   WRONG_REDUCE=0File Input Format Counters   Bytes Read=361File Output Format Counters   Bytes Written=72

第7步 : 使用以下命令验证输出文件夹中的结果文件.

  $ HADOOP_HOME/bin/hadoop fs -ls output_dir/

您将在三个文件中找到输出,因为您在程序中使用了三个分区器和三个Reducers.

步骤8 : 使用以下命令查看 Part-00000 文件中的输出.此文件由HDFS生成.

  $ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

零件00000中的输出

Female   15000Male     40000

使用以下命令查看 Part-00001 文件中的输出.

  $ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

部分输出 - 00001

Female   35000Male    31000

使用以下命令查看 Part-00002 文件中的输出.

  $ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Part-00002中的输出

Female  51000Male   50000