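Spark and Cassandra parallel processing
I have the following task ahead of me.
The user provides a set of IP addresses as a config file when executing the spark-submit command.
Let's say the array looks like this: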
val ips = Array(1,2,3,4,5)
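The array can hold up to 100,000 values.
For every element of the array, I need to read data from Cassandra, perform some computation, and write the result back to Cassandra.
If I do something like: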
ips.foreach(ip => {
  // read data from Cassandra for the specific "ip"; each IP has a different amount
  // of data to read (the start and end date for each IP are determined inside the functions)
  // process it
  // save it back to Cassandra
})
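this works properly.
However, I believe the process runs sequentially, so I am not exploiting any parallelism.
On the other hand, if I do something like: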
val IPRdd = sc.parallelize(Array(1,2,3,4,5))
IPRdd.foreach(ip => {
  // read data from Cassandra; I need to use the Spark context to make the query
  // process it
  // save it back to Cassandra
})
I get a serialization exception, because Spark tries to serialize the SparkContext, which is not serializable.
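As far as I understand it, the same failure can be reproduced without Spark. Below is a minimal sketch, assuming Scala 2.12 or later and plain Java serialization (which, to my knowledge, is what Spark uses for closures by default). FakeContext, Settings and ClosureDemo are hypothetical names made up for this illustration, with FakeContext standing in for the SparkContext. A function that captures the non-serializable object is rejected, while one that captures only serializable settings and builds the resource inside its body is accepted.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext(val host: String) {
  def query(ip: Int): String = s"rows for ip $ip from $host"
}

// Hypothetical connection settings; case classes are Serializable.
final case class Settings(host: String)

object ClosureDemo {
  // Returns true if Java serialization accepts the object, false if it
  // fails with NotSerializableException.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures a driver-side object, like using `sc` inside IPRdd.foreach.
  def capturesContext(): Int => String = {
    val ctx = new FakeContext("cassandra-host")
    (ip: Int) => ctx.query(ip)
  }

  // Captures only serializable settings and creates the resource
  // inside the function body, i.e. where the function actually runs.
  def capturesSettings(): Int => String = {
    val settings = Settings("cassandra-host")
    (ip: Int) => new FakeContext(settings.host).query(ip)
  }

  def main(args: Array[String]): Unit = {
    println(s"captures context:  ${canSerialize(capturesContext())}")
    println(s"captures settings: ${canSerialize(capturesSettings())}")
  }
}
```

If this reading is right, the Spark job would need the Cassandra access inside the RDD operation to be built from something serializable rather than from sc itself.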
How can I make this work while still exploiting parallelism?
Thanks
EDIT
This is the exception I get:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:59)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:54)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main(WibeeeBatchJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: [email protected])
    - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, )
    - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1)
    - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
What is the specific exception, and what is the specific code that causes the problem?