2
我正在使用Datastax Cassandra java驅動程序從spark工作人員寫入Cassandra。代碼片斷從Spark工作人員讀取和寫入Cassandra拋出錯誤
rdd.foreachPartition(record => {
val cluster = SimpleApp.connect_cluster(Spark.cassandraip)
val session = cluster.connect()
record.foreach { case (bin_key: (Int, Int), kpi_map_seq: Iterable[Map[String, String]]) => {
kpi_map_seq.foreach { kpi_map: Map[String, String] => {
update_tables(session, bin_key, kpi_map)
}
}
}
} //record.foreach
session.close()
cluster.close()
}
在閱讀我使用的火花卡桑德拉連接器(它使用相同的驅動程序在內部我想)
val bin_table = javaFunctions(Spark.sc).cassandraTable("keyspace", "bin_1")
.select("bin").where("cell = ?", cellname) // assuming this will run on worker nodes
println(s"get_bins_for_cell Count of Bins for Cell $cellname is ", cell_bin_table.count())
return bin_table
這樣做每一次不會造成任何問題。一起做是拋出這個堆棧跟蹤。
我的主要目標不是直接從Spark驅動程序進行寫入或讀取。它似乎仍然需要根據上下文做些事情;兩個上下文被使用?
16/07/06 06:21:29 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 22, euca-10-254-179-202.eucalyptus.internal): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_5_piece0 of broadcast_5
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
可以請你幫我這個'http://stackoverflow.com/questions/39363586/issue-while-storing-data-from-spark-streaming -to-cassanadra' – Naresh