2016-10-27 95 views
3

的範圍內有2個非常大的RDD(每個都有超過暢想記錄),第一個是:比較2火花RDD確保值從第一個是在第二RDD

rdd1.txt(name,value):  
chr1 10016 
chr1 10017 
chr1 10018 
chr1 20026 
chr1 20036 
chr1 25016 
chr1 26026 
chr2 40016 
chr2 40116 
chr2 50016 
chr3 70016 

rdd2.txt(name,min,max): 
chr1  10000 20000 
chr1  20000 30000 
chr2  40000 50000 
chr2  50000 60000 
chr3  70000 80000 
chr3 810001 910000 
chr3 860001 960000 
chr3 910001 1010000 

值是有效的,只有當它在最小值和第二RDD,名稱的的計數的最大值之間的範圍內發生的意志加1,如果其有效

取上述作爲一個例子,CHR1的發生7.

我怎麼能得到火花斯卡拉的結果?

千恩萬謝

+0

爲什麼第二個rdd中的列不是唯一的?這是否意味着我們可以在第一個rdd的價值適合第二個? – jtitusj

+0

第二個RDD定義了RDD1中值的範圍 –

回答

2

嘗試:

val rdd1 = sc.parallelize(Seq(
    ("chr1", 10016), ("chr1", 10017), ("chr1", 10018))) 
val rdd2 = sc.parallelize(Seq(
    ("chr1", 10000, 20000), ("chr1",20000, 30000))) 

rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")) 
.where($"value".between($"min", $"max")) 
+0

scala> r1.toDF(「name」,「value」)。join(r2.toDF(「name」,「min」,「max」),Seq(「 ($「min」,$「max」)) java.lang.IllegalArgumentException:需求失敗:列數不匹配。 舊列名稱(1):_1 新列名稱(2):名稱,值 –

+0

如果我理解正確,您還可以'rdd1.toDF(「name」,「min」,「max」)。groupBy('名稱).agg(min('min).as(「min」),max('max).as(「max」))''在加入之前讓它更有效率(我敢肯定這是一個優化.. ..) – Wilmerton

+0

我會嘗試它,並檢查它是否更有效後,我有一個完整的答案我的問題 –

0

據我瞭解,你想從RDD1集在RDD2 MIN和MAX之間下降值。請看看下面的工作

val rdd1 = sc.parallelize(Seq(("chr1", 10016), ("chr1", 10017), ("chr1", 10018))) 
val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1",20000, 30000))) 
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 


scala> val rdd1=sc.parallelize(Seq(("chr1", 10016),("chr1", 10017),("chr1", 10018),("chr1", 20026),("chr1", 20036),("chr1", 25016),("chr1", 26026),("chr2", 40016),("chr2", 40116),("chr2", 50016),("chr3", 70016))) 
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24 

scala> val rdd2=sc.parallelize(Seq(("chr1",  10000, 20000),("chr1",  20000 , 30000),("chr2",  40000 ,50000),("chr2",  50000 ,60000),("chr3",  70000 ,80000),("chr3", 810001 ,910000),("chr3", 860001 ,960000),("chr3", 910001 ,1010000))) 
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24 


scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 
+----+-----+ 
|name|count| 
+----+-----+ 
|chr3| 1| 
|chr1| 7| 
|chr2| 3| 
+----+-----+ 

編輯 如果從文件中讀取,我會用以下

import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}; 

val sqlContext = new SQLContext(sc) 
val nameValueSchema = StructType(Array(StructField("name", StringType, true),StructField("value", IntegerType, true))) 
val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true),StructField("min", IntegerType, true),StructField("max", IntegerType, true))) 
val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv") 
val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv") 
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 

這將在所有節點上運行,也沒有必要並行化呼叫。引用在這裏documentation

DEF並行[T](SEQ ID NO:SEQ [T],numSlices:INT = defaultParallelism)(隱式爲arg0:ClassTag [T]):RDD [T]永久 分發的本地的Scala收集以形成RDD。

+0

非常感謝完整的指令和結果是預期的,但rdd1.txt和rdd2.txt是非常非常大,如何在沒有硬編碼的情況下實現並行化? –

+0

不知道我是否理解沒有硬編碼的並行化,可以詳細說明一下嗎? –

+0

硬編碼@WuFei是什麼意思? – eliasah