博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Streaming和Kafka整合之路(最新版本)
阅读量:6280 次
发布时间:2019-06-22

本文共 2472 字,大约阅读时间需要 8 分钟。

hot3.png

最近完成了Spark Streaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少的坑,记录下来,大家方便绕行。

先说一下环境:

Spark 2.0.0    kafka_2.11-0.10.0.0

之前的项目当中,已经在pom当中添加了需要的Spark Streaming的依赖,这次只需要添加Spark Streaming Kafka的以来就行了,问题来了。首先是我之前添加的Spark Streaming的依赖:

    <dependency>

      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>

然后是找到的spark streaming对kafka的支持依赖:

<dependency>

    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.2</version>
</dependency>

请注意2个version部分,好像差的有点多。不管了,照着例子写写看,果然报了各种class not found的错误。基本可以判断是版本差异造成的问题。

可是,在http://mvnrepository.com上找不到更高版本的依赖怎么办呢?

考虑了一下,只有一个办法了,下载spark源码,自行编译打包需要的jar包。

在github上找到spark项目,clone下来,懒病又犯了,也没仔细看当中的说明,直接就clean compile等等。结果又是各种报错。好吧,好好看看吧,github上给了个地址:http://spark.apache.org/docs/latest/building-spark.html,照着做就没问题了。

然后把项目当中pom里面对streaming kafka的依赖删掉,引入我们自己生成的jar包:

spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar

 

然后贴上代码:

    val conf = new SparkConf().setAppName("kafkastream").setMaster("spark://master:7077").

      set("spark.driver.host", "192.168.1.142").
      setJars(List("/src/git/msgstream/out/artifacts/msgstream_jar/msgstream.jar",
        "/src/git/msgstream/lib/kafka-clients-0.10.0.0.jar",
        "/src/git/msgstream/lib/kafka_2.11-0.10.0.0.jar",
        "/src/git/msgstream/lib/spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar"))
    val ssc = new StreamingContext(conf, Seconds(2))

    val topics = List("woozoom")

    val kafkaParams = Map(("bootstrap.servers", "master:9092,slave01:9092,slave02:9092"),
      ("group.id", "sparkstreaming"), ("key.deserializer", classOf[StringDeserializer]),
      ("value.deserializer", classOf[StringDeserializer]))
    val preferredHosts = LocationStrategies.PreferConsistent
    val offsets = Map(new TopicPartition("woozoom", 0) -> 2L)

    val lines = KafkaUtils.createDirectStream[String, String](

      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

    lines.foreachRDD(rdd => {

      rdd.foreach(x => {
        println(x)
      })
    })

    ssc.start()

    ssc.awaitTermination()

上面标红的部分,是需要注意的,而这些本来我也是不会写的,后来去到spark源码找到test代码

/src/git/spark/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala

测试,通过!!!

 

总结:

1、spark项目很多时候,资源不是很充分,想找例子的话,2个途径,一个spark安装包当中的example但是这个很多时候,版本是比较老的,不是很理想。更好地是从spark源码当中找他的测试用例,这个基本上和你用的最新版本是完全匹配的。

2、编译过很多开源项目,一般大的项目都会有相应的build说明,照着那个做,会为你节省很多时间。

3、从最开始遇到的版本号的问题来看,很多时候我们遇到的问题并不一定是我们自己的问题,不迷信,大胆的相信自己的推测,非常有助于问题的解决。

转载于:https://my.oschina.net/dongtianxi/blog/748590

你可能感兴趣的文章
Apache kafka 简介
查看>>
socket通信Demo
查看>>
技术人员的焦虑
查看>>
js 判断整数
查看>>
建设网站应该考虑哪些因素
查看>>
mongodb $exists
查看>>
js实现页面跳转的几种方式
查看>>
sbt笔记一 hello-sbt
查看>>
常用链接
查看>>
pitfall override private method
查看>>
!important 和 * ----hack
查看>>
聊天界面图文混排
查看>>
控件的拖动
查看>>
svn eclipse unable to load default svn client的解决办法
查看>>
Android.mk 文件语法详解
查看>>
QT liunx 工具下载
查看>>
内核源码树
查看>>
Java 5 特性 Instrumentation 实践
查看>>
AppScan使用
查看>>
Java NIO框架Netty教程(三) 字符串消息收发(转)
查看>>