2016-09-21

I have a JSON dataset in the following format, one entry per line, and I want to map over each value of the array column in Spark:

{ "sales_person_name" : "John", "products" : ["apple", "mango", "guava"]} 
{ "sales_person_name" : "Tom", "products" : ["mango", "orange"]} 
{ "sales_person_name" : "John", "products" : ["apple", "banana"]} 
{ "sales_person_name" : "Steve", "products" : ["apple", "mango"]} 
{ "sales_person_name" : "Tom", "products" : ["mango", "guava"]} 

I want to find out who sold the most mangoes, and so on. So I want to load the file into a DataFrame and, for each record, emit a (key, value) = (product, name) pair for every product value in the array (for the first record that would be (apple, John), (mango, John), (guava, John)).

var df = spark.read.json("s3n://sales-data.json") 
df.printSchema() 
root 
 |-- sales_person_name: string (nullable = true) 
 |-- products: array (nullable = true) 
 |    |-- element: string (containsNull = true) 

var nameProductsMap = df.select("sales_person_name", "products").show() 
+-----------------+--------------------+ 
|sales_person_name|            products| 
+-----------------+--------------------+ 
|             John|[apple, mango, gu...| 
|              Tom|     [mango, orange]| 
|             John|     [apple, banana]| 
|            Steve|      [apple, mango]| 
|              Tom|      [mango, guava]| 
+-----------------+--------------------+ 

var resultMap = df.select("products", "sales_person_name") 
    .map(r => (r(1), r(0))) 
    .show() // This is where I am stuck. 

I can't figure out the right way to explode() r(0) so that a value is emitted for each of its elements, paired with the r(1) value, in one pass. Can anyone suggest an approach? Thanks!

What is the expected output for the given example? – Nyavro

mango: John(4), Tom(2), Greg(1)... banana: Tom(5), John(2)... – lazywiz

I tried something like: var actorHashtagsMap = df.select("products", "sales_person_name").map(r => r(0).map(x => (x, r(1)))) – lazywiz
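What that comment seems to be reaching for can be written with flatMap; a minimal sketch (not from the thread), using getSeq/getString so the pairs come out typed, and assuming import spark.implicits._ is in scope:

val pairs = df.select("products", "sales_person_name") 
    .flatMap(r => r.getSeq[String](0).map(p => (p, r.getString(1)))) // one (product, name) pair per array element 
pairs.show() 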

Answers

1

UPDATE

The following code should work:

import org.apache.spark.sql.functions.explode 
import scala.collection.mutable 
import spark.implicits._ // needed for the $"..." column syntax 

val resultMap = df.select(explode($"products"), $"sales_person_name") 


// Count the occurrences of each element in the collection. 
def counter(l: TraversableOnce[Any]) = { 
  val temp = mutable.Map[Any, Int]() 
  for (i <- l) { 
    if (temp.contains(i)) temp(i) += 1 
    else temp(i) = 1 
  } 
  temp 
} 
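As a quick illustration of what counter returns (the sample values here are just for the example):

counter(Seq("John", "Tom", "John")) // => Map(John -> 2, Tom -> 1) 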

resultMap.rdd // reduceByKey is an RDD method, so convert the Dataset first (Spark 2.0) 
    .map(x => (x(0), Array(x(1)))) // (product, Array(name)) 
    .reduceByKey(_ ++ _) // gather all seller names per product 
    .map { case (x, y) => (x, counter(y).toArray) } 
    .collect() 

Resulting output: Array((banana,Array((John,1))), (guava,Array((Tom,1), (John,1))), (orange,Array((Tom,1))), (apple,Array((Steve,1), (John,2))), (mango,Array((Tom,2), (Steve,1), (John,1))))

This is an intermediate step; I ultimately want it reduced by the product column: Apple: John(4), Tom(2), Steve(1); Mango: Steve(3), Tom(1); ... – lazywiz

Updated the answer with the reduce operation. – septra

Thanks for the code @septra. However, I'm getting the following error: error: value reduceByKey is not a member of org.apache.spark.sql.Dataset[(Any, Array[Any])] possible cause: maybe a semicolon is missing before 'value reduceByKey'? I get this no matter what kind of map() I run on the result set. I'm using Spark 2.0. Any clue? – lazywiz
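The error comes from reduceByKey living on RDDs rather than Datasets, which is why the code above calls .rdd first. For reference, the same per-(product, seller) counts can also be computed without leaving the DataFrame API; a minimal sketch (not from the thread), assuming df and the imports from the answer above:

val pairCounts = df 
    .select(explode($"products").as("product"), $"sales_person_name") 
    .groupBy("product", "sales_person_name") 
    .count() // one row per (product, seller) pair with its occurrence count 
pairCounts.show() 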

4
import scala.collection.mutable // for mutable.WrappedArray 

val exploded = df.explode("products", "product") { a: mutable.WrappedArray[String] => a } 
val result = exploded.drop("products") 
result.show() 

Prints:

+-----------------+-------+ 
|sales_person_name|product| 
+-----------------+-------+ 
|             John|  apple| 
|             John|  mango| 
|             John|  guava| 
|              Tom|  mango| 
|              Tom| orange| 
|             John|  apple| 
|             John| banana| 
|            Steve|  apple| 
|            Steve|  mango| 
|              Tom|  mango| 
|              Tom|  guava| 
+-----------------+-------+ 
Thanks Zohar! You made it look easy. I had to add the import for scala.collection.mutable, and do the explode on the select() result rather than on df. I see the idea now; it's simple. Thanks! – lazywiz
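For completeness, a minimal sketch (not from the thread) of the last step the original question was after, namely who sold the most of a given product, building on the result DataFrame from this answer and assuming import spark.implicits._ is in scope:

val counts = result.groupBy("product", "sales_person_name").count() 
counts.filter($"product" === "mango") 
    .orderBy($"count".desc) 
    .show() // the top row is the person who sold the most mangoes 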