2014-07-11 53 views
0

假設我有以下的JSON:豬的新手,如何將JSON轉換爲另一個JSON,並使用豬的關鍵值對子集?

{ 
"state":"VA", 
"fruit":[ 
{"name":"Bannana", 
"color":"Yellow", 
"cost":1.6 
}, 
{"name":"Apple", 
"color":"Red" 
"cost":1.4 
} 
]} 

在豬,我怎麼變換上述以下:

{ 
"state":"VA", 
"fruit":[ 
{"name":"Bannana",, 
"cost":1.6 
}, 
{"name":"Apple", 
"cost":1.4 
} 
]} 

我已經試過:

A = #load file 
B = FOREACH A GENERATE 
state, 
fruit.name, 
fruit.cost; 

和如下:

A = #load file 
B = FOREACH A GENERATE 
state, 
fruit as (m:bag{FruitInfo.(tuple(name:string, cost:double))}); 

似乎不管我做什麼我一直在獲取嵌套數組。我正在嘗試做什麼?我選擇了豬的數據轉換能力。請注意,數據使用AvroStorage加載。

+0

看看這裏:http://pig.apache.org/docs/r0 .12.0/basic.html – aelor

+0

嘗試使用UDF。我已將工作代碼發佈爲答案。任何限制不使用UDF? –

回答

0

我懷疑它是否可以在沒有UDF的情況下完成。

的Java UDF:

package com.example.exp1; 

import java.io.IOException; 
import java.util.Iterator; 

import org.apache.pig.EvalFunc; 
import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.pig.impl.logicalLayer.schema.Schema; 
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; 

public class ReformatBag extends EvalFunc<DataBag> { 

    @Override 
    public DataBag exec(Tuple input) throws IOException { 

     DataBag returnBag = BagFactory.getInstance().newDefaultBag(); 
     if (input == null || input.size() == 0 || input.get(0) == null) 
      return null; 
     try { 
      // Get the bag 
      DataBag bag = DataType.toBag(input.get(0)); 
      // Iterate throughout the bag 
      Iterator it = bag.iterator(); 
      while (it.hasNext()) { 
       // assign the current 
       Tuple t = (Tuple) it.next(); 
       // Create a new Tuple of size 2 where the refactor the data 
       Tuple reformatTuple = TupleFactory.getInstance().newTuple(2); 
       // Add 1st field (name) 
       reformatTuple.set(0, t.get(0)); 
       // Add 3rd field (price) 
       reformatTuple.set(1, t.get(2)); 
       //add to the bag. Continue iterating. 
       returnBag.add(reformatTuple); 
      } 
     } catch (Exception e) { 
      throw new IOException("Caught exception processing input row ", e); 
     } 
     return returnBag; 
    } 

    public Schema outputSchema(Schema input) { 
     try { 
      Schema outputSchema = new Schema(); 
      Schema bagSchema = new Schema(); 
      // Tuple schema which holds name and cost schema 
      Schema innerTuple = new Schema(); 
      innerTuple.add(new FieldSchema("name", DataType.CHARARRAY)); 
      innerTuple.add(new FieldSchema("cost", DataType.FLOAT)); 

      // Add the tuple schema holding name & cost to the bag schema 
      bagSchema.add(new FieldSchema("t1", innerTuple, DataType.TUPLE)); 
      // Return type of the UDF 
      outputSchema.add(new FieldSchema("fruit", bagSchema, DataType.BAG)); 
      return outputSchema; 
     } catch (Exception e) { 
      return null; 
     } 
    } 

} 

豬代碼:

REGISTER /path/to/jar/exp1-1.0-SNAPSHOT.jar; 

input_data = LOAD 'text.json' 
     USING JsonLoader('state:chararray,fruit:{(name:chararray,color:chararray,cost:float)}'); 

reformat_data = FOREACH input_data 
     GENERATE state,com.example.exp1.ReformatBag(fruit); 
STORE 
     reformat_data 
     INTO 'output.json' 
     USING JsonStorage(); 

輸出:

{"state":"VA","fruit":[{"name":"Bannana","cost":1.6},{"name":"Apple","cost":1.4}]} 
相關問題