分区程序的作用类似于处理输入数据集的条件.分区阶段发生在Map阶段之后和Reduce阶段之前.
分区器的数量等于reducer的数量.这意味着分区器将根据reducer的数量来划分数据.因此,从单个分区程序传递的数据由单个Reducer处理.
分区程序
分区程序对中间映射的键值对进行分区-outputs.它使用用户定义的条件对数据进行分区,该条件类似于散列函数.分区总数与作业的Reducer任务数相同.让我们举个例子来理解分区器是如何工作的.
MapReduce分区器实现
为了方便起见,我们假设我们有一个小的表名为Employee的表,包含以下数据.我们将使用此示例数据作为输入数据集来演示分区程序的工作原理.
Id | 姓名 | 年龄 | 性别 | 薪水 |
---|---|---|---|---|
1201 | gopal | 45 | 男性 | 50,000 |
1202 | manisha | 40 | 女性 | 50,000 |
1203 | khalil | 34 | 男性 | 30,000 |
1204 | prasanth | 30 | 男性 | 30,000 |
1205 | kiran | 20 | 男 | 40,000 |
1206 | laxmi | 25 | 女性 | 35,000 |
1207 | bhavya | 20 | 女 | 15,000 |
1208 | reshma | 19 | 女性 | 15,000 |
1209 | kranthi | 22 | 男 | 22,000 |
1210 | Satish | 24 | 男性 | 25,000 |
1211 | Krishna | 25 | 男 | 25,000 |
1212 | Arshad | 28 | 男 | 20,000 |
1213 | lavanya | 18 | 女 | 8,000 |
我们必须编写一个应用程序来处理输入数据集,以便在不同年龄组中按性别查找受薪最高的员工(例如,低于20,在21到21之间) 30,ab ove 30).
输入数据
上述数据在input.txt hadoop/hadoopPartitioner"目录并作为输入.
1201 | gopal | 45 | 男性 | 50000 |
1202 | manisha | 40 | 女 | 51000 |
1203 | khaleel | 34 | 男性 | 30000 |
1204 | prasanth | 30 | 男 | 31000 |
1205 | kiran | 20 | 男性 | 40000 |
1206 | laxmi | 25 | 女性 | 35000 |
1207 | bhavya | 20 | 女 | 15000 |
1208 | reshma | 19 | 女性 | 14000 |
1209 | kranthi | 22 | 男 | 22000 |
1210 | Satish | 24 | 男性 | 25000 |
1211 | Krishna | 25 | 男性 | 26000 |
1212 | Arshad | 28 | 男性 | > 20000 |
1213 | lavanya | 18 | 女性 | 8000 |
根据给定的输入,以下是程序的算法说明.
地图任务
当我们在文本文件中包含文本数据时,地图任务接受键值对作为输入.此地图任务的输入如下 :
输入 : 关键是一个模式,如"任何特殊的键&加号; filename + 行号"(例如:key = @ input1),该值将是该行中的数据(例如:value = 1201 \t gopal \t 45 \t Male \t 50000).
方法 : 此地图任务的操作如下 :
读取值(记录数据) ),作为字符串中参数列表的输入值.
使用split函数,将性别和存储分隔为字符串变量.
String[] str = value.toString().split("\t", -3);String gender=str[3];
将性别信息和记录数据值发送为从地图任务输出键值对到分区任务.
context.write(new Text(gender), new Text(value));
对文本文件中的所有记录重复上述所有步骤.
输出 : 您将获得性别数据和记录数据值作为键值对.
分区程序任务
分区程序任务接受键值对从地图任务作为其输入.分区意味着将数据划分为段.根据给定的分区条件标准,输入的键值配对数据可以根据年龄标准分为三个部分.
输入 : 键值对集合中的整个数据.
key =记录中的性别字段值.
value =该性别的整个记录数据值.
方法 : 分区逻辑的过程如下:
从输入键值对中读取年龄字段值.
String[] str = value.toString().split("\t");int age = Integer.parseInt(str[2]);
使用以下条件检查年龄值.
年龄小于或等于20
年龄大于20且小于或等于30.
年龄大于30岁.
if(age<=20){ return 0;}else if(age>20 && age<=30){ return 1 % numReduceTasks;}else{ return 2 % numReduceTasks;}
输出 : 键值对的整个数据被分段为三个键值对集合. Reducer在每个集合上单独工作.
减少任务
分区任务的数量等于减速器任务的数量.这里我们有三个分区任务,因此我们有三个Reducer任务要执行.
输入 : Reducer将使用不同的键值对集合执行三次.
key =记录中的性别字段值.
value =该性别的整个记录数据.
方法 : 以下逻辑将应用于每个集合.
读取每条记录的Salary字段值.
String [] str = val.toString().split("\t", -3);Note: str[4] have the salary field value.
使用max变量检查薪水.如果str [4]是最高工资,则将str [4]指定为max,否则跳过该步骤.
if(Integer.parseInt(str[4])>max){ max=Integer.parseInt(str[4]);}
为每个密钥集合重复步骤1和2(男性和放大器;女性是关键的收藏品).执行这三个步骤后,您将从男性密钥集合中找到一个最高工资,从女性密钥集合中找到一个最高工资.
context.write(new Text(key),new IntWritable(max));
输出 : 最后,您将在三个不同年龄组的集合中获得一组键值对数据.它包含Male集合的最高工资和每个年龄组中Female集合的最高工资.
执行Map,Partitioner和Reduce任务后,三个集合键值对数据存储在三个不同的文件中作为输出.
所有这三个任务都被视为MapReduce作业.这些工作的以下要求和规格应在配置和减号中指定;
工作名称
键和值的输入和输出格式
Map,Reduce和Partitioner任务的各个类
Configuration conf = getConf();//Create JobJob job = new Job(conf, "topsal");job.setJarByClass(PartitionerExample.class);// File Input and Output pathsFileInputFormat.setInputPaths(job, new Path(arg[0]));FileOutputFormat.setOutputPath(job,new Path(arg[1]));//Set Mapper class and Output format for key-value pair.job.setMapperClass(MapClass.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//set partitioner statementjob.setPartitionerClass(CaderPartitioner.class);//Set Reducer class and Input/Output format for key-value pair.job.setReducerClass(ReduceClass.class);//Number of Reducer tasks.job.setNumReduceTasks(3);//Input and Output format for datajob.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);
示例程序
以下程序显示如何在MapReduce程序中为给定条件实现分区程序./p>
package partitionerexample;import java.io.*;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.mapreduce.lib.input.*;import org.apache.hadoop.mapreduce.lib.output.*;import org.apache.hadoop.util.*;public class PartitionerExample extends Configured implements Tool{ //Map class public static class MapClass extends Mapper{ public void map(LongWritable key, Text value, Context context) { try{ String[] str = value.toString().split("\t", -3); String gender=str[3]; context.write(new Text(gender), new Text(value)); } catch(Exception e) { System.out.println(e.getMessage()); } } } //Reducer class public static class ReduceClass extends Reducer { public int max = -1; public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { max = -1; for (Text val : values) { String [] str = val.toString().split("\t", -3); if(Integer.parseInt(str[4])>max) max=Integer.parseInt(str[4]); } context.write(new Text(key), new IntWritable(max)); } } //Partitioner class public static class CaderPartitioner extends Partitioner < Text, Text > { @Override public int getPartition(Text key, Text value, int numReduceTasks) { String[] str = value.toString().split("\t"); int age = Integer.parseInt(str[2]); if(numReduceTasks == 0) { return 0; } if(age<=20) { return 0; } else if(age>20 && age<=30) { return 1 % numReduceTasks; } else { return 2 % numReduceTasks; } } } @Override public int run(String[] arg) throws Exception { Configuration conf = getConf(); Job job = new Job(conf, "topsal"); job.setJarByClass(PartitionerExample.class); FileInputFormat.setInputPaths(job, new Path(arg[0])); FileOutputFormat.setOutputPath(job,new Path(arg[1])); job.setMapperClass(MapClass.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //set partitioner statement job.setPartitionerClass(CaderPartitioner.class); job.setReducerClass(ReduceClass.class); job.setNumReduceTasks(3); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true)? 0 : 1); return 0; } public static void main(String ar[]) throws Exception { int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar); System.exit(0); }}
将上述代码保存为"/home/hadoop/"中的 PartitionerExample.java hadoopPartitioner".下面给出了程序的编译和执行.
编译和执行
让我们假设我们在Hadoop用户的主目录中(例如,/home/hadoop).
按照下面给出的步骤编译并执行上述程序.
步骤1 : 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序.您可以从 mvnrepository.com 下载jar.
让我们假设下载的文件夹是"/home/hadoop/hadoopPartitioner"
第2步 : 以下命令用于编译程序 PartitionerExample.java 并为程序创建jar.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C.
第3步 : 使用以下命令在HDFS中创建输入目录.
$ HADOOP_HOME/bin/hadoop fs -mkdir input_dir
第4步 : 使用以下命令在HDFS的输入目录中复制名为 input.txt 的输入文件.
$ HADOOP_HOME/bin/hadoop fs -put/home/hadoop/hadoopPartitioner/input.txt input_dir
第5步 : 使用以下命令验证输入目录中的文件.
$ HADOOP_HOME/bin/hadoop fs -ls input_dir/
第6步 : 使用以下命令通过从输入目录中获取输入文件来运行Top salary应用程序.
$ HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
等待文件执行一段时间.执行后,输出包含许多输入拆分,映射任务和Reducer任务.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49File System Counters FILE: Number of bytes read=467 FILE: Number of bytes written=426777 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=480 HDFS: Number of bytes written=72 HDFS: Number of read operations=12 HDFS: Number of large read operations=0 HDFS: Number of write operations=6Job Counters Launched map tasks=1 Launched reduce tasks=3 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=8212 Total time spent by all reduces in occupied slots (ms)=59858 Total time spent by all map tasks (ms)=8212 Total time spent by all reduce tasks (ms)=59858 Total vcore-seconds taken by all map tasks=8212 Total vcore-seconds taken by all reduce tasks=59858 Total megabyte-seconds taken by all map tasks=8409088 Total megabyte-seconds taken by all reduce tasks=61294592Map-Reduce Framework Map input records=13 Map output records=13 Map output bytes=423 Map output materialized bytes=467 Input split bytes=119 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=467 Reduce input records=13 Reduce output records=6 Spilled Records=26 Shuffled Maps =3 Failed Shuffles=0 Merged Map outputs=3 GC time elapsed (ms)=224 CPU time spent (ms)=3690 Physical memory (bytes) snapshot=553816064 Virtual memory (bytes) snapshot=3441266688 Total committed heap usage (bytes)=334102528Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0File Input Format Counters Bytes Read=361File Output Format Counters Bytes Written=72
第7步 : 使用以下命令验证输出文件夹中的结果文件.
$ HADOOP_HOME/bin/hadoop fs -ls output_dir/
您将在三个文件中找到输出,因为您在程序中使用了三个分区器和三个Reducers.
步骤8 : 使用以下命令查看 Part-00000 文件中的输出.此文件由HDFS生成.
$ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
零件00000中的输出
Female 15000Male 40000
使用以下命令查看 Part-00001 文件中的输出.
$ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
部分输出 - 00001
Female 35000Male 31000
使用以下命令查看 Part-00002 文件中的输出.
$ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Part-00002中的输出
Female 51000Male 50000