Cassandra v2.1.12
Spark v1.4.1
Scala 2.10
和Cassandra是監聽
rpc_address:127.0.1.1
rpc_port:9160
例如,連接卡夫卡和火花流,一邊聽每4秒卡夫卡,我有以下的火花工作
sc = SparkContext(conf=conf)
stream=StreamingContext(sc,4)
map1={'topic_name':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)
並且spark-streaming不斷收聽kafka經紀人,每隔4秒鐘輸出一次內容。
同樣的方式,我想要火花流媒體來收聽cassandra並輸出指定表格的內容,比方說每4秒。
如何轉換上面的流代碼,使其與cassandra而不是kafka一起使用?
非流媒體解決方案
我可以明顯地保持在一個無限循環中運行的查詢,但事實並非如此流吧?
火花的工作:
from __future__ import print_function
import time
import sys
from random import random
from operator import add
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming import *
sc = SparkContext(appName="sparkcassandra")
while(True):
time.sleep(5)
sqlContext = SQLContext(sc)
stream=StreamingContext(sc,4)
lines = stream.socketTextStream("127.0.1.1", 9160)
sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table="users", keyspace="keyspace2")\
.load()\
.show()
像這樣運行
sudo ./bin/spark-submit --packages \
datastax:spark-cassandra-connector:1.4.1-s_2.10 \
examples/src/main/python/sparkstreaming-cassandra2.py
,我得到表值這rougly看起來像
lastname|age|city|email|firstname
那麼什麼是正確的方法「流媒體」來自cassandra的數據?