I am unable to parallelize a list in Scala; it throws a java.lang.NullPointerException.
messages.foreachRDD(rdd => {
  for (avroLine <- rdd) {
    val record = Injection.injection.invert(avroLine.getBytes).get
    val field1Value = record.get("username")
    val jsonStrings = Seq(record.toString())
    val newRow = sqlContext.sparkContext.parallelize(Seq(record.toString()))
  }
})
Output:
jsonStrings...List({"username": "user_118", "tweet": "tweet_218", "timestamp": 18})
Exception:
Caused by: java.lang.NullPointerException
at com.capitalone.AvroConsumer$$anonfun$main$1$$anonfun$apply$1.apply(AvroConsumer.scala:83)
at com.capitalone.AvroConsumer$$anonfun$main$1$$anonfun$apply$1.apply(AvroConsumer.scala:74)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:26)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
Thanks in advance!
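The NPE most likely comes from calling `sqlContext.sparkContext.parallelize` inside the loop over `rdd`: that closure runs on the executors, where the driver-only SparkContext is not available (it deserializes as null). The usual shape of the fix is to transform the records with `map` instead of creating a new RDD per record. The sketch below uses plain Scala collections in place of RDDs so it is self-contained, and `invert` is a hypothetical stand-in for `Injection.injection.invert(...).get`; in Spark the same restructuring is `rdd.map(...)`.

```scala
object ParallelizeSketch {
  // Hypothetical stand-in for Injection.injection.invert(bytes).get,
  // which decodes one Avro-encoded line back into a record string.
  def invert(bytes: Array[Byte]): String = new String(bytes, "UTF-8")

  def main(args: Array[String]): Unit = {
    val avroLines = Seq("""{"username": "user_118"}""", """{"username": "user_119"}""")

    // Instead of building a new collection per record inside foreach
    // (the step that, in Spark, needs the driver-only SparkContext and
    // so fails on executors), transform the whole collection at once:
    val jsonStrings = avroLines.map(line => invert(line.getBytes("UTF-8")))

    jsonStrings.foreach(println)
  }
}
```

With real RDDs the equivalent is a single `rdd.map(avroLine => Injection.injection.invert(avroLine.getBytes).get.toString)`, which stays on the executors and never touches SparkContext.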
Thanks for your reply. I am trying to use Avro invert to decode my RabbitMQ Avro binary stream and then save it to the filesystem as a .csv file.

val messages = RabbitMQUtils.createStream(ssc, rabbitParams);
messages.foreachRDD(rdd => {
  for (avroLine <- rdd) {
    val record = SparkUtils.getRecordInjection(QUEUE_NAME).invert(avroLine.getBytes).get;
    val jsonStrings: RDD[String] = sc.parallelize(Seq(record.toString()));
    val result = sqlContext.read.json(jsonStrings).toDF();
    result.write.mode("Append").csv("/Users/Documents/rabbitmq/consumer-out/");
  }
}) – Mg2729
It looks very much like you are trying to produce one output record per input record. Is that right? Is there a reason you can't just use 'map' for that transformation? –
Yes, my consumer runs every 15 minutes and consumes the whole stream. I also tried the map function:

val messages = RabbitMQUtils.createStream(ssc, rabbitParams);
messages.foreachRDD(rdd => {
  val record = rdd.map(message => SparkUtils.getRecordInjection(QUEUE_NAME).invert(message.getBytes).get);
  val jsonStrings: RDD[String] = sqlContext.sparkContext.parallelize(Seq(record.toString()));
})

At least in my previous approach I could convert my binary stream into something readable, but with map I am getting the output below:
jsonStrings...ParallelCollectionRDD[42] at parallelize at AVROMqStreaming.scala:62 – Mg2729
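The `ParallelCollectionRDD[42]...` output in the map attempt is the result of calling `toString` on the RDD object itself, not on its elements: `record.toString()` describes the collection, so wrapping it in `parallelize(Seq(...))` yields a one-element RDD containing that description. Plain Scala shows the same distinction; the names below are illustrative only.

```scala
object ToStringSketch {
  def main(args: Array[String]): Unit = {
    val records = Seq("""{"username": "user_118"}""", """{"username": "user_119"}""")

    // Wrapping the collection's own toString in a new one-element
    // collection loses the records and keeps only a description string:
    val wrong = Seq(records.toString())

    // Mapping each element to its string form keeps one string per record:
    val right = records.map(_.toString)

    println(wrong.size) // always 1, regardless of how many records there were
    println(right.size) // one entry per record
  }
}
```

In Spark the analogous fix is `val jsonStrings: RDD[String] = record.map(_.toString)` followed directly by `sqlContext.read.json(jsonStrings)`, with no `parallelize` call at all.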