2017-05-08 110 views
2

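Spark Streaming + Accumulo - serializing BatchWriterImpl

I am looking for a Spark Streaming + Accumulo connector and a complete usage example.

I am currently trying to write Spark Streaming results to an Accumulo table, but I get a NotSerializableException for BatchWriter. Can someone point me to an example of how to serialize BatchWriter? The code below is based on the Accumulo documentation.

Current code: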

import com.google.common.primitives.Longs
import org.apache.accumulo.core.client.{BatchWriterConfig, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Mutation, Value}

val accumuloInstanceName = "accumulo"
val zooKeepers = "localhost:2181"
val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
val accumuloUser = programOptions.accumuloUser()
val accumuloPassword = programOptions.accumuloPassword()
val passwordToken = new PasswordToken(accumuloPassword)
val connector = instance.getConnector(accumuloUser, passwordToken)

val accumuloBatchWriterConfig = new BatchWriterConfig
val accumuloBatchWriterMaxMemory = 32 * 1024 * 1024
accumuloBatchWriterConfig.setMaxMemory(accumuloBatchWriterMaxMemory)
val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig)

fullMergeResultFlatten.foreachRDD(recordRDD =>
  recordRDD.foreach(record => {
    // Row ID is the record timestamp; each field is stored in its own column family
    val mutation = new Mutation(Longs.toByteArray(record.timestamp))
    mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
    mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
    accumuloBatchWriter.addMutation(mutation)
  })
)

At runtime the following error occurs:

17/05/05 16:55:25 ERROR util.Utils: Exception encountered 
java.io.NotSerializableException: org.apache.accumulo.core.client.impl.BatchWriterImpl 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 

I assume this is a very common situation, but I could not find any simple Spark Streaming + Accumulo example.

Answers

0

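As elserj pointed out, serializing the connection object is usually not the right pattern. The pattern I have seen is to open the connection directly on the Spark worker nodes using RDD.foreachPartition(). This is nice because it lets you create one connection per batch of work, that is, per partition, instead of a new connection for each individual record, which is hardly ever efficient.

Example: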

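fullMergeResultFlatten.foreachRDD(recordRDD => {
  recordRDD.foreachPartition(partitionRecords => {
    // Runs on the Spark workers; the connector is created here as well so that nothing connection-related is captured from the driver
    val connector = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
      .getConnector(accumuloUser, new PasswordToken(accumuloPassword))
    val accumuloBatchWriter = connector.createBatchWriter("Data",
      new BatchWriterConfig().setMaxMemory(accumuloBatchWriterMaxMemory))
    try {
      partitionRecords.foreach(record => {
        // save operation: build the Mutation for this record, as in the question
        val mutation = new Mutation(Longs.toByteArray(record.timestamp))
        mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
        accumuloBatchWriter.addMutation(mutation)
      })
    } finally accumuloBatchWriter.close() // close() flushes any buffered mutations
  })
})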
0

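You cannot serialize the BatchWriter class. I don't have a suggestion for how to fix your code, but I can say that trying to serialize this class is not the right approach.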
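
To illustrate why, here is a minimal sketch that needs neither Spark nor Accumulo. It uses plain Java serialization, which is what the stack trace in the question shows. All names in it (`FakeWriter`, `CapturingTask`, `SelfContainedTask`, `trySerialize`) are hypothetical stand-ins made up for this illustration; `FakeWriter` plays the role of `BatchWriterImpl`. A task that holds a writer created beforehand cannot be serialized, while a task that creates its writer only when it runs can.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureSerializationDemo {
  // Hypothetical stand-in for BatchWriterImpl: deliberately NOT Serializable.
  class FakeWriter {
    def add(record: String): Unit = ()
  }

  // Mirrors the question's code: the writer is created up front ("on the driver")
  // and carried inside the task that would be shipped to the workers.
  class CapturingTask(val writer: FakeWriter) extends Serializable {
    def run(records: Iterator[String]): Unit = records.foreach(writer.add)
  }

  // Mirrors the foreachPartition pattern: the task only carries plain data
  // and creates the writer when it actually runs ("on the worker").
  class SelfContainedTask(val table: String) extends Serializable {
    def run(records: Iterator[String]): Unit = {
      val writer = new FakeWriter
      records.foreach(writer.add)
    }
  }

  // Returns true if Java serialization of obj succeeds, false if it hits
  // a NotSerializableException.
  def trySerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
  }

  def main(args: Array[String]): Unit = {
    println(trySerialize(new CapturingTask(new FakeWriter))) // false
    println(trySerialize(new SelfContainedTask("Data")))     // true
  }
}
```

The first case fails for the same reason as the code in the question: the non-serializable writer is part of the object graph that has to be shipped. The second case ships only a table name, and the writer never leaves the place where it was created.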
