开发手册 欢迎您!
软件开发者资料库

HCatalog - 输入输出格式

HCatalog输入输出格式 - 从简介,安装,CLI,创建表,更改表,视图,显示表,显示分区,索引,读写器,输入输出格式,加载器和存储器开始学习HCatalog。

HCatInputFormat HCatOutputFormat 接口用于从HDFS读取数据,处理后,使用MapReduce作业将结果数据写入HDFS.让我们详细说明输入和输出格式接口.

HCatInputFormat

HCatInputFormat 与MapReduce作业一起用于读取数据来自HCatalog管理的表格. HCatInputFormat公开了一个Hadoop 0.20 MapReduce API,用于读取数据,就好像它已经发布到表中一样.

Sr.No.方法名称&描述
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException

设置用于作业的输入.它使用给定的输入规范查询Metastore,并将匹配的分区序列化为MapReduce任务的作业配置.

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

设置用于作业的输入.它使用给定的输入规范查询Metastore,并将匹配的分区序列化为MapReduce任务的作业配置.

3

public HCatInputFormat setFilter(String filter)throws IOException

在输入表上设置过滤器.

4

public HCatInputFormat setProperties(Properties properties) throws IOException

设置输入格式的属性.

HCatInputFormat API包括以下方法 :

  • setInput

  • setOutputSchema

  • getTableSchema

使用 HCatInputFormat 读取数据,首先使用正在读取的表中的必要信息实例化 InputJobInfo ,然后使用 InputJobInfo 调用 setInput .

您可以使用 setOutputSchema 包含投影架构的方法,用于指定输出字段.如果未指定架构,则将返回表中的所有列.您可以使用getTableSchema方法确定指定输入表的表模式.

HCatOutputFormat

HCatOutputFormat与MapReduce作业一起使用以将数据写入HCatalog管理的表格. HCatOutputFormat公开了一个Hadoop 0.20 MapReduce API,用于将数据写入表中.当MapReduce作业使用HCatOutputFormat写入输出时,将使用为表配置的默认OutputFormat,并在作业完成后将新分区发布到表中.

Sr.No.方法名称&描述
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

设置要为作业写入的输出的信息.它查询元数据服务器以查找要用于表的StorageHandler.如果分区已经发布,则会抛出错误.

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

设置要写入分区的数据的模式.如果未调用此表,则默认情况下将使用表模式.

3

public RecordWriter , HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException

获取作业的记录编写者.它使用StorageHandler的默认OutputFormat来获取记录编写器.

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

获取此输出格式的输出提交者.它确保正确提交输出.

HCatOutputFormat API包括以下方法 :

  • setOutput

  • setSchema

  • getTableSchema

HCatOutputFormat上的第一个调用必须是 setOutput ;任何其他调用都会抛出异常,表示输出格式未初始化.

正在写出的数据的模式由 setSchema 方法指定.您必须调用此方法,提供您正在编写的数据模式.如果您的数据与表模式具有相同的模式,则可以使用 HCatOutputFormat.getTableSchema()来获取表模式,然后将其传递给 setSchema()./p>

示例

以下MapReduce程序从一个表中读取数据,该表假设在第二列("第1列")中有一个整数,并且计算它找到的每个不同值的实例数.也就是说,它相当于"选择col1,count(*)来自$ table group by col1; ".

例如,如果值为第二列是{1,1,1,3,3,5},然后程序将产生以下值的输出和计数和减号;

  1,3  3,2  5,1

现在让我们来看一下程序代码 :

import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import org.apache.HCatalog.common.HCatConstants;import org.apache.HCatalog.data.DefaultHCatRecord;import org.apache.HCatalog.data.HCatRecord;import org.apache.HCatalog.data.schema.HCatSchema;import org.apache.HCatalog.mapreduce.HCatInputFormat;import org.apache.HCatalog.mapreduce.HCatOutputFormat;import org.apache.HCatalog.mapreduce.InputJobInfo;import org.apache.HCatalog.mapreduce.OutputJobInfo;public class GroupByAge extends Configured implements Tool {   public static class Map extends Mapper {      int age;      @Override      protected void map(         WritableComparable key, HCatRecord value,         org.apache.hadoop.mapreduce.Mapper.Context context      )throws IOException, InterruptedException {         age = (Integer) value.get(1);         context.write(new IntWritable(age), new IntWritable(1));      }   }   public static class Reduce extends Reducer {      @Override      protected void reduce(         IntWritable key, java.lang.Iterable values,         org.apache.hadoop.mapreduce.Reducer.Context context      )throws IOException ,InterruptedException {         int sum = 0;         Iterator iter = values.iterator();         while (iter.hasNext()) {            sum++;            iter.next();         }         HCatRecord record = new DefaultHCatRecord(2);         record.set(0, key.get());         record.set(1, sum);         context.write(null, record);      }   }   public int run(String[] args) throws Exception {      Configuration conf = getConf();      args = new GenericOptionsParser(conf, args).getRemainingArgs();      String serverUri = args[0];      String inputTableName = args[1];      String outputTableName = args[2];      String dbName = null;      String principalID = System      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);      if (principalID != null)      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);      Job job = new Job(conf, "GroupByAge");      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));      // initialize HCatOutputFormat      job.setInputFormatClass(HCatInputFormat.class);      job.setJarByClass(GroupByAge.class);      job.setMapperClass(Map.class);      job.setReducerClass(Reduce.class);      job.setMapOutputKeyClass(IntWritable.class);      job.setMapOutputValueClass(IntWritable.class);      job.setOutputKeyClass(WritableComparable.class);      job.setOutputValueClass(DefaultHCatRecord.class);      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));      HCatSchema s = HCatOutputFormat.getTableSchema(job);      System.err.println("INFO: output schema explicitly set for writing:" + s);      HCatOutputFormat.setSchema(job, s);      job.setOutputFormatClass(HCatOutputFormat.class);      return (job.waitForCompletion(true) ? 0 : 1);   }   public static void main(String[] args) throws Exception {      int exitCode = ToolRunner.run(new GroupByAge(), args);      System.exit(exitCode);   }}

在编译上述程序之前,你必须下载一些 jars 并添加这些到此应用程序的类路径.你需要下载所有的Hive jar和HCatalog jar(HCatalog-core-0.5.0.jar,hive-metastore-0.10.0.jar,libthrift-0.7.0.jar,hive-exec-0.10.0.jar, libfb303-0.7.0.jar,jdo2-api-2.3-ec.jar,slf4j-api-1.6.1.jar).

使用以下命令复制那些 jar 文件从本地 HDFS 并将其添加到类路径.

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmpbin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmpbin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmpbin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmpbin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmpbin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmpbin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmpexport LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,hdfs:///tmp/hive-metastore-0.10.0.jar,hdfs:///tmp/libthrift-0.7.0.jar,hdfs:///tmp/hive-exec-0.10.0.jar,hdfs:///tmp/libfb303-0.7.0.jar,hdfs:///tmp/jdo2-api-2.3-ec.jar,hdfs:///tmp/slf4j-api-1.6.1.jar

使用以下命令编译并执行给定程序.

$ HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

现在,检查输出目录(hdfs:user/tmp/hive)以获取输出(part_0000,part_0001).