2016-10-26 68 views
0

我成功整合了kafka和saprk。 我想從kafka發送流式傳輸到spark.and現在我能夠發送流spark.I想在RDD這個流,所以我用createRDD()函數來創建rdds。 但是我在rdd中只有一些來自kafka的masseges。因爲它取決於偏移範圍。所以任何人都可以告訴我如何在kafka-spark CreateRDD()函數中設置offsetRange()。在pyspark rdd kafka中設置offsetRange()函數

回答

0

在您的代碼段

// Import dependencies and create kafka params as in Create Direct Stream 

    val offsetRanges = Array(
     // topic, partition, inclusive starting offset, exclusive ending offset 
     OffsetRange("test", 0, 0, 100), 
     OffsetRange("test", 1, 0, 100) 
    ) 

    val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent) 

Spark Kafka Integration guide

只需使用根據文檔: pyspark kafka streaming

卡夫卡話題partion第一套offsetranges使用

pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset) 

Initiali矩陣特殊積樣子:

fromOffset = 0 
untilOffset = 10 
partition = 0 
topic = 'topic' 
offset = OffsetRange(topic, partition, fromOffset, untilOffset) 
offsets = [offset] 

然後你就可以創建你RDD

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets) 
+0

亞斯但究竟什麼是火花的版本,您使用的pyspark –

+0

使用它? – FaigB

+0

我使用spark 1.5.1 –