0
我目前正在使用Spark和Scalacheck,並試圖過濾RDD [(A,Long)] (其中A是從Avro文件讀取的寄存器,Long是從zipWithUniqueId()函數獲得)從存儲在緩衝區中的相同RDD中取出。過濾來自另一個元組列表的元組的RDD
我的意圖是測試該樣本的一些屬性,一旦失敗,請在該RDD的樣本中再次測試該屬性,該樣本不包含之前取樣的任何值。 我將rdd存儲在一個var中,所以我可以重新分配一次我過濾它。 我的代碼是這樣的:
val samplingSeed = new Random(System.currentTimeMillis()).nextLong()
val sampled = rdd.takeSample(withReplacement = false, bufferSize, samplingSeed)
val buffer: JQueue[(A, Long)] = new JConcurrentLinkedQueue[(A, Long)]
//Sampled as Array converts to queue
for (i <- 0 to sampled.length - 1)
buffer.add(sampled(i).asInstanceOf[(A, Long)])
//rdd is assigned to a var for persistence
//filter here and leave out all the tuples in buffer based in the
//Long value in each tuple
rdd= rdd.filter{foo}
我怎麼能做到這一點?
您可以播放設定採樣的ID,並在過濾器檢查的ID是集:'VAL IDS = sc.broadcast( buffer.toSet.map(_._ 2)); rdd.filter(v =>!ids.value.contains(v._2))' –
像魅力一樣工作謝謝 – mtelloz
不客氣,我已經創建了答案。請接受它 –