我有一個DataFrame
我必須應用一系列的過濾器查詢。例如,我按如下方式加載我的DataFrame
。如何根據Spark DataFrame對查詢/計數進行並行化/分配?
val df = spark.read.parquet("hdfs://box/some-parquet")
然後我有一堆「隨意」過濾器如下。
- C0 = '真' 和C1 = '假'
- C0 = '假' 和C3 = '真'
- 等等...
我通常把這些使用util方法動態過濾。
val filters: List[String] = getFilters()
我要做的是將這些過濾器的DataFrame
得到計數。例如。
val counts = filters.map(filter => {
df.where(filter).count
})
我注意到,在映射到過濾器時不是並行/分佈式操作。如果我將過濾器粘貼到RDD/DataFrame中,則此方法也不起作用,因爲那時我將執行嵌套數據框操作(正如我在SO上讀過的,它不允許在Spark中)。類似下面的提供了一個NullPointerException(NPE)。
val df = spark.read.parquet("hdfs://box/some-parquet")
val filterRDD = spark.sparkContext.parallelize(List("C0='false'", "C1='true'"))
val counts = filterRDD.map(df.filter(_).count).collect
Caused by: java.lang.NullPointerException at org.apache.spark.sql.Dataset.filter(Dataset.scala:1127) at $anonfun$1.apply(:27) at $anonfun$1.apply(:27) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
有什麼辦法並行/在火花DataFrame
分配計數過濾器?順便說一下,我在Spark v2.0.2上。
假設你想要達到的目標是在輸入DATAS單程(否則,有可能是沒有收穫期待了這個問題),我d將過濾器函數返回給返回1(過濾器匹配)或0(無過濾器匹配)的UDF,通過UDF向數據框添加1列,並對添加的列執行groupBy/count,這將導致1行數據幀,掌握所有的計數。 – GPI
你能舉個例子嗎? –