2016-04-21 62 views
0

我有以下任務在我之前。Spark和Cassandra並行處理

用戶執行spark提交命令時提供一組IP地址配置文件。

比方說,該數組看起來像這樣:

val ips = Array(1,2,3,4,5)

可以有高達陣列100.000值..

對於數組的所有元素,我應該讀Cassandra的數據,執行一些計算並將數據返回給Cassandra。

如果我做的:

ips.foreach(ip =>{ 
- read data from Casandra for specific "ip" // for each IP there is different amount of data to read (within the functions I determine start and end date for each IP) 
- process it 
- save it back to Cassandra}) 

能正常工作。

我相信,過程順序運行;我不利用並行性。

,如果我做的另一方面:

val IPRdd = sc.parallelize(Array(1,2,3,4,5)) 
IPRdd.foreach(ip => { 
- read data from Cassandra // I need to use spark context to make the query 
-process it 
save it back to Cassandra}) 

我得到的序列化異常,因爲火花試圖序列火花背景下,這是不序列化。

如何使這項工作,但仍然利用並行性。

感謝

編輯

這是execption我得到:在線程 「主要」 org.apache.spark.SparkException

例外:任務不序列化 的組織。 apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply(RDD.scala:919) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply(RDD.scala:918) at org.apache.spark.rdd .RDDOperationScope $ .withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala :316) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1.apply(WibeeeBatchJob.scala:59 ) at com.e nerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1.apply(WibeeeBatchJob.scala:54) at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable。在com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main上的ArrayOps $ ofRef.foreach(ArrayOps.scala:108) (WibeeeBatchJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect。NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java:498) 在org.apache。 spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:121) at org.apache.spark.deploy。 SparkSubmit.main(SparkSubmit.scala) 引起:java.io.NotSerializableException:org.apache.spark.SparkContext 序列化堆棧: - object not serializable(class:org.apache.spark.SparkContext,value:[email protected]) - field(class:com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1,name:sc $ 1,type:class org.apache.spark.SparkContext) - object(class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1,) - field(class:com。 enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1 $$ anonfun $ apply $ 1,name:$ outer,type:class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1) - 對象(類com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1 $$ anonfun $ apply $ 1) at org.apache.spark.serializer.SerializationDebugger $ .improveException(SerializationDebugger.scala:40) 在org.apache.spark.serializer.JavaSerializationS tream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:301 )

+0

什麼是特定異常和導致該問題的具體代碼? –

回答

1

最簡單的做法是使用Spark Cassandra Connector它可以處理連接池和序列化。

有了,你可以做類似

sc.parallelize(inputData, numTasks) 
    .mapPartitions { it => 
    val con = CassandraConnection(yourConf) 
    con.withSessionDo{ session => 
     //Use the session 
    } 
    //Do any other processing 
    }.saveToCassandra("ks","table" 

這將是一個卡桑德拉連接的完全手動操作。會話將全部自動合併並緩存,如果您準備聲明,那麼這些會話也會緩存在執行程序中。

如果你想使用更多的內置方法,也有joinWithCassandraTable這可能適合你的情況。

sc.parallelize(inputData, numTasks) 
    .joinWithCassandraTable("ks","table") //Retrieves all records for which input data is the primary key 
    .map(//manipulate returned results if needed) 
    .saveToCassandra("ks","table") 
+0

你不明白我的問題。 這是我是否可以在並行模式下執行100個不同的查詢(查詢的不同參數),以便執行者是執行查詢的人(這意味着他們是需要Spark上下文實例的人)... 但我意識到spark的上下文不能(也不應該)發送給executors,所以我需要改變我的管道結構。 –

+0

我給出的例子都有執行者執行查詢。 – RussS