2016-01-25 105 views
0

我需要存儲來自kafka-> spark streaming-> cassandra的值。將火花值保存到Cassandra

現在,我收到來自kafka-> spark的值,並且我有一個spark工作將值保存到cassandra db中。但是,我正面臨數據類型dstream的問題。

在下面的代碼片段中,您可以看到我如何將DStream轉換爲Python友好的列表對象,以便我可以使用它,但它會給出錯誤。在卡夫卡製片

輸入:

Byrne 24 San Diego [email protected] Rob

火花的工作:

map1={'spark-kafka':1} 
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1) 
lines = kafkaStream.map(lambda x: x[1]) 
words = lines.flatMap(lambda line: line.split(" ")) 

words.pprint() # outputs-> Byrne 24 SanDiego [email protected] Rob 

list=[lambda word for word in words] 
#gives an error -> TypeError: 'TransformedDStream' object is not iterable 

這是怎麼了,我從火花>卡桑德拉節約值

rdd2=sc.parallelize([{ 
... "lastname":'Byrne', 
... "age":24, 
... "city":"SanDiego", 
... "email":"[email protected]", 
... "firstname":"Rob"}]) 
rdd2.saveToCassandra("keyspace2","users") 

什麼將DStream對象轉換爲字典或其他類型的最佳方法做我想在這裏做什麼的最佳方式?

我只需要將從kafka收到的值(以DStream的形式)保存在Cassandra中。

謝謝,任何幫助將是不錯的!

版本:

Cassandra v2.1.12 
Spark v1.4.1 
Scala 2.10 
+0

因爲這是我的錯誤,所以更多與$ JAVA_HOME問題有關而不是mesos。 – HackCode

+0

好男人放鬆。沒有理由在這裏得到超。現在我們不要垃圾此帖。 – HackCode

回答

0

其實,我找到了答案在本教程http://katychuang.me/blog/2015-09-30-kafka_spark.html

+0

@ HackCode-我也試圖執行相同的例子,但我面臨着saveToCassandra('keyspace','table')的錯誤line.Error - py4j.protocol.Py4JJavaError:調用o38.newInstance時發生錯誤。我錯過了什麼,你可以建議我。 – kit

0

像一切「斯帕克」,我想簡短的解釋是,由於這是因爲即使你熟悉RDDS,DStreams是一個更高的理念:
離散化的流(DStream )是同一類型的連續RDD序列,代表連續的數據流。在你的情況下,DStreams是從現場Kafka數據創建的。
雖然星火流程序運行,每DSTREAM定期生成從活卡夫卡數據

一個RDD現在,遍歷收到RDDS,你需要使用DStream#foreachRDD(和它的名字所暗示的,它提供了類似的目的因爲foreach,但是這一次,要迭代RDDs)。
一旦你有一個RDD,你可以調用rdd.collect()rdd.take()或任何其他的RDD標準API。

作爲結語,爲了讓事情變得更有趣,Spark引入了一種新的無接收器「直接」方法,以確保更強大的端到端保證。
KafkaUtils.createDirectStream需要Spark 1.3+)
此方法不是使用接收器接收數據,而是定期查詢Kafka每個主題+分區中的最新偏移量,並相應地定義每批處理的偏移量範圍。當處理數據的作業啓動時,Kafka簡單的客戶API用於讀取Kafka定義的偏移範圍。
(這是一個很好的方式來表達你將不得不「亂」與偏移自己)

進一步詳情,請參閱Direct Streams Approach
了Scala代碼示例