2017-06-03 58 views
-1

2個RDDS我有兩個RDDs加入使用Scala的

第一個(productID,category)

第二個(customerID,productID,quantity)

我怎樣才能讓輸出看起來像(customerID,category,quantity)

的邏輯是與第一rdd對應category替換的第二rdd每個productID

我想用階爲解決

+0

歡迎堆棧溢出。請查看[Tour](https://stackoverflow.com/tour)和[Asking](https://stackoverflow.com/help/asking)。 – Shiro

回答

1

看來,你有兩個case classes爲你的兩個rdds

case class Products(productId:String, category:String) 
case class Customer(customerId:String, productId:String, quantity:Int) 

你有兩個rdds作爲

val rdd1 = sc.parallelize(Seq(
    Products("product1", "category1"), 
    Products("product2", "category2"), 
    Products("product3", "category3"), 
    Products("product4", "category4") 
)) 
val rdd2 = sc.parallelize(Seq(
    Customer("customer1", "product1", 5), 
    Customer("customer1", "product2", 6), 
    Customer("customer2", "product3", 2), 
    Customer("customer2", "product4", 9) 
)) 

你可以簡單地join這兩個rdds與productId但在加入他們之前,你將不得不創造以productId作爲關鍵字編輯pairRDD

rdd1.map(prod => (prod.productId, prod)) 
rdd2.map(customer => (customer.productId, customer)) 

最後一步是一個簡單的join並選擇您想要的值。

rdd1.join(rdd2).map(x => (x._2._2.customerId, x._2._1.category, x._2._2.quantity)) 

我希望這有助於

+0

謝謝...... 但是這裏'客戶'和'產品'是一對中的一個值 我不能以這種方式或者使用mapValues(x => x(0),...來訪問類的值。 )' –

+0

'顧客'和'產品'不是一個價值。他們是'Tuple2'。如果將它們組合爲一個'array',那麼你可以做'x(0)',但它們在'Tuples'中,所以正確的獲取值的方法是使用'_1 .....''你可以將'tuples'到'數組',但這將是額外的工作。如果它仍然不清楚,你可以要求更多。 :) –

0

這將幫助人們顯示你已經與樣本數據一起嘗試什麼來幫助你。無論如何,如果我正確地理解你的問題,容易閱讀的方法是首先由產品ID的RDDS轉換爲DataFrames,然後加入他們的行列,類似以下內容:

val rddProduct = sc.parallelize(Seq(
    (101, "cat1"), 
    (102, "cat1"), 
    (103, "cat2"), 
    (104, "cat3") 
)) 

// Convert 1st RDD to DataFrame 
val dfProduct = rddProduct.toDF("productId", "category") 

val rddOrder = sc.parallelize(Seq(
    (1, 101, 10), 
    (1, 103, 5), 
    (2, 101, 15), 
    (2, 102, 10), 
    (2, 103, 10), 
    (3, 101, 15), 
    (3, 102, 5), 
    (3, 104, 5) 
)) 

// Convert 2nd RDD to DataFrame 
val dfOrder = rddOrder.toDF("customerId", "productId", "quantity") 

// Join dataframes by prodId 
val dfResult = dfOrder.as("o").join(
    dfProduct.as("p"), $"o.productId" === $"p.productId", "inner" 
). 
    select($"o.customerId", $"p.category", $"o.quantity"). 
    orderBy($"o.customerId", $"p.category", $"o.quantity") 

dfResult.show 
+----------+--------+--------+ 
|customerId|category|quantity| 
+----------+--------+--------+ 
|   1| cat1|  10| 
|   1| cat2|  5| 
|   2| cat1|  10| 
|   2| cat1|  15| 
|   2| cat2|  10| 
|   3| cat1|  5| 
|   3| cat1|  15| 
|   3| cat3|  5| 
+----------+--------+--------+ 

// Convert back to RDD, if necessary 
val rddResult = dfResult.rdd