2017-05-25 69 views
2

我已經結構化HDF基礎文本文件,其中有這樣的數據(file.txt的):如何處理增量更新在Hadoop的HDFS地圖,減少

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!| 

4295877341|^|136|^|4|^|1|^|I|!| 
4295877346|^|136|^|4|^|1|^|I|!| 
4295877341|^|138|^|2|^|1|^|I|!| 
4295877341|^|141|^|4|^|1|^|I|!| 
4295877341|^|143|^|2|^|1|^|I|!| 
4295877341|^|145|^|14|^|1|^|I|!| 
123456789|^|145|^|14|^|1|^|I|!| 

的file.txt的的尺寸爲30 GB。

我有大小的增量數據FILE1.TXT約2 GB即將在同一格式HFDS象下面這樣:

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!| 

4295877341|^|213|^|4|^|1|^|I|!| 
4295877341|^|213|^|4|^|1|^|I|!| 
4295877341|^|215|^|2|^|1|^|I|!| 
4295877341|^|141|^|4|^|1|^|I|!| 
4295877341|^|143|^|2|^|1|^|I|!| 
4295877343|^|149|^|14|^|2|^|I|!| 
123456789|^|145|^|14|^|1|^|D|!| 

現在我必須結合file.txt的和FILE1.TXT和創建最終包含所有唯一記錄的文本文件。

這兩個文件中的關鍵都是OrgId。如果在第一個文件中找到相同的OrgId,那麼我必須用新的OrgId替換,如果不是,那麼我必須插入新的OrgId。

最終輸出是這樣的。

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!| 

4295877346|^|136|^|4|^|1|^|I|!| 
4295877341|^|213|^|4|^|1|^|I|!| 
4295877341|^|215|^|2|^|1|^|I|!| 
4295877341|^|141|^|4|^|1|^|I|!| 
4295877341|^|143|^|2|^|1|^|I|!| 
4295877343|^|149|^|14|^|2|^|I|!| 

我該怎麼做mapreduce?

我不打算爲HIVE解決方案,因爲我有這麼多獨特的文件,這樣大約10,000,所以我必須在HIVE中創建10,000個分區。

對此用例有任何建議使用Spark?

+0

爲什麼你想在mapreduce?我可以建議你在Spark和Hadoop的scala中回答嗎? –

+0

是的請...一些代碼會很好 – SUDARSHAN

+0

我猜你對Spark和Scala和dataFrame有所瞭解,是嗎? –

回答

3

我建議你編程scalaspark。如果您在mapreduce中編程,僅對hadoop有用,但在scala中編程爲spark將使您能夠在spark以及hadoop中處理。 Spark已啓動,以解決mapreduce模型中的缺陷。你可以在這個主題上找到許多資源。其中之一是this

關於你的問題,我建議你使用dataframe

首要任務是創建schema爲dataframes。

val schema = StructType(Array(StructField("OgId", StringType), 
    StructField("ItemId", StringType), 
    StructField("segmentId", StringType), 
    StructField("Sequence", StringType), 
    StructField("Action", StringType))) 

下一個任務是讀取兩個文件,並使用上述模式

import org.apache.spark.sql.functions._ 
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs") 
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1))) 
var df1 = sqlContext.createDataFrame(rowRdd1, schema) 
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", "")) 

val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs") 
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1))) 
var df2 = sqlContext.createDataFrame(rowRdd2, schema) 
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", "")) 

df1輸出是

+----------+------+---------+--------+------+ 
|OgId  |ItemId|segmentId|Sequence|Action| 
+----------+------+---------+--------+------+ 
|4295877341|136 |4  |1  |I  | 
|4295877346|136 |4  |1  |I  | 
|4295877341|138 |2  |1  |I  | 
|4295877341|141 |4  |1  |I  | 
|4295877341|143 |2  |1  |I  | 
|4295877341|145 |14  |1  |I  | 
+----------+------+---------+--------+------+ 

df2輸出創建數據幀是

+----------+------+---------+--------+------+ 
|OgId  |ItemId|segmentId|Sequence|Action| 
+----------+------+---------+--------+------+ 
|4295877341|213 |4  |1  |I  | 
|4295877341|215 |2  |1  |I  | 
|4295877341|141 |4  |1  |I  | 
|4295877341|143 |2  |1  |I  | 
|4295877343|149 |14  |2  |I  | 
+----------+------+---------+--------+------+ 

現在根據您的要求,如果OgIddf2匹配並且將df2的所有附加到df1,則要從df1刪除rows。這些要求可以做如下

val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1") 

df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left") 
df1 = df1.filter("OgId_1 is null").drop("OgId_1") 
df1 = df1.union(df2) 

最終輸出

+----------+------+---------+--------+------+ 
|OgId  |ItemId|segmentId|Sequence|Action| 
+----------+------+---------+--------+------+ 
|4295877346|136 |4  |1  |I  | 
|4295877341|213 |4  |1  |I  | 
|4295877341|215 |2  |1  |I  | 
|4295877341|141 |4  |1  |I  | 
|4295877341|143 |2  |1  |I  | 
|4295877343|149 |14  |2  |I  | 
+----------+------+---------+--------+------+ 

這最後的結果可以保存在hdfs作爲

df1.write.format("com.databricks.spark.csv").save("output file path in hdfs") 

我希望這是有益

注意:確保你寫入輸入路徑d輸出位置正確

+0

非常感謝你,我會實現這一目標..非常適合我.. – SUDARSHAN

+0

如果您遇到任何問題,請讓我知道。 :)如果你認爲答案值得一個,請點贊。 ;)謝謝 –

+0

嗨Ramesh只是一個問題,如果我將有增量文件,沒有相同的頭文件作爲基本文件,那麼這種解決方案將工作?而且我的增量文件是爲了所以如果我們加入,然後更新順序將是保存? – SUDARSHAN