Kafka+SparkStreaming--实时流处理实践

kafka 最初由 Linkedin 公司开发,使用 Scala 语言编写,是一个分布式、分区、多副本、多订阅者的日志系统。 Spark Streaming 是大规模流式数据处理的新贵,将流式计算分解成一系列短小的批处理作业。由于 SparkStreaming 对网络流方式的支持, SparkStreaming+Kafka 的日志流式处理方式已越来越普遍。本文将讲解如何利用两者搭建出一套实时流处理系统,并详细列出实际搭建过程中遇到的问题及解决方式。

准备工作

启动集群

  • 启动 zookeeper

进入 zookeeper HOME 目录,执行 $ bin/zkServer.sh start 启动 zookeeper。

  • 启动 kafka server

进入 kafka HOME 目录,执行

1
$ bin/kafka-server-start.sh config/server.properties
  • 创建 topic
1
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaTest
  • 检查 topic 是否创建成功
1
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

返回 kafkaTest ,正常。

  • 启动 kafka producer ,生产信息
1
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest

broker-list 是刚刚启动的 kafka server 的机器的配置,本文本机测试选择localhost,启动的server的端口配置为9092,如果启动了多个broker,则依次填写,例如:

1
$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic kafkaTest
  • 启动 kafka consumer ,测试消费信息是否正常
1
$ bin/kafka-console-consumer.sh --zookeeper 10.3.242.35:2181 --from-beginning --topic kafkaTest

接收正常,接下来可以开始写 spark 应用程序了。

编写并调试 spark 程序

spark 官方提供了与 kafka 结合的 demo ,具体内容可见 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

本文为了联调测试方便,适当改写,同样来测试一个wordcount的例子。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.buptwtist.cristo
import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object KafkaWordCount {
def main(args: Array[String]) {
val zkQuorum = "10.3.242.35:2181"
val group = "test-consumer-group"
val topics = "kafkaTest"
val numThreads = 2
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

其中, val zkQuorum = "10.3.242.35:2181" 是我的 zookeeper 的地址;test-consumer-group 表示 consumer-group 的名称,与$KAFKA_HOME/config/consumer.properties 中的 group.id 的配置内容一致;kafkaTest 表示 topic; 2 表示线程数。一个很简单的 wordcount 的例子,在此就不多做解释了。有问题的话参阅 http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking

在测试这段程序的过程中遇到了一些问题,首先遇到的一个错误是在打包完成 submit 执行的时候报错:

1
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$

经过一番查阅之后发现这是由于下载安装的编译好的 spark 版本里面并不包含 spark-streaming-kafka 包,因此得自己将这些依赖打包到jar文件上传。

Maven有将依赖打包的功能,而我平时使用的是 sbt , sbt package 等命令并不支持将依赖连同打包,这时就要求助于 sbt-assembly 了。关于 sbt-assembly ,其 github 上有详细的使用说明,https://github.com/sbt/sbt-assembly。

如果你平时就用 sbt-assembly 的话那就比较简单了,如果像我一样之前没有用过这个工具的话,还有点小折腾。首先按照 github 上的 README 添加插件就行。配置项目目录下 project 子目录下的 plugins.sbt

1
2
3
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")

然后按照 README 所述添加 build.sbt 的内容,添加完成之后,执行sbt assembly又报错:

1
[error] (*:assembly) deduplicate: different file contents found in the following:

好在这个错误 README 里面已经提醒很有可能会发生了,需要注意的是除了 README 里面举的关于 spark 的几个 conlifcts 的例子,还有其他 conlifcts ,需要 exclude 其他依赖,下面是多次编译发现冲突、修改冲突之后,我的 build.sbt 的具体内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import AssemblyKeys._
assemblySettings
name:= "KafkaWordCount"
version := "1.0"
scalaVersion := "2.10.4"
mainClass in assembly := Some("com.buptwtist.cristo.KafkaWordCount")
libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-core" % "1.1.0").
exclude("org.mortbay.jetty", "servlet-api").
exclude("commons-beanutils", "commons-beanutils-core").
exclude("commons-collections", "commons-collections").
exclude("commons-collections", "commons-collections").
exclude("com.esotericsoftware.minlog", "minlog").
exclude("org.slf4j", "jcl-over-slf4j").
excludeAll(
ExclusionRule(organization = "org.eclipse.jetty.orbit")
)
)
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
resolvers += "Maven Repository" at "http://repo.maven.apache.org/maven2"

然后 submit

1
2
spark-submit --class com.buptwtist.cristo.KafkaWordCount \
target/scala-2.10/KafkaWordCount-assembly-1.0.jar

终于能顺利运行。

在 producer 输入 message ,控制台周期计算 wordcount 并显示输出。

Kafka + SparkStreaming 运行成功。