2016-02-25 55 views
-1

我有一些數據如下,我已經讀入通常的火花RDD(無架構)每個鍵一個新列創建值:如何爲用於基於條件火花的Scala

before transformation

現在,我想創建一個新列。如果客戶的事件包含w作爲值,則新列將被設置爲1。所以造成RDD將是:

after treansformation

我一直沒能做到這一點。我至今爲以下,其中數據1是讀入RDD數據:

val data2 = data1.groupBy(_._2) 
    .map(_._2.map{ case (a1: Array[String], a2, a3, a4) => 
    val myString = "w" 
    if (a1.contains(myString)) { (a1,a2,a3,a4,array_of_ones) else (a1,a2,a3,a4,array_of_zeros)} 
     }) 

1http://i.stack.imgur.com/P7bTx.jpgenter代碼在這裏

在上面,array_of_ones和array_of_zeros必須具有相同的長度爲A1的每個分區。我怎樣才能做到這一點?如果可能,請假定不允許加入RDD。謝謝。你能解決這個

+1

這實在是很難理解你想要什麼。客戶3沒有'w',但是在indicator3中有一些。此外,可以張貼正確的類型而不是截圖的實際數據結構?類型很重要..最後,加入與groupBy具有幾乎相同的性能。並不是說這是必需的。 – zero323

+0

對此我感到抱歉。 Indicator3對於客戶3必須具有0,對於客戶4必須具有1。對於事件和客戶,類型是字符串,對於指標是Int。我將數據讀入一個元組(a1,a2,a3,a4)。我會在2天內更新問題。我很抱歉。 – eugenerory

+0

不需要道歉,但請糾正這:) – zero323

回答

1

一種方法是使用DataFrames

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.sum 

val df = data1.toDF("event", "customer", "indicator1", "indicator2") 
val w = Window.partitionBy($"customer").rowsBetween(Long.MinValue, Long.MaxValue) 

val isW = ($"event" === "w").cast("long") 
val indicator3 = (sum(isW).over(w) > 0).cast("long") 

df.withColumn("indicator3", indicator3)