I am using Spark Streaming with Scala to write log data to Elasticsearch, and I get an error when indexing into Elasticsearch from Spark Streaming with a Scala record type that has more than 22 parameters.
Scala 2.10 does not allow case classes with more than 22 parameters, which is what my data requires, so instead of a case class I created a regular class as shown below.
Scala class
class FactUsage(d_EVENT_TYPE_NR: Long, EVENT_GRP_DESC: String, EVENT_DESC: String, CUST_TYPE_CD: Long, TICKET_RATING_CD: Long, BUS_UNIT_DESC: String, CUST_MKT_SEGM_DESC: String, EVENT_DTTM: String, EVENT_DTNR: Long, SERVED_PARTY_IMEI_NUM: String, SERVED_PARTY_IMSI_NUM: String, SERVED_PARTY_PHONE_NUM: Long, OTHER_PARTY_ID: String, EVENT_DURATION_QTY: Long, EVENT_VOLUME_DOWN_QTY: Long, EVENT_VOLUME_TOTAL_QTY: Long, EVENT_VOLUME_UP_QTY: Long, ACCESS_POINT_ID: String, d_CELL_NR: Long, d_CONTRACT_NR: Long, d_CUSTOMER_NR: Long, d_CUSTOMER_TOP_PARENT_NR: String, d_DEVICE_NR: Long, d_ORIGIN_DESTINATION_NR: Long, d_DIRECTION_NR: Long, d_OTHER_OPER_NR: Long, d_OTHER_SUBSCR_OPER_NR: Long, d_ROAMING_NR: Long, d_SALES_AGENT_NR: String, d_SERVED_OPER_NR: Long, d_SERVED_SUBSCR_OPER_NR: Long, d_TARIFF_MODEL_NR: Long, d_TERMINATION_NR: Long, d_USAGE_SERVICE_NR: Long, RUN_ID: String) extends Product with Serializable {

  def canEqual(that: Any) = that.isInstanceOf[FactUsage]

  def productArity = 35 // number of columns

  def productElement(idx: Int) = idx match {
    case 0 => d_EVENT_TYPE_NR; case 1 => EVENT_GRP_DESC; case 2 => EVENT_DESC; case 3 => CUST_TYPE_CD; case 4 => TICKET_RATING_CD; case 5 => BUS_UNIT_DESC; case 6 => CUST_MKT_SEGM_DESC; case 7 => EVENT_DTTM; case 8 => EVENT_DTNR
    case 9 => SERVED_PARTY_IMEI_NUM; case 10 => SERVED_PARTY_IMSI_NUM; case 11 => SERVED_PARTY_PHONE_NUM; case 12 => OTHER_PARTY_ID; case 13 => EVENT_DURATION_QTY; case 14 => EVENT_VOLUME_DOWN_QTY; case 15 => EVENT_VOLUME_TOTAL_QTY; case 16 => EVENT_VOLUME_UP_QTY; case 17 => ACCESS_POINT_ID
    case 18 => d_CELL_NR; case 19 => d_CONTRACT_NR; case 20 => d_CUSTOMER_NR; case 21 => d_CUSTOMER_TOP_PARENT_NR; case 22 => d_DEVICE_NR; case 23 => d_ORIGIN_DESTINATION_NR; case 24 => d_DIRECTION_NR; case 25 => d_OTHER_OPER_NR; case 26 => d_OTHER_SUBSCR_OPER_NR
    case 27 => d_ROAMING_NR; case 28 => d_SALES_AGENT_NR; case 29 => d_SERVED_OPER_NR; case 30 => d_SERVED_SUBSCR_OPER_NR; case 31 => d_TARIFF_MODEL_NR; case 32 => d_TERMINATION_NR; case 33 => d_USAGE_SERVICE_NR; case 34 => RUN_ID
  }
}
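For reference, here is a minimal sketch of how this hand-rolled Product can be exercised on its own, outside Spark, to rule it out as the source of the failure. Every field value below is a made-up placeholder; only the FactUsage definition above is from my code.

// Smoke test of the manual Product implementation; all values are placeholders.
val sample = new FactUsage(1L, "VOICE", "MO_CALL", 1L, 0L, "CONSUMER", "MASS", "2017-04-15 11:34:05", 20170415L,
  "356938035643809", "310150123456789", 15551234567L, "OTHER", 60L, 0L, 0L, 0L, "apn.example",
  1L, 2L, 3L, "TOP", 4L, 5L, 6L, 7L, 8L, 9L, "AGENT", 10L, 11L, 12L, 13L, 14L, "RUN_001")

println(sample.productArity)                  // 35
println(sample.productIterator.mkString("|")) // all 35 values, pipe-delimited

Note that Product only exposes positional values; in Scala 2.10 it carries no field names by itself.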
Spark Streaming code that writes to Elasticsearch
val rddAbcServerLog = lines.filter(x => x.toString.contains("abc_server_logs"))

EsSparkStreaming.saveToEs(
  rddAbcServerLog.map(line => parser.formatDelimeted(line)).map(p => parser.runES(p.toString)),
  esindex + "/" + estype)
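For context, here is a sketch of how the streaming context and the Elasticsearch connection are typically wired up for this call. The application name, batch interval, host, and port below are placeholders rather than my actual configuration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming.EsSparkStreaming // EsSparkStreaming.saveToEs(dstream, "index/type")

val sparkConf = new SparkConf()
  .setAppName("abc-logs-to-es")        // placeholder app name
  .set("es.nodes", "xx.xxx.xx.xx")     // Elasticsearch host (masked)
  .set("es.port", "10200")             // REST port, matching the one in the error below
  .set("es.index.auto.create", "true")

val ssc = new StreamingContext(sparkConf, Seconds(10))
// `lines` is the input DStream[String]; how it is created (receiver, Kafka, etc.) is omitted here.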
I have debugged this, and the functions used inside the lambda expressions work fine; the error only appears when the data is written to Elasticsearch.
Error
17/04/15 11:34:05 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xx.xx:10200] returned Bad Request(400) - failed to parse; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:250)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:202)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
17/04/15 11:34:05 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xx.xx:10200] returned Bad Request(400) - failed to parse; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:250)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:202)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
17/04/15 11:34:05 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
17/04/15 11:34:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/04/15 11:34:05 INFO TaskSchedulerImpl: Cancelling stage 0
Note: The code may have odd naming conventions and masked IP addresses; I edited it before posting it to a public forum.
Thanks Vidya for your answer! The problem with that approach is that, because several case classes are created for an index that holds 35 fields, my Elasticsearch documents would change to something like ...: "_source": {"group": {...}, "event": {....}} .... With multiple case classes, multiple nested JSON objects get written (sketched after these comments). – Rahul
Storing and indexing nested objects in Elasticsearch is far easier than trying to fool the Scala compiler like this. The best solution, of course, is to upgrade to Scala 2.11. – Vidya
Thanks.. that is the simpler approach, but if it is the only solution I will also have to change the ES index mapping, and the Scala version cannot be upgraded at this point. – Rahul
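To illustrate the document shape discussed in the comments above, here is a rough sketch of the multiple-nested-case-class workaround; the group/event split and the chosen fields are illustrative only, not the real schema:

// Hypothetical split of the 35 fields into nested case classes, each well under the
// 22-field limit of Scala 2.10; only a few representative fields are shown.
case class EventGroup(EVENT_GRP_DESC: String, EVENT_DESC: String, CUST_TYPE_CD: Long /* , ... */)
case class EventDetail(EVENT_DTTM: String, EVENT_DURATION_QTY: Long, RUN_ID: String /* , ... */)
case class FactUsageNested(group: EventGroup, event: EventDetail)

// Indexing FactUsageNested instead of one flat 35-field record produces documents like
//   "_source": { "group": { ... }, "event": { ... } }
// which is why the existing flat index mapping would have to change.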