0
我使用星火流1.5.2 +卡夫卡火花流
object Kafka2HDFS {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Kafka2HDFS")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val zk = "192.168.1.105:2181,192.168.1.106:2181,192.168.1.107:2181"
val topicMap = Map("online_dev" -> 2)
val ds = KafkaUtils.createStream(ssc, zk, "group_online_dev", topicMap)
ds.map(_._2).window(Seconds(15)).foreachRDD(rdd => LogUtil.log.info(rdd.count()))
ssc.start()
ssc.awaitTermination()
}
}
有卡夫卡的話題online_dev在一些JSON字符串,它們可以與控制檯 - 消費積分,當卡夫卡總是返回空RDD消費者。應用程序提交後,動物園管理員將包含以下結構:
group_online_dev
`- owner
`- online_dev
`- 1
- 2
...
有組目錄下沒有偏移目錄。
我的問題是,在每個時間窗口返回的RDD都是空的。
在日誌文件中,我發現所有rdd.count()是。
我沒有使用sbt程序集來打包所有jar文件以加快編譯和部署。這裏是我的提交腳本:
~/spark-1.5.2/bin/spark-submit --executor-memory 30g\
--driver-memory 40g\
--executor-cores 10\
--num-executors 4\
--name Streaming\
--class dog.streaming.Kafka2HDFS\
--deploy-mode cluster\
--files "/home/hadoop/spark-1.5.2-t3-n/conf/hive-site.xml"\
--master yarn\
--driver-java-options "-XX:MaxPermSize=1G"\
--jars "lib/mysql-connector-java-5.1.38.jar,lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/ImpalaJDBC4.jar,lib/play-functional_2.10-2.2.1.jar,lib/play-iteratees_2.10-2.2.1.jar,lib/play-datacommons_2.10-2.2.1.jar,lib/play-json_2.10-2.2.1.jar,lib/spark-core_2.10-1.5.2.jar,lib/spark-hive_2.10-1.5.2.jar,lib/spark-sql_2.10-1.5.2.jar,lib/spark-streaming-kafka_2.10-1.5.2.jar,lib/kafka_2.10-0.8.2.1.jar,lib/metrics-core-2.2.0.jar,lib/zkclient-0.7.jar"\
task/target/scala-2.10/jobs_2.10-1.0.jar