2017-07-13 128 views
0

我無法並行Scala中的一個列表,讓顯示java.lang.NullPointerException無法在斯卡拉

並行列表
messages.foreachRDD(rdd => { 
     for(avroLine <- rdd){ 
     val record = Injection.injection.invert(avroLine.getBytes).get 
     val field1Value = record.get("username") 
     val jsonStrings=Seq(record.toString()) 
     val newRow = sqlContext.sparkContext.parallelize(Seq(record.toString())) 
      } 
      }) 

輸出

jsonStrings...List({"username": "user_118", "tweet": "tweet_218", "timestamp": 18}) 

異常

Caused by: java.lang.NullPointerException 
at com.capitalone.AvroConsumer$$anonfun$main$1$$anonfun$apply$1.apply(AvroConsumer.scala:83) 
at com.capitalone.AvroConsumer$$anonfun$main$1$$anonfun$apply$1.apply(AvroConsumer.scala:74) 
at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
at org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:26) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 

Thanks in Adv ANCE!

回答

0

您正試圖在spark worker上下文中創建RDD。雖然foreachRDD在驅動程序中運行,但您在每個RDD上執行的操作foreach將分發給工作人員。您似乎不太可能想要爲輸入流的每一行創建一個新的RDD。評論後

更新:

很難有評論線程那裏是代碼沒有格式的討論。我的基本問題是,你爲什麼不這樣做這樣的事情:

val messages: ReceiverInputDStream[String] = RabbitMQUtils.createStream(ssc, rabbitParams) 
def toJsonString(message: String): String = SparkUtils.getRecordInjection(QUEUE_NAME).invert(message.getBytes()).get 
val jsonStrings: DStream[String] = messages map toJsonString 

我沒有打擾找出並追查你使用的所有庫(請,下一次,提交MCVE)所以我沒有試圖編譯它。但它看起來像你想要的是將每個輸入消息映射到一個JSON字符串。也許你想對Strings產生的DStream做一些幻想,但這可能是一個不同的問題。

+0

感謝您的回覆。我正在嘗試使用av​​ro invert轉換我的RabbitMQ AVRO二進制流,然後將其保存爲文件系統的.csv文件。 val messages = RabbitMQUtils.createStream(ssc,rabbitParams); messages.foreachRDD(RDD => { \t爲(avroLine < - 消息){ VAL記錄= SparkUtils.getRecordInjection(QUEUE_NAME).invert(rdd.getBytes)。獲得; VAL jsonStrings:RDD [字符串] =皮下。 parallelize(Seq(record.toString())); val result = sqlContext.read.json(jsonStrings).toDF(); result.write.mode(「Append」)。csv(「/ Users/Documents/rabbitmq/consumer-out /「); }}) – Mg2729

+0

它看起來非常像您試圖爲每個輸入記錄生成一個輸出記錄。那是對的嗎?是否有一個原因,你不能只使用'地圖'這個翻譯? –

+0

是的,我的消費者每15分鐘運行一次並消耗所有流。此外,我嘗試使用map函數,val消息= RabbitMQUtils.createStream(ssc,rabbitParams); messages.foreachRDD(rdd => {val record = rdd.map(message => SparkUtils.getRecordInjection(QUEUE_NAME).invert(message。 getBytes).get); val jsonStrings:RDD [String] = sqlContext.sparkContext.parallelize(Seq(record.toString()));但是,至少在我的prev過程中,我可以將我的二進制流轉換爲可讀,但是我'獲取下面的輸出與地圖。jsonStrings ... ParallelCollectionRDD [42]並行在AVROMqStreaming.scala:62 – Mg2729

0
def toJsonString(message: String): String = {val record = 

SparkUtils.getRecordInjection(QUEUE_NAME).invert(message.getBytes()).get } 
dStreams.foreachRDD(rdd => { 
val jsonStrings = rdd.map (stream =>toJsonString(stream)) 
val df = sqlContext.read.json(jsonStrings) 
df.write.mode("Append").csv("/Users/Documents/kafka-poc/consumer-out/def/")} 
+0

請指出解決您問題的部分代碼。 – jwvh

+0

謝謝Joe爲您提供的所有幫助。 – Mg2729