2

我有一個DataFrame我必須應用一系列的過濾器查詢。例如,我按如下方式加載我的DataFrame如何根據Spark DataFrame對查詢/計數進行並行化/分配?

val df = spark.read.parquet("hdfs://box/some-parquet") 

然後我有一堆「隨意」過濾器如下。

  • C0 = '真' 和C1 = '假'
  • C0 = '假' 和C3 = '真'
  • 等等...

我通常把這些使用util方法動態過濾。

val filters: List[String] = getFilters() 

我要做的是將這些過濾器的DataFrame得到計數。例如。

val counts = filters.map(filter => { 
df.where(filter).count 
}) 

我注意到,在映射到過濾器時不是並行/分佈式操作。如果我將過濾器粘貼到RDD/DataFrame中,則此方法也不起作用,因爲那時我將執行嵌套數據框操作(正如我在SO上讀過的,它不允許在Spark中)。類似下面的提供了一個NullPointerException(NPE)。

val df = spark.read.parquet("hdfs://box/some-parquet") 
val filterRDD = spark.sparkContext.parallelize(List("C0='false'", "C1='true'")) 
val counts = filterRDD.map(df.filter(_).count).collect 
 
Caused by: java.lang.NullPointerException 
    at org.apache.spark.sql.Dataset.filter(Dataset.scala:1127) 
    at $anonfun$1.apply(:27) 
    at $anonfun$1.apply(:27) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

有什麼辦法並行/在火花DataFrame分配計數過濾器?順便說一下,我在Spark v2.0.2上。

+1

假設你想要達到的目標是在輸入DATAS單程(否則,有可能是沒有收穫期待了這個問題),我d將過濾器函數返回給返回1(過濾器匹配)或0(無過濾器匹配)的UDF,通過UDF向數據框添加1列,並對添加的列執行groupBy/count,這將導致1行數據幀,掌握所有的計數。 – GPI

+0

你能舉個例子嗎? –

回答

1

通過這樣做,唯一可預期的增益(可能非常大)將僅在輸入數據上通過一次。

我會做它像這樣(編程解決方案,但相當於SQL是可能的):

  1. 將您的過濾器,返回1或0
  2. 添加一列每個這些UDF的UDF的
  3. 分組通過/總結您的數據。

樣本火花會是這樣的:

scala> val data = spark.createDataFrame(Seq("A", "BB", "CCC").map(Tuple1.apply)).withColumnRenamed("_1", "input") 

data: org.apache.spark.sql.DataFrame = [input: string] 

scala> data.show 
+-----+ 
|input| 
+-----+ 
| A| 
| BB| 
| CCC| 
+-----+ 

scala> val containsBFilter = udf((input: String) => if(input.contains("B")) 1 else 0) 
containsBFilter: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType))) 

scala> val lengthFilter = udf((input: String) => if (input.length < 3) 1 else 0) 
lengthFilter: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType))) 

scala> data.withColumn("inputLength", lengthFilter($"input")).withColumn("containsB", containsBFilter($"input")).select(sum($"inputLength"), sum($"containsB")).show 

+----------------+--------------+ 
|sum(inputLength)|sum(containsB)| 
+----------------+--------------+ 
|    2|    1| 
+----------------+--------------+ 
+0

這非常聰明。我喜歡。你認爲使用Scala的平行集合類有助於並行化嗎?正如你所說的,用你的方法,這只是數據的一個傳遞,但是UDF將隨着過濾器數量的增加而線性增長。但是如果沒有性能問題,我想沒關係。 –

+0

I * guess * Spark足夠聰明,可以處理額外的列(例如通過僅傳遞一次數據來計算它們),因爲它們沒有依賴關係。它需要更多的空間,但我們正在討論多頭,這非常有效。使用並行集合不符合Spark的目的:Spark分區是並行性的單位(1執行器核心= 1分區),不要插入你自己的(你只需要爲Spark分區爭奪CPU),只需分割正確的方式。 – GPI

+0

我一直在修補這種方法幾天。我認爲這不適合我的情況。首先,關於並行集合,這種方法只能在我的驅動程序中並行化,而不是跨集羣,這不是我想要的。其次,我必須動態創建這些UDF(我問過另一個SO問題)。但即使我解決了動態創建UDF的問題,df = df.withColumn(「someFilter」,someUdf(...))操作的時間過長。我可以有任意數量的動態UDF(過濾器),這將添加許多列。 –

相關問題