The Kafka Connector for Presto lets you query data stored in Apache Kafka from Presto.
Prerequisites
Download and install the latest versions of the following Apache projects.
Apache ZooKeeper
Apache Kafka
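Start ZooKeeper
Start the ZooKeeper server with the following command.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper now listens on port 2181.
Start Kafka
Start Kafka in another terminal with the following command.
$ bin/kafka-server-start.sh config/server.properties
Once Kafka has started, it listens on port 9092.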
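Before moving on, it can help to confirm that both services are accepting connections. The following is a minimal sketch, assuming bash (it relies on bash's /dev/tcp redirection, which other shells lack). The function name port_open is made up for this illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if something accepts TCP connections
# on host $1, port $2. Relies on bash's /dev/tcp pseudo-device.
port_open() {
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

# Check the two default ports mentioned above.
for port in 2181 9092; do
  if port_open localhost "$port"; then
    echo "port $port: open"
  else
    echo "port $port: closed"
  fi
done
```

If either port is reported as closed, check the terminal running that service for startup errors before loading any data.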
TPCH Data
Download kafka-tpch
$ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
The command above downloads the loader from Maven Central. You will see a response similar to the following.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
  5 21.6M    5 1279k    0     0  83898      0  0:04:30  0:00:15  0:04:15  129k
  6 21.6M    6 1407k    0     0  86656      0  0:04:21  0:00:16  0:04:05  131k
 24 21.6M   24 5439k    0     0   124k      0  0:02:57  0:00:43  0:02:14  175k
 24 21.6M   24 5439k    0     0   124k      0  0:02:58  0:00:43  0:02:15  160k
 25 21.6M   25 5736k    0     0   128k      0  0:02:52  0:00:44  0:02:08  181k
...
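Then make it executable with the following command.
$ chmod 755 kafka-tpch
Run kafka-tpch
Run the kafka-tpch program with the following command to preload a number of topics with TPCH data.
Command
$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
Result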
2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
...
The Kafka topics for customer, orders, supplier, and the other TPCH tables are now loaded with TPCH data.
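To double-check that every topic was created, the broker's topic list can be compared against the table names from the loader's log. The sketch below is only an illustration: expected_topics and missing_topics are hypothetical helper names, and the kafka-topics.sh flag depends on the Kafka release (older ZooKeeper-based releases accept --zookeeper, newer ones use --bootstrap-server localhost:9092 instead).

```shell
# Table names as printed in the loader's "Processing tables" log line.
TPCH_TABLES="customer orders lineitem part partsupp supplier nation region"

# Hypothetical helper: print the expected topic name for each table,
# one per line, given the prefix passed to --prefix.
expected_topics() {
  for t in $TPCH_TABLES; do
    printf '%s%s\n' "$1" "$t"
  done
}

# Hypothetical helper: read a topic list on stdin and report every
# expected topic that does not appear in it as an exact line.
missing_topics() {
  actual=$(cat)
  expected_topics "$1" | while read -r topic; do
    printf '%s\n' "$actual" | grep -qxF "$topic" || printf 'missing: %s\n' "$topic"
  done
}

# Example usage, run from the Kafka installation directory:
#   bin/kafka-topics.sh --list --zookeeper localhost:2181 | missing_topics tpch.
```

No output from missing_topics means all eight topics are present.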
Add Configuration Settings
Add the following Kafka connector configuration settings on the Presto server.
connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
kafka.hide-internal-columns=false
In the configuration above, the table names refer to the Kafka topics loaded by the kafka-tpch program.
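The text does not say which file holds these settings. In a standard Presto installation, catalog definitions live under etc/catalog, and the file name becomes the catalog name, so a file called kafka.properties matches the "kafka" catalog used later. A minimal sketch, assuming PRESTO_HOME points at the Presto server directory (this variable is an assumption, not something the installation defines for you):

```shell
# Assumption: PRESTO_HOME is the Presto server installation directory;
# fall back to the current directory if it is unset.
catalog_dir="${PRESTO_HOME:-.}/etc/catalog"
mkdir -p "$catalog_dir"

# The file name (kafka.properties) determines the catalog name (kafka).
cat > "$catalog_dir/kafka.properties" <<'EOF'
connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
kafka.hide-internal-columns=false
EOF
```

Restart the Presto server afterwards so that it picks up the new catalog.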
Start the Presto CLI
Start the Presto CLI with the following command.
$ ./presto --server localhost:8080 --catalog kafka --schema tpch
Here "tpch" is the schema for the Kafka connector. You will see the following prompt.
presto:tpch>
List Tables
The following query lists all tables in the "tpch" schema.
Query
presto:tpch> show tables;
Result
  Table
----------
 customer
 lineitem
 nation
 orders
 part
 partsupp
 region
 supplier
Describe the Customer Table
The following query describes the "customer" table.
Query
presto:tpch> describe customer;
Result
      Column       |  Type   |                   Comment
-------------------+---------+---------------------------------------------
 _partition_id     | bigint  | Partition Id
 _partition_offset | bigint  | Offset for the message within the partition
 _segment_start    | bigint  | Segment start offset
 _segment_end      | bigint  | Segment end offset
 _segment_count    | bigint  | Running message count per segment
 _key              | varchar | Key text
 _key_corrupt      | boolean | Key data is corrupt
 _key_length       | bigint  | Total number of key bytes
 _message          | varchar | Message text
 _message_corrupt  | boolean | Message data is corrupt
 _message_length   | bigint  | Total number of message bytes
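Because only the connector's internal columns are listed, each record's payload is available as text in the _message column. The sketch below shows one way to run follow-up queries without opening the interactive prompt, using the CLI's --execute option. The helper name presto_query and the PRESTO_CLI variable are assumptions made for this illustration.

```shell
# Assumption: the Presto CLI launcher is ./presto, as in the command
# used earlier; override PRESTO_CLI if it lives elsewhere.
PRESTO_CLI="${PRESTO_CLI:-./presto}"

# Hypothetical helper: run one SQL statement non-interactively and exit.
presto_query() {
  "$PRESTO_CLI" --server localhost:8080 --catalog kafka --schema tpch --execute "$1"
}

# Example usage (needs the running Presto server and the loaded topics):
#   presto_query "SELECT count(*) FROM customer"
#   presto_query "SELECT _partition_id, _partition_offset, _message FROM customer LIMIT 3"
```

The first query counts the messages in the tpch.customer topic, and the second prints a few raw messages together with their partition and offset.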