2012-05-10 62 views
2

我正在寫書面處理二進制格式的大的時間序列數據文件看起來像這樣(新線這裏可讀性M/R工作,實際數據是連續的,很明顯):Hadoop中的MapReduce創建自定義InputFormat和RecordReader對於二進制文件

TIMESTAMP_1---------------------TIMESTAMP_1 
TIMESTAMP_2**********TIMESTAMP_2 
TIMESTAMP_3%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%TIMESTAMP_3 
.. etc 

哪裏時間戳是一個簡單的8字節結構,識別爲例如由前2個字節。如上所示,實際數據被限制在重複值時間戳之間,並且包含一個或多個預定義的結構。我想編寫一個自定義InputFormat將發射鍵/值對映射器:

< TIMESTAMP_1, --------------------- > 
< TIMESTAMP_2, ********** > 
< TIMESTAMP_3, %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% > 

從邏輯上講,我想保持目前的TIMESTAMP的軌道,並聚集所有的數據,直到TIMESTAMP是再次檢測,然後發送我的<TIMESTAMP, DATA>對作爲記錄。我的問題是分裂之間同步的RecordReader裏面,所以如果某個讀者收到以下分裂

# a split occurs inside my data 
reader X: TIMESTAMP_1-------------- 
reader Y: -------TIMESTAMP_1 TIMESTAMP_2****.. 

# or inside the timestamp 
or even: @@@@@@@TIMES 
     TAMP_1-------------- .. 

什麼是解決這個的好辦法?我有一個簡單的方法來訪問文件偏移,使得我CustomRecordReader可以拆分之間同步和不丟失數據?我覺得我在處理分裂方面存在一些概念上的空白,所以也許對這些解釋有所幫助。謝謝。

回答

2

你也可以繼承的FileInputFormat具體子類,例如,SeqenceFileAsBinaryInputFormat,並覆蓋isSplitable()方法返回false

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat; 

public class NonSplitableBinaryFile extends SequenceFileAsBinaryInputFormat{ 

    @Override 
    protected boolean isSplitable(FileSystem fs, Path file) { 
     return false; 
    } 

    @Override 
    public RecordReader getRecordReader(InputSplit split, JobConf job, 
    Reporter reporter) throws IOException { 
    //return your customized record reader here 
    } 
} 
+0

如何保持跟蹤你的文件的大小,而寫它HDFS?當文件的大小接近閾值時,關閉它並打開一個新文件進行寫入。正如大衛所說,爲了不失去地方性。 –

3

一般來說,不能簡單地創建支持拆分輸入格式,因爲你應該能夠找出從分界線移動到哪裏以獲得一致的記錄。 XmlInputFormat是這種格式的好例子。
我建議首先考慮,如果你確實需要裂開的投入?您可以將輸入格式定義爲不可拆分,並且不具有所有這些問題。
如果你的文件一般都不會大得多然後塊大小 - 你什麼都不會。如果他們這樣做 - 你將失去部分數據局部性。

+0

我的文件是100的MB到GB的10年代的順序,所以肯定裂開的。如果我阻止文件被分割,我是否會受到重大性能影響? – sa125

+0

你的表現擊中將取決於處理/網絡速度的關係速度。在一個映射器中處理大文件意味着它大部分來自網絡,而不是本地驅動器。現在,如果你處理10mb /秒 - 你不會感覺到它。如果你處理400 - 你做(假設1 GBit網絡)。您還可以增加這些文件的塊大小,從而減少影響 –

相關問題