2017-04-27 34 views
0

我有一些關於ElasticSearch的數據需要在HDFS上發送。我試圖使用豬(這是我第一次使用它),但我有一些問題需要爲我的數據定義正確的模式。首先,我嘗試使用選項'es.output.json=true'org.elasticsearch.hadoop.pig.EsStorage加載JSON,並且我可以正確加載/轉儲數據,並使用STORE A INTO 'hdfs://path/to/store';將它們保存爲HDS格式的HDFS。之後,在HIVE上定義一個外部表格,我可以查詢這些數據。這是工作的完整的例子罰款(我刪除了所有SSL從代碼屬性):從ES加載數據並使用pig存儲爲HDro HDFS

REGISTER /path/to/commons-httpclient-3.1.jar; 
REGISTER /path/to/elasticsearch-hadoop-5.3.0.jar; 

A = LOAD 'my-index/log' USING org.elasticsearch.hadoop.pig.EsStorage(
'es.nodes=https://addr1:port,https://addr2:port2,https://addr3:port3', 
'es.query=?q=*', 
'es.output.json=true'); 

STORE A INTO 'hdfs://path/to/store'; 

我如何保存我的數據AVRO到HDFS?我想我需要使用AvroStorage,但我還應該定義一個加載數據的模式,或者JSON就夠了?我試圖用LOAD...USING...AS命令定義模式,並設置es.mapping.date.rich=false而不是es.output.json=true(我的數據非常複雜,帶有地圖和類似的地圖),但它不起作用。我不確定問題出在語法上,還是在方法本身上。對正確的方向提示一下就很好了。

UPDATE

這是我與es.mapping.date.rich=false嘗試的例子。我的問題是,如果一個字段爲空,則所有字段的順序都是錯誤的。

A = LOAD 'my-index/log' USING org.elasticsearch.hadoop.pig.EsStorage(
    'es.nodes=https://addr1:port,https://addr2:port2,https://addr3:port3', 
    'es.query=?q=*', 
    'es.mapping.date.rich=false') 
    AS(
    field1:chararray, 
    field2:chararray, 
    field3:map[chararray,fieldMap:map[],chararray], 
    field4:chararray, 
    field5:map[] 
); 

B = FOREACH A GENERATE field1, field2; 

STORE B INTO 'hdfs://path/to/store' USING AvroStorage(' 
{ 
    "type" : "foo1", 
    "name" : "foo2", 
    "namespace" : "foo3", 
    "fields" : [ { 
    "name" : "field1", 
    "type" : ["null","string"], 
    "default" : null 
    }, { 
    "name" : "field2", 
    "type" : ["null","string"], 
    "default" : null 
    } ] 
} 
'); 

回答

0

對於未來的讀者,我決定用spark,而不是因爲它比pig快得多。要在hdfs上保存avro文件,我正在使用databrick庫。