2017-04-04 60 views
1

我正在嘗試創建消費者生產者應用程序。如何使用spark python將數據保存在cassandra表中?

應用程序的生產者將產生一些關於特定主題的數據。消費者將使用來自同一主題的這些數據並使用spark api進行處理,並將這些數據存儲在cassandra表中。

傳入數據以字符串格式等正在添加下面 -

100 = NO | 101 = III | 102 = 0.0771387731911 | 103 = -0.7076915761 100 = NO | 101 = AAA | 102 = 0.8961325446464 | 103 = -0.5465463154

012 -

from kafka import KafkaConsumer 
from StringIO import StringIO 
import pandas as pd 
from cassandra.cluster import Cluster 

from pyspark import SparkConf, SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

def main(): 

    sc = SparkContext(appName="StreamingContext") 
    ssc = StreamingContext(sc, 3) 

    kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181", "sample-kafka-app", {"NO-topic": 1}) 
    raw = kafka_stream.flatMap(lambda kafkaS: [kafkaS]) 
    clean = raw.map(lambda xs: xs[1].split("|")) 
    my_row = clean.map(lambda x: { 
     "pk": "uuid()", 
     "a": x[0], 
     "b": x[1], 
     "c": x[2], 
     "d": x[3], 
    }) 

    my_row.saveToCassandra("users", "data") 
    stream.start() 
    stream.awaitTermination() 

if __name__ == "__main__": 
    main() 

卡桑德拉表結構:

我在波紋管的方式產生的消費

我面對下面的錯誤 -

Traceback (most recent call last): 


File "consumer_no.py", line 84, in <module> 
    main() 
    File "consumer_no.py", line 53, in main 
    my_row.saveToCassandra("users", "data") 
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra' 
17/04/04 14:29:22 INFO SparkContext: Invoking stop() from shutdown hook 

是我要去一個正確的方式來實現我上面的解釋?如果沒有,那麼給我一些建議來實現這一點,如果是的話,那麼上面代碼中出現了什麼問題?

+0

[保存數據回卡桑德拉作爲RDD]的可能的複製(http://stackoverflow.com/questions/35414677/saving-data-返回到cassandra-as-rdd) –

+0

[如何使用spark的saveToCassandra保存cassandra表中數據的可能的副本](http://stackoverflow.com/questions/43198661/how-to-save-data-in-卡桑德拉-表使用-火花-savetocassandra) – RussS

回答

0

與其直接嘗試將TransformedDStream保存到Cassandra,您應該將每個RDT從該DStream保存到cassandra。

,如果你這樣做你的代碼應工作:

my_row.foreachRDD(lambda x: x.saveToCassandra("users", "data")) 
相關問題