2017-06-04 160 views
0

I'm trying to save my streaming data from Spark to Cassandra. Spark reads from Kafka and that part works fine, but saving to Cassandra is driving me crazy. I'm using Spark 2.0.2, Kafka 0.10 and Cassandra 2.23.

Tags: spark, cassandra, streaming, python, error, database, kafka

This is how I'm submitting to Spark:

spark-submit --verbose --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars /tmp/pyspark-cassandra-0.3.5.jar --driver-class-path /tmp/pyspark-cassandra-0.3.5.jar --py-files /tmp/pyspark-cassandra-0.3.5.jar --conf spark.cassandra.connection.host=localhost /tmp/direct_kafka_wordcount5.py localhost:9092 testing 

Here is my code. It's just taken from the Spark examples, and it works, but I modified it a little to save the data to Cassandra...

This is what I'm trying to do, but with just the count result: http://rustyrazorblade.com/2015/05/spark-streaming-with-python-and-kafka/

from __future__ import print_function 
import sys 
import os 
import time 
import pyspark_cassandra 
import pyspark_cassandra.streaming 
from pyspark_cassandra import CassandraSparkContext 
import urllib 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark.sql import SQLContext 
from pyspark.sql import Row 
from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max 
from pyspark.sql.types import FloatType 
from pyspark.sql.functions import explode 
from pyspark.sql.functions import split 
if __name__ == "__main__": 
    if len(sys.argv) != 3: 
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr) 
        exit(-1) 
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") 
    ssc = StreamingContext(sc, 1) 
    sqlContext = SQLContext(sc) 
    brokers, topic = sys.argv[1:] 
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
    lines = kvs.map(lambda x: x[1]) 
    counts=lines.count() 
    counts.saveToCassandra("spark", "count") 
    counts.pprint() 
    ssc.start() 
    ssc.awaitTermination() 

I get this error:

Traceback (most recent call last):
  File "/tmp/direct_kafka_wordcount5.py", line 88, in <module>
    counts.saveToCassandra("spark", "count")

Answer

0

Pyspark Cassandra stopped being updated a while ago; its latest version only supports up to Spark 1.6: https://github.com/TargetHolding/pyspark-cassandra
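Since pyspark-cassandra tops out at Spark 1.6, on Spark 2.x the usual route is the Scala spark-cassandra-connector's DataFrame writer instead (this is an alternative suggestion, not something from the question; the `keyspace`/`table` option keys follow that connector's documented DataFrame source). A minimal sketch, with the Spark-dependent calls shown as comments:

```python
# Hedged sketch: writing to Cassandra from Spark 2.x via the DataFrame API
# of spark-cassandra-connector, as an alternative to pyspark-cassandra.
# The helper below is illustrative (not a library function); it just builds
# the options dict the connector's "org.apache.spark.sql.cassandra" source
# understands.

def cassandra_write_options(keyspace, table):
    """Build the options dict for the spark-cassandra-connector DataFrame writer."""
    return {"keyspace": keyspace, "table": table}

# In the job itself (requires submitting with
# --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.x
# and spark.cassandra.connection.host set, as in the question):
#
# df.write \
#   .format("org.apache.spark.sql.cassandra") \
#   .options(**cassandra_write_options("spark", "count")) \
#   .mode("append") \
#   .save()
```

The keyspace `"spark"` and table `"count"` are the ones from the question's `saveToCassandra` call.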

Also:

counts = lines.count()  # returns data to the driver (not an RDD)

counts is now an integer. That means the function saveToCassandra does not apply here, because saveToCassandra is an RDD method.
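One way around this (a sketch under assumptions, not the library's prescribed pattern) is to compute the count per batch inside `foreachRDD`, where you have a real RDD, and parallelize the single result back into an RDD before calling `saveToCassandra`. The `batch_count_record` helper and the `ts`/`total` column names are hypothetical, chosen to match a two-column count table; the keyspace `"spark"` and table `"count"` come from the question:

```python
# Hedged sketch: saving a per-batch count to Cassandra with pyspark-cassandra.
# batch_count_record is an illustrative helper, not part of any library.

def batch_count_record(batch_time, n):
    """Build a dict matching a hypothetical count(ts, total) Cassandra table."""
    return {"ts": str(batch_time), "total": n}

# Inside the streaming job (pyspark_cassandra.streaming must be imported so
# that saveToCassandra is available on RDDs):
#
# def save_count(time, rdd):
#     n = rdd.count()                       # a plain int, computed per batch
#     record = batch_count_record(time, n)
#     rdd.context.parallelize([record]) \
#        .saveToCassandra("spark", "count")
#
# lines.foreachRDD(save_count)
```

This keeps the driver-side integer out of `saveToCassandra` entirely: the save always operates on an RDD, which is what the method expects.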

+0

How can I pass the integer to saveToCassandra? I know pyspark-cassandra is outdated, but I'm using Spark 1.6 – logyport