2017-02-24 134 views
0

在獨立的spark中,我試圖從一個數據框寫入Elasticsearch。雖然我可以得到它的工作,但我無法弄清楚如何寫入格式爲'index_name- {ts_col:{YYYY-mm-dd}}'的動態命名索引,其中'ts_col'是一個日期時間字段在數據集中。是否有可能使用elasticsearch-hadoop/spark寫入帶有格式化日期的動態創建的Elasticsearch索引?

我見過各種各樣的帖子說這種類型的語法應該可以工作,但是當我嘗試它時,我會收到包含在底部的錯誤。它似乎首先檢查在創建索引之前索引是否存在,但它將未格式化的索引名稱傳遞給該索引,而不是動態創建索引名稱。我已經嘗試使用python elasticsearch模塊以相同語法首先創建索引,但它無法處理動態索引名稱。

是否有任何解決方案可用於我,或者是否必須遍歷Spark中的數據集才能找到所表示的每個日期,創建我需要的索引,然後一次寫入一個索引?我錯過了明顯的東西嗎? Logstash很輕鬆地做到這一點,我不明白爲什麼我不能在Spark中使用它。

下面是我使用的寫命令(嘗試了它不同的變化也是如此):

df.write.format("org.elasticsearch.spark.sql") 
    .option('es.index.auto.create', 'true') 
    .option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name') 
    .option('es.mapping.id', 'es_id') 
    .save() 

下面是我使用的jar:

elasticsearch-hadoop-5.0.0/dist/elasticsearch-spark-20_2.11-5.0.0.jar 

這是我得到的錯誤,當我使用上面的寫命令:

ERROR NetworkClient: Node [##.##.##.##:9200] failed (Invalid target URI [email protected]/index_name-{ts_col:{YYYY.mm.dd}}/type_name); selected next node [##.##.##.##:9200]

...

...

Py4JJavaError: An error occurred while calling o114.save. : org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed;

如果我設置改寫爲True,我得到:

Py4JJavaError: An error occurred while calling o58.save. : org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: no such index null at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363) at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92) at org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500) at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:94) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:442) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)

如果我嘗試使用Elasticsearch Python客戶端可以提前創建索引我得到:

RequestError: TransportError(400, u'invalid_index_name_exception', u'Invalid index name [index_name-{ts_col:YYYY.MM.dd}], must be lowercase')

回答

1

你並不需要再次把日期格式大括號內。你可以閱讀更多關於它的here

.option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')

改變上面,如下圖所示:

.option('es.resource', 'index_name-{ts_col:YYYY.mm.dd}/type_name') 

注:確保您ts_col領域有適當的日期格式。

+0

對不起,延遲響應,但我終於回到嘗試這個,它的工作原理!我有兩個問題。我的花括號過多,我使用的是時間戳列,而不僅僅是日期列。一旦我添加了一個新的日期列,我就可以基於此創建索引。下面是工作的示例代碼: df.write \ \t .format( 「org.elasticsearch.spark.sql」)\ \t。選項( 'es.index.auto.create', '真')\ \t。選項( 'es.write.operation', 'UPSERT')\ \t .mode( '追加')\ \t。選項( 'es.mapping.id', 'ES_ID')\ \t .save( 「%s- {es_date:YYYY.MM.dd} /%s」%(index,type)) – Jim

相關問題