As I understand it, you want to count the values from rdd1 that fall between the min and max in rdd2. Please see the working example below:
val rdd1 = sc.parallelize(Seq(("chr1", 10016), ("chr1", 10017), ("chr1", 10018)))
val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1", 20000, 30000)))
rdd1.toDF("name", "value")
  .join(rdd2.toDF("name", "min", "max"), Seq("name"))
  .where($"value".between($"min", $"max"))
  .groupBy($"name")
  .count()
  .show()
scala> val rdd1=sc.parallelize(Seq(("chr1", 10016),("chr1", 10017),("chr1", 10018),("chr1", 20026),("chr1", 20036),("chr1", 25016),("chr1", 26026),("chr2", 40016),("chr2", 40116),("chr2", 50016),("chr3", 70016)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(Seq(("chr1", 10000, 20000),("chr1", 20000 , 30000),("chr2", 40000 ,50000),("chr2", 50000 ,60000),("chr3", 70000 ,80000),("chr3", 810001 ,910000),("chr3", 860001 ,960000),("chr3", 910001 ,1010000)))
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
+----+-----+
|name|count|
+----+-----+
|chr3| 1|
|chr1| 7|
|chr2| 3|
+----+-----+
Edit: if reading from files, I would use the following:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // needed for the $"col" syntax outside the shell

val nameValueSchema = StructType(Array(StructField("name", StringType, true), StructField("value", IntegerType, true)))
val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true), StructField("min", IntegerType, true), StructField("max", IntegerType, true)))
val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv")
val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv")

rdd1.toDF("name", "value")
  .join(rdd2.toDF("name", "min", "max"), Seq("name"))
  .where($"value".between($"min", $"max"))
  .groupBy($"name")
  .count()
  .show()
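For reference, the input files would then need to match the schemas above. A minimal sketch of what rdd1.csv and rdd2.csv might contain (the file contents here are assumptions, mirroring the parallelized data from the first example):

rdd1.csv:
chr1,10016
chr1,10017
chr1,10018

rdd2.csv:
chr1,10000,20000
chr1,20000,30000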
This will run across all nodes, and there is no need for the parallelize call. Quoting from the documentation here:
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
Distribute a local Scala collection to form an RDD.
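As a quick illustration of that signature (the partition count here is an arbitrary assumption):

// parallelize distributes a local, driver-side collection into an RDD
val localData = Seq(("chr1", 10016), ("chr1", 10017), ("chr1", 10018))
val localRdd = sc.parallelize(localData, numSlices = 4) // split over 4 partitions
println(localRdd.getNumPartitions) // 4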
Why are the columns in the second rdd not unique? Does this mean a value from the first rdd can fit more than one range in the second? – jtitusj
The second RDD defines the ranges for the values in rdd1. –
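To illustrate the point raised in the comments, here is a small sketch (the value 870000 is made up for illustration; the ranges are the overlapping chr3 ranges from rdd2 above) showing that when ranges overlap, a single value joins to multiple rows and is counted once per matching range:

// 870000 falls inside both chr3 ranges, so it contributes a count of 2
val values = sc.parallelize(Seq(("chr3", 870000)))
val ranges = sc.parallelize(Seq(("chr3", 810001, 910000), ("chr3", 860001, 960000)))
values.toDF("name", "value")
  .join(ranges.toDF("name", "min", "max"), Seq("name"))
  .where($"value".between($"min", $"max"))
  .groupBy($"name")
  .count()
  .show()
// +----+-----+
// |name|count|
// +----+-----+
// |chr3|    2|
// +----+-----+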