I have two datasets that were partitioned with the same partitioner and stored in HDFS. These datasets are the output of two different Spark jobs over which we have no control. Now I want to join these two datasets to derive further information. How can I join the two data files in HDFS with Spark?

Example: 

Data Set 1: 
ORDER_ID CUSTOMER_ID ITEMS 
OD1  C1   1,2,3 -> partition 0 
OD2  C2   3,4,5 -> partition 0 
OD3  C4   1,2,3 -> partition 1 
OD4  C3   1,3  -> partition 1 

Data Set 2: 
ORDER_ID CUSTOMER_ID REFUND_ITEMS 
OD1  C1   1  -> partition 0 
OD2  C2   5  -> partition 0 
OD3  C4   2,3 -> partition 1 
OD4  C3   3  -> partition 1 

Options are: 

1) Create two RDDs from the datasets and join them (see the sketch below). 
2) Create one RDD using one of the datasets. 
    -> For each partition in the RDD, determine the actual partition id, i.e. OD1 -> 0, OD3 -> 1 (using some custom logic) 
    -> Load the data of that partition of dataset 2 from HDFS 
    -> Iterate over both datasets and produce the combined result. 
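
A minimal sketch of option 1, assuming tab-separated files; the HDFS paths are placeholders, not the real output locations:

// Option 1: key each dataset by (ORDER_ID, CUSTOMER_ID) and join the RDDs
val rdd1 = spark.sparkContext.textFile("hdfs:///data/dataset1")
    .map(_.split("\t"))
    .map(a => ((a(0), a(1)), a(2)))  // ((ORDER_ID, CUSTOMER_ID), ITEMS)
val rdd2 = spark.sparkContext.textFile("hdfs:///data/dataset2")
    .map(_.split("\t"))
    .map(a => ((a(0), a(1)), a(2)))  // ((ORDER_ID, CUSTOMER_ID), REFUND_ITEMS)
val joinedRDD = rdd1.join(rdd2)      // ((ORDER_ID, CUSTOMER_ID), (ITEMS, REFUND_ITEMS))

Note that textFile does not restore the on-disk partitioner, so this join shuffles both datasets even though they were co-partitioned; avoiding that shuffle is the point of option 2.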

For option 2 I don't know how to read a specific file from HDFS in the Spark executor. (I have the full URI of the file's location.) 
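
A minimal sketch of option 2, opening the matching part-file of dataset 2 inside each task via the Hadoop FileSystem API on the executor. The part-file naming pattern is an assumption, and the asker's custom partition-id logic would replace the raw index used here:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

val ds1 = spark.sparkContext.textFile("hdfs:///data/dataset1")
val combined = ds1.mapPartitionsWithIndex { (partitionId, ds1Rows) =>
  // Assumed layout: one part-file of dataset 2 per partition id
  val path = new Path(f"hdfs:///data/dataset2/part-$partitionId%05d")
  // This runs on the executor, so each task reads only its own file
  val fs = FileSystem.get(path.toUri, new Configuration())
  val ds2Rows = Source.fromInputStream(fs.open(path)).getLines().toList
  // ... match ds1Rows against ds2Rows on (ORDER_ID, CUSTOMER_ID) and emit combined rows ...
  ds1Rows // placeholder: return the combined iterator instead
}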

Answer


You can try creating two DataFrames and joining them with SQL. Please find the code below.

import org.apache.spark.sql.SparkSession 

// In spark-shell this session is already provided as `spark` 
val spark = SparkSession.builder().appName("JoinDatasets").getOrCreate() 

// For implicit conversions from RDDs to DataFrames 
import spark.implicits._ 

case class struc_dataset(ORDER_ID: String, CUSTOMER_ID: String, ITEMS: String) 
case class struc_refunds(ORDER_ID: String, CUSTOMER_ID: String, REFUND_ITEMS: String) 

// Read file1 (tab-separated: ORDER_ID, CUSTOMER_ID, ITEMS) 
val File1DF = spark.sparkContext 
    .textFile("temp/src/file1.txt") 
    .map(_.split("\t")) 
    .map(attributes => struc_dataset(attributes(0), attributes(1), attributes(2))) 
    .toDF() 

// Register as temp view - Dataset1 
File1DF.createOrReplaceTempView("Dataset1") 

// Read file2 (tab-separated: ORDER_ID, CUSTOMER_ID, REFUND_ITEMS) 
val File2DF = spark.sparkContext 
    .textFile("temp/src/file2.txt") 
    .map(_.split("\t")) 
    .map(attributes => struc_refunds(attributes(0), attributes(1), attributes(2))) 
    .toDF() 

// Register as temp view - Dataset2 
File2DF.createOrReplaceTempView("Dataset2") 

// SQL statement to create the final (joined) DataFrame 
val finalDF = spark.sql( 
  "SELECT ds1.ORDER_ID, ds1.CUSTOMER_ID, ds1.ITEMS, ds2.REFUND_ITEMS " + 
  "FROM Dataset1 ds1 JOIN Dataset2 ds2 " + 
  "ON ds1.ORDER_ID = ds2.ORDER_ID AND ds1.CUSTOMER_ID = ds2.CUSTOMER_ID") 

finalDF.show()
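
Equivalently, the join can be expressed with the DataFrame API instead of a SQL string; joining on a Seq of column names keeps a single copy of the key columns:

val finalDF2 = File1DF.join(File2DF, Seq("ORDER_ID", "CUSTOMER_ID")) 
finalDF2.show()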