2016-01-25 93 views
3

我使用如何將火花流與cassandra連接?

Cassandra v2.1.12 
Spark v1.4.1 
Scala 2.10 

和Cassandra是監聽

rpc_address:127.0.1.1 
rpc_port:9160 

例如,連接卡夫卡和火花流,一邊聽每4秒卡夫卡,我有以下的火花工作

sc = SparkContext(conf=conf) 
stream=StreamingContext(sc,4) 
map1={'topic_name':1} 
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1) 

並且spark-streaming不斷收聽kafka經紀人,每隔4秒鐘輸出一次內容。

同樣的方式,我想要火花流媒體來收聽cassandra並輸出指定表格的內容,比方說每4秒

如何轉換上面的流代碼,使其與cassandra而不是kafka一起使用?


非流媒體解決方案

我可以明顯地保持在一個無限循環中運行的查詢,但事實並非如此流吧?

火花的工作:

from __future__ import print_function 
import time 
import sys 

from random import random 
from operator import add 
from pyspark.streaming import StreamingContext 
from pyspark import SparkContext,SparkConf 
from pyspark.sql import SQLContext 
from pyspark.streaming import * 

sc = SparkContext(appName="sparkcassandra") 
while(True): 
    time.sleep(5) 
    sqlContext = SQLContext(sc) 
    stream=StreamingContext(sc,4) 
    lines = stream.socketTextStream("127.0.1.1", 9160) 
    sqlContext.read.format("org.apache.spark.sql.cassandra")\ 
       .options(table="users", keyspace="keyspace2")\ 
       .load()\ 
       .show() 

像這樣運行

sudo ./bin/spark-submit --packages \ 
datastax:spark-cassandra-connector:1.4.1-s_2.10 \ 
examples/src/main/python/sparkstreaming-cassandra2.py 

,我得到表值這rougly看起來像

lastname|age|city|email|firstname 

那麼什麼是正確的方法「流媒體」來自cassandra的數據?


回答

0

目前Cassandra是本身不支持作爲星火1.6流源,必須實現自定義接收器爲你自己的情況下(listen to cassandra and output the contents of the specified table every say 4 seconds.)。

請參閱實施指南:

Spark Streaming Custom Receivers

1

目前「正道」從C *流數據是不是從C *流數據:)相反,它通常使很多更有意義讓你的消息隊列(如卡夫卡)在C *和Stream之前關閉。 C *不容易支持增量表讀取,但是如果集羣密鑰基於插入時間,則可以完成此操作。

如果您有興趣使用C *作爲流源一定要檢查出和 https://issues.apache.org/jira/browse/CASSANDRA-8844 變更數據捕獲評論

這是最有可能你在找什麼。

如果你實際上只是想定期讀取整個表,並做一些你可能是最好的,只是有cron作業推出一批操作你真的沒有反正恢復狀態的方式。