
In the example from Using the BigQuery Connector with Spark, what kinds of Spark RDDs can be saved to a BigQuery table using saveAsNewAPIHadoopDataset?

// Perform word count. 
val wordCounts = (tableData 
    .map(entry => convertToTuple(entry._2)) 
    .reduceByKey(_ + _)) 

// Write data back into a new BigQuery table. 
// IndirectBigQueryOutputFormat discards keys, so set key to null. 
(wordCounts 
    .map(pair => (null, convertToJson(pair))) 
    .saveAsNewAPIHadoopDataset(conf)) 

Working from that example, if I remove the .reduceByKey(_ + _) part, I get the following error:

org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
    ... 46 elided
Caused by: java.io.IOException: Schema has no fields. Table: test_output_40b400dc_1bfe_454a_9aa8_bf9562d54c3f_source
    at com.google.cloud.hadoop.io.bigquery.BigQueryUtils.waitForJobCompletion(BigQueryUtils.java:95)
    at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:164)
    at com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
    ... 53 more

In some cases I do not use reduceByKey at all and want to save my RDD to BigQuery as it is.
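
For what it's worth, here is a minimal sketch of that case, assuming the convertToTuple and convertToJson helpers and the conf Hadoop configuration from the linked Google example: saveAsNewAPIHadoopDataset is only defined on pair RDDs, so each element still has to be mapped to a (key, value) pair even when nothing is aggregated.

// A sketch, not tested: write the un-aggregated tuples directly.
// IndirectBigQueryOutputFormat discards keys, so the key is null.
val rawCounts = tableData.map(entry => convertToTuple(entry._2))

(rawCounts
    .map(pair => (null, convertToJson(pair)))
    .saveAsNewAPIHadoopDataset(conf))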

Comment: Can you add the complete error? Is it caused by the code change you made? – mrsrinivas

Comment: Yes, the error only occurs after my code changes. – bignano

Answers


Removing .reduceByKey(_ + _) causes the error because the next piece of code expects pairs: once you drop reduceByKey, the output of the previous step no longer satisfies what convertToJson requires.
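
For context, convertToJson is not shown in the question; in the Google example it takes a (word, count) pair, along these lines (a sketch, with the field names assumed from that example):

import com.google.gson.JsonObject

// Builds one BigQuery row from a (word, count) pair.
def convertToJson(pair: (String, Long)): JsonObject = {
    val jsonObject = new JsonObject()
    jsonObject.addProperty("word", pair._1)
    jsonObject.addProperty("word_count", pair._2)
    jsonObject
}

Whatever its exact shape, its input type has to line up with the elements of the RDD being mapped over it.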

A second answer: try working with a case class as the schema:

import com.google.gson.JsonObject

object Schema {
    // Build a Schema instance from one JSON record of the input table.
    def apply(record: JsonObject): Schema = Schema(
        word = record.get("word").getAsString,
        count = record.get("Count").getAsInt
    )
}

case class Schema(word: String, count: Int)

and apply it like this:

wordCounts.map(x => Schema(x))
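
One caveat, as a sketch only: an RDD of Schema instances is still not a pair RDD, so before calling saveAsNewAPIHadoopDataset the rows would have to be turned back into (null, JsonObject) pairs, assuming the Gson JsonObject used above and that the Schema(x) call type-checks for the RDD's elements:

// Hypothetical final step: turn each Schema row into a
// (null, JsonObject) pair for IndirectBigQueryOutputFormat,
// which discards keys.
(wordCounts
    .map(x => Schema(x))
    .map { s =>
        val json = new JsonObject()
        json.addProperty("word", s.word)
        json.addProperty("count", s.count)
        (null, json)
    }
    .saveAsNewAPIHadoopDataset(conf))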

Hope it helps.