2015-10-29 107 views
5

我拼命嘗試將Cassandra連接到pyspark,但我無法使其工作。我對火花和卡桑德拉來說很新,所以我可能會錯過一些相當簡單的事情。將Cassandra與Spark連接/集成(pyspark)

我對網上所有不同的解釋有些困惑,但是從我的理解,最簡單的方法是使用「Spark包」? (http://spark-packages.org/package/TargetHolding/pyspark-cassandra

所以,用下面的命令:

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py 

我說得對,我明白,我不需要,如果我用火花包如上所述下載任何包?

在myPysparkFile.py

我嘗試了以下兩個版本,兩者都不我爲我工作:

第1版,這是我從14頁http://www.slideshare.net/JonHaddad/intro-to-py-spark-and-cassandra了:

"SparkCassandraTest.py" 
from pyspark import SparkContext, SparkConf 
from pyspark_cassandra import CassandraSparkContext,Row 

conf = SparkConf() 
conf.setMaster("local[4]") 
conf.setAppName("Spark Cassandra") 
conf.set("spark.cassandra.connection.host","http://127.0.0.1") 

sc = CassandraSparkContext(conf=conf) 

rdd = sc.cassandraTable("test", "words") 

爲錯誤我得到:

ImportError: No module named pyspark_cassandra 

版本2(其從啓發:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md):

"SparkCassandraTest.py" 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 

conf = SparkConf() 
conf.setMaster("local[4]") 
conf.setAppName("Spark Cassandra") 
conf.set("spark.cassandra.connection.host","http://127.0.0.1") 

sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

sqlContext.read\ 
    .format("org.apache.spark.sql.cassandra")\ 
    .options(table="kv", keyspace="test")\ 
    .load().show() 

使我有以下錯誤:

py4j.protocol.Py4JJavaError: An error occurred while calling o28.load. 
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; 
    at org.apache.spark.sql.cassandra.DefaultSource$.<init>(DefaultSource.scala:138) 
    at org.apache.spark.sql.cassandra.DefaultSource$.<clinit>(DefaultSource.scala) 
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

我真的不知道我做錯了,希望得到任何幫助。 另外,使用版本1或版本2有什麼區別?這兩個版本之間有什麼優點或缺點?此外,有關如何最佳整合和使用火花與卡桑德拉的任何進一步的參考將不勝感激。

順便說一句,Cassandra是在我的電腦上運行在端口基本配置7000

感謝。

+0

什麼是火花版本 – Abhi

回答

8

Pyspark_Cassandra是與spark-cassandra連接器不同的包裝。它包含SCC的一個版本,但不可互換。安裝SCC不會安裝pyspark_cassandra。如果您想使用pyspark的sc.cassandraTable(),則此包是必需的。

安裝SCC可以讓您在pyspark中使用Dataframes,這是處理來自pyspark的C *最有效的方法。這與您的V2示例相同。它失敗使得它看起來像你沒有使用--package命令啓動V2。

可以失敗的原因是,你在這裏指定的Scala 2.11版本的庫

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py 

而且最有可能沒有運行的Scala 2.10版本星火(默認下載2.10)

+1

你說得對,用2.10用同樣的命令解決了這個問題。非常感謝。 – Kito

+0

我可以使用相同的驅動程序連接到Cassandra 3.3 @RussS – Abhi

+0

您是否知道可以從cassandra連接器創建表? –