3

所以我有一個蟒蛇Stream-sourced DataFrame df,它具有所有我想要放入卡斯安德表的spark-cassandra-connector表中的數據。我已經在兩個方面試着這樣做:如何將流數據集寫入Cassandra?

df.write \ 
    .format("org.apache.spark.sql.cassandra") \ 
    .mode('append') \ 
    .options(table="myTable",keyspace="myKeySpace") \ 
    .save() 

query = df.writeStream \ 
    .format("org.apache.spark.sql.cassandra") \ 
    .outputMode('append') \ 
    .options(table="myTable",keyspace="myKeySpace") \ 
    .start() 

query.awaitTermination() 

但是我不斷獲取此錯誤,分別爲:

pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame; 

java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing. 

有反正我可以把我的流DataFrame放入我的Cassandra表中?

回答

6

Spark Cassandra Connector中目前沒有用於Cassandra的流式傳輸Sink。您將需要實施自己的Sink或等待它變得可用。

如果您使用的是Scala或Java,則可以使用foreach運算符並使用Using Foreach中所述的ForeachWriter

+1

有什麼辦法可以將我的Streaming DataFrame轉換爲非Streaming數據框? – user2361174

+2

不,沒有轉換(至少沒有我知道的) – RussS

+0

您是否在Java中有工作示例?看起來所有的解決方案來''CassandraConnector.withSessionDo'這需要Scala實現特質;所以沒有運氣與Kotlin或Java .. – Reith