kafka 最初由 Linkedin 公司开发,使用 Scala 语言编写,是一个分布式、分区、多副本、多订阅者的日志系统。 Spark Streaming 是大规模流式数据处理的新贵,将流式计算分解成一系列短小的批处理作业。由于 SparkStreaming 对网络流方式的支持, SparkStreaming+Kafka 的日志流式处理方式已越来越普遍。本文将讲解如何利用两者搭建出一套实时流处理系统,并详细列出实际搭建过程中遇到的问题及解决方式。
准备工作
- 安装 zookeeper
- 安装 kafka
- 安装 spark
kafka 集群的搭建及配置见上一篇文章: Kafka 环境搭建与测试
启动集群
进入 zookeeper HOME
目录,执行 $ bin/zkServer.sh start
启动 zookeeper。
进入 kafka HOME
目录,执行
1
| $ bin/kafka-server-start.sh config/server.properties
|
1
| $ bin/kafka-topics.sh --list --zookeeper localhost:2181
|
返回 kafkaTest
,正常。
1
| $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest
|
broker-list 是刚刚启动的 kafka server 的机器的配置,本文本机测试选择localhost,启动的server的端口配置为9092,如果启动了多个broker,则依次填写,例如:
1
| $ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic kafkaTest
|
- 启动 kafka consumer ,测试消费信息是否正常
接收正常,接下来可以开始写 spark 应用程序了。
编写并调试 spark 程序
spark 官方提供了与 kafka 结合的 demo ,具体内容可见 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
本文为了联调测试方便,适当改写,同样来测试一个wordcount的例子。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.buptwtist.cristo import java.util.Properties import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf object KafkaWordCount { def main(args: Array[String]) { val zkQuorum = "10.3.242.35:2181" val group = "test-consumer-group" val topics = "kafkaTest" val numThreads = 2 val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
|
其中, val zkQuorum = "10.3.242.35:2181"
是我的 zookeeper 的地址;test-consumer-group
表示 consumer-group
的名称,与$KAFKA_HOME/config/consumer.properties
中的 group.id
的配置内容一致;kafkaTest
表示 topic; 2 表示线程数。一个很简单的 wordcount 的例子,在此就不多做解释了。有问题的话参阅 http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking 。
在测试这段程序的过程中遇到了一些问题,首先遇到的一个错误是在打包完成 submit 执行的时候报错:
1
| Caused by: java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
|
经过一番查阅之后发现这是由于下载安装的编译好的 spark 版本里面并不包含 spark-streaming-kafka 包,因此得自己将这些依赖打包到jar文件上传。
Maven有将依赖打包的功能,而我平时使用的是 sbt , sbt package
等命令并不支持将依赖连同打包,这时就要求助于 sbt-assembly 了。关于 sbt-assembly ,其 github 上有详细的使用说明,https://github.com/sbt/sbt-assembly。
如果你平时就用 sbt-assembly 的话那就比较简单了,如果像我一样之前没有用过这个工具的话,还有点小折腾。首先按照 github 上的 README 添加插件就行。配置项目目录下 project 子目录下的 plugins.sbt
1 2 3
| resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")
|
然后按照 README 所述添加 build.sbt 的内容,添加完成之后,执行sbt assembly
又报错:
1
| [error] (*:assembly) deduplicate: different file contents found in the following:
|
好在这个错误 README 里面已经提醒很有可能会发生了,需要注意的是除了 README 里面举的关于 spark 的几个 conlifcts 的例子,还有其他 conlifcts ,需要 exclude 其他依赖,下面是多次编译发现冲突、修改冲突之后,我的 build.sbt 的具体内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import AssemblyKeys._ assemblySettings name:= "KafkaWordCount" version := "1.0" scalaVersion := "2.10.4" mainClass in assembly := Some("com.buptwtist.cristo.KafkaWordCount") libraryDependencies ++= Seq( ("org.apache.spark" %% "spark-core" % "1.1.0"). exclude("org.mortbay.jetty", "servlet-api"). exclude("commons-beanutils", "commons-beanutils-core"). exclude("commons-collections", "commons-collections"). exclude("commons-collections", "commons-collections"). exclude("com.esotericsoftware.minlog", "minlog"). exclude("org.slf4j", "jcl-over-slf4j"). excludeAll( ExclusionRule(organization = "org.eclipse.jetty.orbit") ) ) libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.0" libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" resolvers += "Maven Repository" at "http://repo.maven.apache.org/maven2"
|
然后 submit
1 2
| spark-submit --class com.buptwtist.cristo.KafkaWordCount \ target/scala-2.10/KafkaWordCount-assembly-1.0.jar
|
终于能顺利运行。
在 producer 输入 message ,控制台周期计算 wordcount 并显示输出。
Kafka + SparkStreaming 运行成功。