2個RDDS我有兩個RDDs
加入使用Scala的
第一個(productID,category)
第二個(customerID,productID,quantity)
我怎樣才能讓輸出看起來像(customerID,category,quantity)
?
的邏輯是與第一rdd
對應category
替換的第二rdd
每個productID
。
我想用階爲解決
2個RDDS我有兩個RDDs
加入使用Scala的
第一個(productID,category)
第二個(customerID,productID,quantity)
我怎樣才能讓輸出看起來像(customerID,category,quantity)
?
的邏輯是與第一rdd
對應category
替換的第二rdd
每個productID
。
我想用階爲解決
看來,你有兩個case classes
爲你的兩個rdds
case class Products(productId:String, category:String)
case class Customer(customerId:String, productId:String, quantity:Int)
你有兩個rdds
作爲
val rdd1 = sc.parallelize(Seq(
Products("product1", "category1"),
Products("product2", "category2"),
Products("product3", "category3"),
Products("product4", "category4")
))
val rdd2 = sc.parallelize(Seq(
Customer("customer1", "product1", 5),
Customer("customer1", "product2", 6),
Customer("customer2", "product3", 2),
Customer("customer2", "product4", 9)
))
你可以簡單地join
這兩個rdds
與productId但在加入他們之前,你將不得不創造以productId作爲關鍵字編輯pairRDD
。
rdd1.map(prod => (prod.productId, prod))
rdd2.map(customer => (customer.productId, customer))
最後一步是一個簡單的join
並選擇您想要的值。
rdd1.join(rdd2).map(x => (x._2._2.customerId, x._2._1.category, x._2._2.quantity))
我希望這有助於
謝謝...... 但是這裏'客戶'和'產品'是一對中的一個值 我不能以這種方式或者使用mapValues(x => x(0),...來訪問類的值。 )' –
'顧客'和'產品'不是一個價值。他們是'Tuple2'。如果將它們組合爲一個'array',那麼你可以做'x(0)',但它們在'Tuples'中,所以正確的獲取值的方法是使用'_1 .....''你可以將'tuples'到'數組',但這將是額外的工作。如果它仍然不清楚,你可以要求更多。 :) –
這將幫助人們顯示你已經與樣本數據一起嘗試什麼來幫助你。無論如何,如果我正確地理解你的問題,容易閱讀的方法是首先由產品ID的RDDS轉換爲DataFrames,然後加入他們的行列,類似以下內容:
val rddProduct = sc.parallelize(Seq(
(101, "cat1"),
(102, "cat1"),
(103, "cat2"),
(104, "cat3")
))
// Convert 1st RDD to DataFrame
val dfProduct = rddProduct.toDF("productId", "category")
val rddOrder = sc.parallelize(Seq(
(1, 101, 10),
(1, 103, 5),
(2, 101, 15),
(2, 102, 10),
(2, 103, 10),
(3, 101, 15),
(3, 102, 5),
(3, 104, 5)
))
// Convert 2nd RDD to DataFrame
val dfOrder = rddOrder.toDF("customerId", "productId", "quantity")
// Join dataframes by prodId
val dfResult = dfOrder.as("o").join(
dfProduct.as("p"), $"o.productId" === $"p.productId", "inner"
).
select($"o.customerId", $"p.category", $"o.quantity").
orderBy($"o.customerId", $"p.category", $"o.quantity")
dfResult.show
+----------+--------+--------+
|customerId|category|quantity|
+----------+--------+--------+
| 1| cat1| 10|
| 1| cat2| 5|
| 2| cat1| 10|
| 2| cat1| 15|
| 2| cat2| 10|
| 3| cat1| 5|
| 3| cat1| 15|
| 3| cat3| 5|
+----------+--------+--------+
// Convert back to RDD, if necessary
val rddResult = dfResult.rdd
歡迎堆棧溢出。請查看[Tour](https://stackoverflow.com/tour)和[Asking](https://stackoverflow.com/help/asking)。 – Shiro