2012-08-14 145 views
1

我正在尋找實現一個自定義hadoop可寫類,其中一個字段是時間戳。我似乎無法在hadoop庫中找到一個類(例如Writable for Date或Calendar),這會使這一點變得簡單。我正在考慮在日曆上使用get/setTimeInMillis創建自定義可寫,但我想知道是否有更好的/內置的解決方案來解決這個問題。Hadoop可寫日期/日曆

回答

3

Hadoop中沒有針對Calendar/Date的內置Writable。考慮到您可以從Calendar對象中獲取timeInMillis,您可以使用LongWritable來序列化日曆對象,但前提是(當且僅當)您的應用程序始終使用默認的UTC時區(即它對時區「不可知」,總是假定timeInMillis表示UTC時間)。

如果您使用其他時區或者您的應用程序需要能夠解釋timeInMillis相對於各個時區,則必須從頭開始編寫默認的Writable實現。

+0

感謝您的確認! – ChaseMedallion 2012-08-14 20:51:50

1

下面是我爲您生成的一個自定義Writable,用來說明一個具有三個屬性(其中一個是日期)的可寫類。您可以看到日期值是以long的形式持久化的,並且long與Date之間的相互轉換很容易。如果三個屬性太多,我可以爲您生成一個只帶日期的Writable。

package com.lmx.writable; 

import java.io.ByteArrayInputStream; 
import java.io.ByteArrayOutputStream; 
import java.io.DataInput; 
import java.io.DataInputStream; 
import java.io.DataOutput; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.util.*; 
import com.eaio.uuid.UUID; 
import org.apache.hadoop.io.*; 
import org.apache.pig.ResourceSchema; 
import org.apache.pig.ResourceSchema.ResourceFieldSchema; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.DefaultDataBag; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.json.JSONArray; 
import org.json.JSONException; 
import org.json.JSONObject; 

public class MyCustomWritable implements Writable { 

    public static int PROPERTY_DATE = 0; 
    public static int PROPERTY_COUNT = 1; 
    public static int PROPERTY_NAME = 2; 

    private boolean[] changeFlag = new boolean[3]; 

    private Date _date; 
    private int _count; 
    private String _name; 

    public MyCustomWritable() { 
    resetChangeFlags(); 
    } 

    public MyCustomWritable(Date _date, int _count, String _name) { 
    resetChangeFlags(); 
    setDate(_date); 
    setCount(_count); 
    setName(_name); 
    } 

    public MyCustomWritable(byte[] bytes) { 
    ByteArrayInputStream is = new ByteArrayInputStream(bytes); 
    DataInput in = new DataInputStream(is); 
    try { readFields(in); } catch (IOException e) { } 
    resetChangeFlags(); 
    } 



    public Date getDate() { 
    return _date; 
    } 

    public void setDate(Date value) { 
    _date = value; 
    changeFlag[PROPERTY_DATE] = true; 
    } 

    public int getCount() { 
    return _count; 
    } 

    public void setCount(int value) { 
    _count = value; 
    changeFlag[PROPERTY_COUNT] = true; 
    } 

    public String getName() { 
    return _name; 
    } 

    public void setName(String value) { 
    _name = value; 
    changeFlag[PROPERTY_NAME] = true; 
    } 

    public void readFields(DataInput in) throws IOException { 

      // Read Date _date 

     if (in.readBoolean()) { 
      _date = new Date(in.readLong()); 
      changeFlag[PROPERTY_DATE] = true; 
     } else { 
      _date = null; 
      changeFlag[PROPERTY_DATE] = false; 
     }  
      // Read int _count 

     _count = in.readInt(); 
     changeFlag[PROPERTY_COUNT] = true; 

      // Read String _name 

     if (in.readBoolean()) { 
      _name = Text.readString(in); 
      changeFlag[PROPERTY_NAME] = true; 
     } else { 
      _name = null; 
      changeFlag[PROPERTY_NAME] = false; 
     } 
    } 

    public void write(DataOutput out) throws IOException { 

      // Write Date _date 

     if (_date == null) { 
      out.writeBoolean(false); 
     } else { 
      out.writeBoolean(true); 
      out.writeLong(_date.getTime()); 
     } 

      // Write int _count 

     out.writeInt(_count); 

      // Write String _name 

     if (_name == null) { 
      out.writeBoolean(false); 
     } else { 
      out.writeBoolean(true); 
      Text.writeString(out,_name); 
     } 
    } 

    public byte[] getBytes() throws IOException { 
     ByteArrayOutputStream os = new ByteArrayOutputStream(); 
     DataOutputStream out = new DataOutputStream(os); 
     write(out); 
     out.flush(); 
     out.close(); 
     return os.toByteArray(); 
    } 

    public void resetChangeFlags() { 
    changeFlag[PROPERTY_DATE] = false; 
    changeFlag[PROPERTY_COUNT] = false; 
    changeFlag[PROPERTY_NAME] = false; 
    } 

    public boolean getChangeFlag(int i) { 
    return changeFlag[i]; 
    } 


    public byte[] getDateAsBytes() throws IOException { 
     ByteArrayOutputStream os = new ByteArrayOutputStream(); 
     DataOutputStream out = new DataOutputStream(os); 

      // Write Date _date 

     if (_date == null) { 
      out.writeBoolean(false); 
     } else { 
      out.writeBoolean(true); 
      out.writeLong(_date.getTime()); 
     } 

     out.flush(); 
     out.close(); 
     return os.toByteArray(); 
    } 

    public byte[] getCountAsBytes() throws IOException { 
     ByteArrayOutputStream os = new ByteArrayOutputStream(); 
     DataOutputStream out = new DataOutputStream(os); 

      // Write int _count 

     out.writeInt(_count); 

     out.flush(); 
     out.close(); 
     return os.toByteArray(); 
    } 

    public byte[] getNameAsBytes() throws IOException { 
     ByteArrayOutputStream os = new ByteArrayOutputStream(); 
     DataOutputStream out = new DataOutputStream(os); 

      // Write String _name 

     if (_name == null) { 
      out.writeBoolean(false); 
     } else { 
      out.writeBoolean(true); 
      Text.writeString(out,_name); 
     } 

     out.flush(); 
     out.close(); 
     return os.toByteArray(); 
    } 


    public void setDateFromBytes(byte[] b) throws IOException { 
     ByteArrayInputStream is = new ByteArrayInputStream(b); 
     DataInput in = new DataInputStream(is); 
     int len; 

      // Read Date _date 

     if (in.readBoolean()) { 
      _date = new Date(in.readLong()); 
      changeFlag[PROPERTY_DATE] = true; 
     } else { 
      _date = null; 
      changeFlag[PROPERTY_DATE] = false; 
     } 
    } 

    public void setCountFromBytes(byte[] b) throws IOException { 
     ByteArrayInputStream is = new ByteArrayInputStream(b); 
     DataInput in = new DataInputStream(is); 
     int len; 

      // Read int _count 

     _count = in.readInt(); 
     changeFlag[PROPERTY_COUNT] = true; 

    } 

    public void setNameFromBytes(byte[] b) throws IOException { 
     ByteArrayInputStream is = new ByteArrayInputStream(b); 
     DataInput in = new DataInputStream(is); 
     int len; 

      // Read String _name 

     if (in.readBoolean()) { 
      _name = Text.readString(in); 
      changeFlag[PROPERTY_NAME] = true; 
     } else { 
      _name = null; 
      changeFlag[PROPERTY_NAME] = false; 
     } 

    } 

    public Tuple asTuple() throws ExecException { 

     Tuple tuple = TupleFactory.getInstance().newTuple(3); 

     if (getDate() == null) { 
      tuple.set(0, (Long) null); 
     } else { 
      tuple.set(0, new Long(getDate().getTime())); 
     } 
     tuple.set(1, new Integer(getCount())); 
     if (getName() == null) { 
      tuple.set(2, (String) null); 
     } else { 
      tuple.set(2, getName()); 
     } 

     return tuple; 
    } 

    public static ResourceSchema getPigSchema() throws IOException { 

     ResourceSchema schema = new ResourceSchema(); 
     ResourceFieldSchema fieldSchema[] = new ResourceFieldSchema[3]; 
     ResourceSchema bagSchema; 
     ResourceFieldSchema bagField[]; 

     fieldSchema[0] = new ResourceFieldSchema(); 
     fieldSchema[0].setName("date"); 
     fieldSchema[0].setType(DataType.LONG); 

     fieldSchema[1] = new ResourceFieldSchema(); 
     fieldSchema[1].setName("count"); 
     fieldSchema[1].setType(DataType.INTEGER); 

     fieldSchema[2] = new ResourceFieldSchema(); 
     fieldSchema[2].setName("name"); 
     fieldSchema[2].setType(DataType.CHARARRAY); 

     schema.setFields(fieldSchema); 
     return schema; 

    } 

    public static MyCustomWritable fromJson(String source) { 

     MyCustomWritable obj = null; 

     try { 
      JSONObject jsonObj = new JSONObject(source); 
      obj = fromJson(jsonObj); 
     } catch (JSONException e) { 
      System.out.println(e.toString()); 
     } 

     return obj; 
    } 

    public static MyCustomWritable fromJson(JSONObject jsonObj) { 

     MyCustomWritable obj = new MyCustomWritable(); 

     try { 

      if (jsonObj.has("date")) { 
       obj.setDate(new Date(jsonObj.getLong("date"))); 
      } 

      if (jsonObj.has("count")) { 
       obj.setCount(jsonObj.getInt("count")); 
      } 

      if (jsonObj.has("name")) { 
       obj.setName(jsonObj.getString("name")); 
      } 

     } catch (JSONException e) { 
      System.out.println(e.toString()); 
      obj = null; 
     } 

     return obj; 
    } 

    public JSONObject toJson() { 

     try { 
      JSONObject jsonObj = new JSONObject(); 
      JSONArray jsonArray; 

      if (getDate() != null) { 
       jsonObj.put("date", getDate().getTime()); 
      } 
      jsonObj.put("count", getCount()); 

      if (getName() != null) { 
       jsonObj.put("name", getName()); 
      } 
      return jsonObj; 
     } catch (JSONException e) { } 

     return null;  
    } 

    public String toJsonString() { 

     return toJson().toString(); 

    } 
}