2017-04-15 64 views
0

我正在使用斯卡拉的Spark Streaming將日誌數據寫入彈性搜索使用火花流索引到Elasticsearch時出錯,具有多於22個參數的斯卡拉個案類

我不能夠創建大於22個參數的case scala需要在我的情況下,並不支持scala 2.10。 因此,使用下面的方法來創建類,而不是案例類

斯卡拉類

class FactUsage(d_EVENT_TYPE_NR: Long,EVENT_GRP_DESC: String,EVENT_DESC: String,CUST_TYPE_CD: Long,TICKET_RATING_CD: Long,BUS_UNIT_DESC: String,CUST_MKT_SEGM_DESC: String,EVENT_DTTM: String,EVENT_DTNR: Long,SERVED_PARTY_IMEI_NUM: String,SERVED_PARTY_IMSI_NUM: String,SERVED_PARTY_PHONE_NUM: Long,OTHER_PARTY_ID: String,EVENT_DURATION_QTY: Long,EVENT_VOLUME_DOWN_QTY: Long,EVENT_VOLUME_TOTAL_QTY: Long,EVENT_VOLUME_UP_QTY: Long,ACCESS_POINT_ID: String,d_CELL_NR: Long,d_CONTRACT_NR: Long,d_CUSTOMER_NR: Long,d_CUSTOMER_TOP_PARENT_NR: String,d_DEVICE_NR: Long,d_ORIGIN_DESTINATION_NR: Long,d_DIRECTION_NR: Long,d_OTHER_OPER_NR: Long,d_OTHER_SUBSCR_OPER_NR: Long,d_ROAMING_NR: Long,d_SALES_AGENT_NR: String,d_SERVED_OPER_NR: Long,d_SERVED_SUBSCR_OPER_NR: Long,d_TARIFF_MODEL_NR: Long,d_TERMINATION_NR: Long,d_USAGE_SERVICE_NR: Long,RUN_ID: String) extends Product with Serializable 
{ 
def canEqual(that:Any)=that.isInstanceOf[FactUsage] 
def productArity = 35 // Number of columns 

def productElement(idx: Int) = idx match 
{ 
case 0 => d_EVENT_TYPE_NR;case 1 =>EVENT_GRP_DESC;case 2 =>EVENT_DESC;case 3 =>CUST_TYPE_CD;case 4 =>TICKET_RATING_CD;case 5 =>BUS_UNIT_DESC;case 6 =>CUST_MKT_SEGM_DESC;case 7 =>EVENT_DTTM;case 8 =>EVENT_DTNR;case 9 =>SERVED_PARTY_IMEI_NUM;case 10 =>SERVED_PARTY_IMSI_NUM;case 11 =>SERVED_PARTY_PHONE_NUM;case 12 =>OTHER_PARTY_ID;case 13 =>EVENT_DURATION_QTY;case 14 =>EVENT_VOLUME_DOWN_QTY;case 15 =>EVENT_VOLUME_TOTAL_QTY;case 16 =>EVENT_VOLUME_UP_QTY;case 17 =>ACCESS_POINT_ID;case 18 =>d_CELL_NR;case 19 =>d_CONTRACT_NR;case 20 =>d_CUSTOMER_NR;case 21 =>d_CUSTOMER_TOP_PARENT_NR;case 22 =>d_DEVICE_NR;case 23 =>d_ORIGIN_DESTINATION_NR;case 24 =>d_DIRECTION_NR;case 25 =>d_OTHER_OPER_NR;case 26 =>d_OTHER_SUBSCR_OPER_NR;case 27 =>d_ROAMING_NR;case 28 =>d_SALES_AGENT_NR;case 29 =>d_SERVED_OPER_NR;case 30 =>d_SERVED_SUBSCR_OPER_NR;case 31 =>d_TARIFF_MODEL_NR;case 32 =>d_TERMINATION_NR;case 33 =>d_USAGE_SERVICE_NR;case 34 =>RUN_ID 
} 
} 

星火流代碼寫入到Elasticsearch

val rddAbcServerLog = lines.filter(x => x.toString.contains("abc_server_logs")) 
EsSparkStreaming.saveToEs(rddAbcServerLog.map(line => parser.formatDelimeted(line)).map(p => parser.runES(p.toString)), esindex + "/" + estype) 

我已經調試並沒有問題與lambda表達式中使用的函數。 錯誤出現在寫入Elasticsearch

錯誤

17/04/15 11:34:05 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xx.xx:10200] returned Bad Request(400) - failed to parse; Bailing out.. 
     at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:250) 
     at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:202) 
     at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220) 
     at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) 
     at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182) 
     at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159) 
     at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67) 
     at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102) 
     at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:744) 
17/04/15 11:34:05 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xx.xx:10200] returned Bad Request(400) - failed to parse; Bailing out.. 
     at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:250) 
     at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:202) 
     at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220) 
     at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) 
     at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182) 
     at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159) 
     at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67) 
     at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102) 
     at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:744) 

17/04/15 11:34:05 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
17/04/15 11:34:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/04/15 11:34:05 INFO TaskSchedulerImpl: Cancelling stage 0 

注:代碼可能有奇怪的命名規則和屏蔽IP地址,我已經修改了代碼發佈到公共論壇

回答

0

你是什麼這樣做很麻煩並且容易出錯。相反,使用多個案例類。

case class Group(grpDesc: String, eventDesc: String) 
case class Event(dttm: String, dtnr: String) 

...等等

然後當你已經分組的所有相關物品進入自己的case類:

case class FactUsage(group: Group, event: Event, ...) 

您應該傳遞的FactUsage一個實例saveToEs

+0

感謝Vidya爲您的答案!但是這種方法的問題是:由於創建了多個case類,我有一個包含35個字段的索引,所以我的Elasticsearch索引將更改爲如下所示...:「_source」:{「group」:{...},「事件「:{....}} ....與多個case類一樣,將會寫入多個json。 – Rahul

+0

在Elasticsearch中存儲和索引嵌套對象的方法比試圖像Scala編譯器那樣愚弄Scala編譯器要容易得多。當然,最好的解決方案是升級到Scala 2.11。 – Vidya

+0

謝謝..這是簡單的方法,但它是唯一的解決方案,也將不得不改變ES索引映射。 Scala版本目前無法升級。 – Rahul