2016-02-15 180 views
0

我使用星火流1.5.2 +卡夫卡火花流

object Kafka2HDFS { 
    def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("Kafka2HDFS") 
    val ssc = new StreamingContext(sparkConf, Seconds(5)) 

    val zk = "192.168.1.105:2181,192.168.1.106:2181,192.168.1.107:2181" 
    val topicMap = Map("online_dev" -> 2) 

    val ds = KafkaUtils.createStream(ssc, zk, "group_online_dev", topicMap) 
    ds.map(_._2).window(Seconds(15)).foreachRDD(rdd => LogUtil.log.info(rdd.count())) 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

有卡夫卡的話題online_dev在一些JSON字符串,它們可以與控制檯 - 消費積分,當卡夫卡總是返回空RDD消費者。應用程序提交後,動物園管理員將包含以下結構:

group_online_dev 
    `- owner 
     `- online_dev 
     `- 1 
      - 2 
... 

有組目錄下沒有偏移目錄。

我的問題是,在每個時間窗口返回的RDD都是空的。

在日誌文件中,我發現所有rdd.count()是。

我沒有使用sbt程序集來打包所有jar文件以加快編譯和部署。這裏是我的提交腳本:

~/spark-1.5.2/bin/spark-submit --executor-memory 30g\ 
    --driver-memory 40g\ 
    --executor-cores 10\ 
    --num-executors 4\ 
    --name Streaming\ 
    --class dog.streaming.Kafka2HDFS\ 
    --deploy-mode cluster\ 
    --files "/home/hadoop/spark-1.5.2-t3-n/conf/hive-site.xml"\ 
    --master yarn\ 
    --driver-java-options "-XX:MaxPermSize=1G"\ 
    --jars "lib/mysql-connector-java-5.1.38.jar,lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/ImpalaJDBC4.jar,lib/play-functional_2.10-2.2.1.jar,lib/play-iteratees_2.10-2.2.1.jar,lib/play-datacommons_2.10-2.2.1.jar,lib/play-json_2.10-2.2.1.jar,lib/spark-core_2.10-1.5.2.jar,lib/spark-hive_2.10-1.5.2.jar,lib/spark-sql_2.10-1.5.2.jar,lib/spark-streaming-kafka_2.10-1.5.2.jar,lib/kafka_2.10-0.8.2.1.jar,lib/metrics-core-2.2.0.jar,lib/zkclient-0.7.jar"\ 
    task/target/scala-2.10/jobs_2.10-1.0.jar 

回答

0

我遇到過類似的問題。請務必在您的消費者kafka配置中使用 "auto.offset.reset" -> "earliest"屬性。

official documentation

消費者
爲什麼我從來沒有消費者得到任何數據?

默認情況下,當一個 消費者開始爲第一次,它忽略了所有現有 數據的話題,只會消耗新的數據 消費者開始後到來英寸如果是這種情況,請在消費者啓動後嘗試發送更多數據 。或者,您可以通過將auto.offset.reset設置爲「最早」來爲 消費者配置消費者,對於新的 消費者爲0.9,對於舊消費者爲「最小」。