In your snippet:
// Import dependencies and create kafka params as in Create Direct Stream
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
Spark Kafka Integration guide
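As the comment in the snippet notes, the starting offset is inclusive and the ending offset is exclusive, so each range covers exactly `untilOffset - fromOffset` messages. A minimal illustration of that arithmetic (plain Python, no Spark required; `messages_in_range` is just a throwaway helper for this example, not part of any Spark API):

```python
# An OffsetRange covers the half-open interval [fromOffset, untilOffset).
def messages_in_range(from_offset, until_offset):
    # Inclusive start, exclusive end: the count is a plain difference.
    return until_offset - from_offset

# The two ranges in the Scala snippet, (0, 100) on partitions 0 and 1:
total = messages_in_range(0, 100) + messages_in_range(0, 100)
print(total)  # 200 messages across the two partitions
```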
For pyspark, just use the API as documented: pyspark kafka streaming
First, build the offsetRanges for your Kafka topic's partitions using
pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset)
Initialized, it looks like this:
fromOffset = 0
untilOffset = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, fromOffset, untilOffset)
offsets = [offset]
Then you can create your RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
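When a topic has several partitions, the `offsets` list needs one `OffsetRange` per partition (and possibly several per partition if you want smaller batches). A sketch of how one might plan that list, using a hypothetical helper `plan_offset_ranges` (not part of pyspark) that emits plain `(topic, partition, fromOffset, untilOffset)` tuples, which you would then wrap in `pyspark.streaming.kafka.OffsetRange` before calling `KafkaUtils.createRDD`:

```python
# Hypothetical helper, for illustration only: split each partition's
# [0, until) offset span into half-open batches of at most batch_size.
def plan_offset_ranges(topic, partition_until, batch_size):
    """partition_until maps partition id -> exclusive ending offset."""
    ranges = []
    for partition, until in sorted(partition_until.items()):
        start = 0
        while start < until:
            end = min(start + batch_size, until)
            ranges.append((topic, partition, start, end))
            start = end
    return ranges

# Two partitions with 10 messages each, in batches of 4:
for r in plan_offset_ranges("topic", {0: 10, 1: 10}, 4):
    print(r)
```

Each tuple maps one-to-one onto an `OffsetRange(topic, partition, fromOffset, untilOffset)` constructor call.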
Yes, but which Spark version are you using with pyspark? – FaigB
I use Spark 1.5.1 –