开发手册 欢迎您!
软件开发者资料库

Apache Kafka - 与Spark集成

Apache Kafka与Spark集成 - 从简介,基础知识,集群架构,工作流程,安装步骤,基本操作,简单生产者示例,消费者组示例,与Storm集成,与Spark集成,实时应用程序(Twitter),学习Apache kafka,工具,应用程序。

在本章中,我们将讨论如何将Apache Kafka与Spark Streaming API集成.

关于Spark

启用Spark Streaming API实时数据流的可扩展,高吞吐量,容错流处理.数据可以从Kafka,Flume,Twitter等许多来源获取,并且可以使用复杂的算法进行处理,例如map,reduce,join和window等高级函数.最后,处理后的数据可以推送到文件系统,数据库和实时仪表板.弹性分布式数据集(RDD)是Spark的基本数据结构.它是一个不可变的分布式对象集合. RDD中的每个数据集都分为逻辑分区,可以在集群的不同节点上计算.

与Spark集成

Kafka是一个潜力Spark流媒体的消息传递和集成平台. Kafka充当实时数据流的中心枢纽,并使用Spark Streaming中的复杂算法进行处理.处理完数据后,Spark Streaming可能会将结果发布到HDFS,数据库或仪表板中的另一个Kafka主题或商店中.下图描绘了概念流程.

与Spark集成

现在,让我们详细了解Kafka-Spark API.

SparkConf API

它代表Spark应用程序的配置.用于将各种Spark参数设置为键值对.

类具有以下方法 :

  • set(字符串键,字符串值) : 设置配置变量.

  • 删除(字符串键) : 从配置中删除密钥.

  • setAppName(字符串名称) : 为您的应用程序设置应用程序名称.

  • get(字符串键) : 获取密钥

StreamingContext API

这是Spark功能的主要入口点. SparkContext表示与Spark群集的连接,可用于在群集上创建RDD,累加器和广播变量.签名定义如下所示.

public StreamingContext(String master, String appName, Duration batchDuration,    String sparkHome, scala.collection.Seq jars,    scala.collection.Map environment)

  • master : 要连接的集群URL(例如,mesos://host:port,spark://host:port,local [4]).

  • appName : 作业名称,显示在群集Web UI上

  • batchDuration : 流数据将被分成批次的时间间隔

public StreamingContext(SparkConf conf ,Duration batchDuration)

通过提供新SparkContext所需的配置来创建StreamingContext.

  • conf :  Spark参数

  • batchDuration : 流数据将被分成批次的时间间隔

KafkaUtils API

KafkaUtils API用于将Kafka群集连接到Spark流.此API具有如下定义的重要方法签名.

public static ReceiverInputDStream> createStream(   StreamingContext ssc, String zkQuorum, String groupId,   scala.collection.immutable.Map topics, StorageLevel storageLevel)

上面显示的方法用于创建从Kafka Brokers提取消息的输入流.

  • ssc :  StreamingContext对象.

  • zkQuorum :  Zookeeper法定人数.

  • groupId : 此消费者的群组ID.

  • 主题 : 返回要使用的主题地图.

  • storageLevel : 用于存储接收对象的存储级别.

KafkaUtils API有另一种方法createDirectStream,用于直接创建输入流在不使用任何接收器的情况下从Kafka Brokers提取消息.此流可以保证来自Kafka的每条消息都只包含在转换中一次.

示例应用程序在Scala中完成.要编译应用程序,请下载并安装,scala构建工具(类似于maven).主要应用程序代码如下所示.

import java.util.HashMapimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}import org.apache.spark.SparkConfimport org.apache.spark.streaming._import org.apache.spark.streaming.kafka._object KafkaWordCount {   def main(args: Array[String]) {      if (args.length < 4) {         System.err.println("Usage: KafkaWordCount   ")         System.exit(1)      }      val Array(zkQuorum, group, topics, numThreads) = args      val sparkConf = new SparkConf().setAppName("KafkaWordCount")      val ssc = new StreamingContext(sparkConf, Seconds(2))      ssc.checkpoint("checkpoint")      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)      val words = lines.flatMap(_.split(" "))      val wordCounts = words.map(x => (x, 1L))         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)      wordCounts.print()      ssc.start()      ssc.awaitTermination()   }}

构建脚本

spark-kafka集成取决于火花,火花流和火花Kafka集成罐.创建一个新文件并指定应用程序详细信息及其依赖项. 将在编译和打包应用程序时下载必要的jar.

name := "Spark Kafka Project"version := "1.0"scalaVersion := "2.10.5"libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

编译/打包

运行以下命令编译并打包应用程序的jar文件.我们需要将jar文件提交到spark控制台来运行应用程序.

 sbt package

提交给Spark

启动Kafka Producer CLI(在前一章中解释),创建一个名为的新主题并提供一些示例消息,如下所示.

Another spark test message

运行以下命令将应用程序提交到spark控制台.

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark-kafka-project_2.10-1.0.jar localhost:2181   

此应用程序的示例输出如下所示.

spark console messages ..(Test,1)(spark,1)(another,1)(message,1)spark console message ..