I'm trying to save my streaming data from Spark to Cassandra. Spark is connected to Kafka and that part works fine, but saving to Cassandra is driving me crazy. I'm using Spark 2.0.2, Kafka 0.10 and Cassandra 2.23.
This is how I'm submitting Spark:
spark-submit --verbose --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars /tmp/pyspark-cassandra-0.3.5.jar --driver-class-path /tmp/pyspark-cassandra-0.3.5.jar --py-files /tmp/pyspark-cassandra-0.3.5.jar --conf spark.cassandra.connection.host=localhost /tmp/direct_kafka_wordcount5.py localhost:9092 testing
Here is my code. It's just taken from the Spark examples and it works, but I modified it a little to save the data to Cassandra.

This is what I'm trying to do, but with just the count result: http://rustyrazorblade.com/2015/05/spark-streaming-with-python-and-kafka/
from __future__ import print_function
import sys
import os
import time
import pyspark_cassandra
import pyspark_cassandra.streaming
from pyspark_cassandra import CassandraSparkContext
import urllib
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max
from pyspark.sql.types import FloatType
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    sqlContext = SQLContext(sc)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.count()
    counts.saveToCassandra("spark", "count")
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
I'm getting this error:

Traceback (most recent call last):
  File "/tmp/direct_kafka_wordcount5.py", line 88, in
    counts.saveToCassandra("spark", "count")
How do I pass the counts to saveToCassandra? I know pyspark-cassandra is outdated, but I'm using Spark 1.6. – logyport
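One likely issue: `lines.count()` yields bare integers, while pyspark-cassandra's `saveToCassandra` expects each record to look like a row (for example, a dict whose keys match the table's columns). A minimal sketch of the wrapping step, assuming a hypothetical table `spark.count` with columns `ts` and `total` (both names are assumptions, not from the original post):

```python
import time

def to_row(count):
    # Wrap a bare integer in a dict keyed by the Cassandra table's
    # column names ("ts" and "total" are assumed column names here).
    return {"ts": int(time.time()), "total": count}

print(to_row(42))
```

In the streaming job this would be applied before saving, along the lines of `lines.count().map(to_row).saveToCassandra("spark", "count")` — an untested sketch, since the exact row format depends on your table schema.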