I suggest you program in Scala for Spark. If you program in MapReduce, it is useful only for Hadoop, but programming in Scala for Spark lets you process data on Spark as well as on Hadoop. Spark was started precisely to address the shortcomings of the MapReduce model. You can find many resources on this topic; one of them is this.
Regarding your question, I suggest you use DataFrames. The first task is to create a schema for the DataFrames:
import org.apache.spark.sql.types._

// All five columns are read as strings
val schema = StructType(Array(StructField("OgId", StringType),
  StructField("ItemId", StringType),
  StructField("segmentId", StringType),
  StructField("Sequence", StringType),
  StructField("Action", StringType)))
The next task is to read both files with the above schema:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import sqlContext.implicits._ // enables the $"colName" syntax

// Read file 1, split each line on the literal |^| delimiter, and apply the schema
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
// Strip the stray | and ! marker characters from the Action column
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

// Read file 2 the same way
val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
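For context, here is how a raw line gets parsed, assuming the input uses a literal |^| field delimiter with an |!| trailer on the last field (the sample line below is hypothetical; the regexp_replace above suggests this layout):

// Hypothetical sample input line
val line = "4295877341|^|136|^|4|^|1|^|I|!|"
// split with limit -1 keeps trailing empty fields intact
line.split("\\|\\^\\|", -1)
// => Array(4295877341, 136, 4, 1, I|!|); the | and ! are then removed by regexp_replace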
The output of df1 is:
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136 |4 |1 |I |
|4295877346|136 |4 |1 |I |
|4295877341|138 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877341|145 |14 |1 |I |
+----------+------+---------+--------+------+
And the DataFrame created for df2 is:
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
Now, per your requirement, you want to delete rows from df1 whose OgId matches df2 and then append all of df2 to df1. That can be done as follows:
// Rename OgId so the join key from df2 is distinguishable after the join
val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")
// Keep only df1 rows whose OgId has no match in df2, then append df2
df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)
The final output is:
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136 |4 |1 |I |
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
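As a side note, if you are on Spark 2.x, the same delete-then-append can be expressed with a single left_anti join; a minimal sketch, assuming the same df1 and df2 as above:

// Keep df1 rows whose OgId does not appear in df2, then append df2
val result = df1.join(df2.select("OgId"), Seq("OgId"), "left_anti").union(df2)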
This final result can be saved to HDFS with:
df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")
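If you are on Spark 2.x, the CSV source is built in and the external com.databricks.spark.csv package is not needed; a minimal sketch with the same placeholder path:

// Built-in CSV writer (Spark 2.x); path placeholder as above
df1.write.csv("output file path in hdfs")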
I hope this is helpful.
Note: make sure you fill in the input and output paths correctly.
Why do you want it in MapReduce? May I suggest an answer in Scala for Spark and Hadoop? –
Yes please... some code would be nice – SUDARSHAN
I guess you have some knowledge of Spark, Scala, and DataFrames, right? –