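How to find the nearest records in an RDD
2016-09-23

I'm new to Apache Spark and Scala and am trying to learn them through examples. I have a simple set of city bus positions (line number, time, longitude, latitude):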

9, 23/09/16 10:20, 123.3, 123.3 
9, 23/09/16 10:21, 125.3, 125.3 

After a few transformations I get an RDD of objects:

class BusPosition(val line: String, val time: DateTime, val position: Point) 
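
The conversion step itself isn't shown. As a rough sketch, assuming the raw records look exactly like the two sample lines above (comma-separated, timestamps as `dd/MM/yy HH:mm`), a parser could look like this. It uses `java.time.LocalDateTime` instead of `DateTime`, and `parseLine` is a hypothetical helper that would be applied to each input line with `map`.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical local versions of the question's classes (java.time instead of DateTime)
case class Point(longitude: Double, latitude: Double)
case class BusPosition(line: String, time: LocalDateTime, position: Point)

// Assumed timestamp format of the sample data: day/month/two-digit year hour:minute
val timeFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("dd/MM/yy HH:mm")

// Parse one "line, time, longitude, latitude" record, trimming the padding around fields
def parseLine(raw: String): BusPosition =
  raw.split(",").map(_.trim) match {
    case Array(line, time, lon, lat) =>
      BusPosition(line, LocalDateTime.parse(time, timeFormat), Point(lon.toDouble, lat.toDouble))
    case _ =>
      throw new IllegalArgumentException(s"Unexpected record: $raw")
  }
```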

Next, I'd like to get an RDD of frames such as:

class BusFrame(
    val line: String, val time1: DateTime, val time2: DateTime, 
    val position1: Point, val position2: Point) 

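Each frame should join the two records that are closest in time. Does anyone know how to build such a set, i.e. how to find the nearest neighbour of each record? I've searched but couldn't find a suitable answer.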
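
To make the goal concrete, here is a plain-Scala sketch (no Spark) of the frames described above, assuming that "closest in time" means consecutive records of the same line. The class names mirror the question, but `java.time.LocalDateTime` stands in for `DateTime` and `toFrames` is a hypothetical helper.

```scala
import java.time.LocalDateTime

// Hypothetical local stand-ins for the question's classes (java.time instead of DateTime)
case class Point(longitude: Double, latitude: Double)
case class BusPosition(line: String, time: LocalDateTime, position: Point)
case class BusFrame(line: String, time1: LocalDateTime, time2: LocalDateTime,
                    position1: Point, position2: Point)

// Pair every record with the next record of the same line in time order
def toFrames(positions: Seq[BusPosition]): Seq[BusFrame] =
  positions.groupBy(_.line).toSeq.flatMap { case (_, ps) =>
    val sorted = ps.sortWith((a, b) => a.time.isBefore(b.time))
    sorted.zip(sorted.drop(1)).map { case (a, b) =>
      BusFrame(a.line, a.time, b.time, a.position, b.position)
    }
  }
```

A line with a single record produces no frame, since it has no neighbour to pair with.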

Some options: a) repartition, sort within partitions and do a linear scan; b) use window functions with lag/lead. – zero323
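
Option b) relies on `lag` returning the previous row and `lead` the next row within each line's time-ordered window. The plain-Scala sketch below mimics that on a local collection and, assuming "nearest" means nearest in time, keeps whichever of the two neighbours is closer. `Rec` and `nearestNeighbours` are hypothetical names; no Spark API is used.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical minimal record: a bus line and a timestamp
case class Rec(line: String, time: LocalDateTime)

// For each record, emulate lag (previous) and lead (next) within its line,
// then keep whichever neighbour is closer in time
def nearestNeighbours(recs: Seq[Rec]): Seq[(Rec, Rec)] =
  recs.groupBy(_.line).toSeq.flatMap { case (_, group) =>
    val sorted = group.sortWith((a, b) => a.time.isBefore(b.time)).toVector
    sorted.indices.flatMap { i =>
      val lag  = if (i > 0) Some(sorted(i - 1)) else None
      val lead = if (i < sorted.size - 1) Some(sorted(i + 1)) else None
      // Absolute time gap in milliseconds between the current record and a neighbour
      def gap(other: Rec): Long = Duration.between(sorted(i).time, other.time).abs.toMillis
      (lag.toList ++ lead.toList).sortBy(other => gap(other)).take(1).map(other => sorted(i) -> other)
    }
  }
```

Ties go to the previous record because `sortBy` is stable.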

Thanks, but I didn't quite get it. Could you give me some simple code examples? – Hejwo

Answer

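There are many ways to approach this. First, let's adjust the classes for better interoperability with Spark SQL:

    import java.sql.Timestamp
    import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

    case class Point(longitude: Double, latitude: Double)
    case class BusPosition(line: String, time: Timestamp, position: Point)

    case class BusFrame(
        line: String, time1: Timestamp, time2: Timestamp,
        position1: Point, position2: Point)

    val data = Seq(
        BusPosition(
        "9", Timestamp.valueOf("2016-09-23 10:20:00"), Point(123.3, 123.3)),
        BusPosition(
        "9", Timestamp.valueOf("2016-09-23 10:21:00"), Point(125.3, 125.3)),
        BusPosition(
        "7", Timestamp.valueOf("2015-08-01 00:20:12"), Point(123.9, 122.9)),
        BusPosition(
        "7", Timestamp.valueOf("2015-08-01 00:00:22"), Point(124.0, 122.6))
    ).toDS()

  • Window functions:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.lag

    // One window per bus line, ordered by time
    val w = Window.partitionBy("line").orderBy("time1")

    // Time and position of the previous record on the same line
    val time2 = lag($"time1", 1).over(w).alias("time2")
    val position2 = lag($"position1", 1).over(w).alias("position2")

    data.toDF("line", "time1", "position1")
        .select($"*", time2, position2)
        // The first record of each line has no predecessor, so drop it
        .na.drop(Array("time2", "position2"))
        .as[BusFrame]

  • Sliding:

    import org.apache.spark.mllib.rdd.RDDFunctions._

    data.orderBy("line", "time").rdd.sliding(2).collect {
        // Keep only windows whose two records belong to the same line
        case Array(BusPosition(l1, t1, p1), BusPosition(l2, t2, p2)) if l1 == l2 =>
            BusFrame(l1, t1, t2, p1, p2)
    }

  • Custom partitioner and ordering:

    import org.apache.spark.Partitioner
    import scala.math.Ordering

    class LineTimestampPartitioner(n: Int) extends Partitioner {
        def numPartitions: Int = n
        // Partition on the line only, so all records of a line end up in one partition
        def getPartition(key: Any): Int = key match {
            case (line: String, _) =>
                val mod = line.hashCode % n
                if (mod < 0) mod + n else mod
        }
    }

    // Order by line first, timestamp second
    implicit val lineTimestampOrd: Ordering[(String, java.sql.Timestamp)] =
        Ordering.by((k: (String, java.sql.Timestamp)) => (k._1, k._2.getTime))

    val n = data.rdd.getNumPartitions

    data.rdd
        .keyBy(bp => (bp.line, bp.time))
        .repartitionAndSortWithinPartitions(new LineTimestampPartitioner(n))
        .values
        .mapPartitions(_.sliding(2).collect {
            // Like for mllib sliding: pair consecutive records of the same line
            case Seq(BusPosition(l1, t1, p1), BusPosition(l2, t2, p2)) if l1 == l2 =>
                BusFrame(l1, t1, t2, p1, p2)
        })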
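
To see why the partitioner approach needs the `l1 == l2` guard, here is a local simulation (plain Scala, not Spark) of hash-partitioning by line, sorting each partition by (line, time) and sliding over it. Several lines can hash to the same partition, so consecutive records may belong to different lines. `Pos`, `partitionFor` and `framesPerPartition` are hypothetical names chosen for this sketch.

```scala
import java.time.LocalDateTime

// Hypothetical minimal record: a bus line and a timestamp
case class Pos(line: String, time: LocalDateTime)

// Non-negative hash of the line modulo n, mirroring a partition-by-line Partitioner
def partitionFor(line: String, n: Int): Int = {
  val mod = line.hashCode % n
  if (mod < 0) mod + n else mod
}

// Local simulation of repartitionAndSortWithinPartitions followed by mapPartitions(_.sliding(2))
def framesPerPartition(data: Seq[Pos], n: Int): Seq[(Pos, Pos)] = {
  val partitions = (0 until n).map(p => data.filter(r => partitionFor(r.line, n) == p))
  partitions.flatMap { part =>
    // Sort inside the partition by line first, time second
    val sorted = part.sortWith((a, b) =>
      a.line.compareTo(b.line) < 0 || (a.line == b.line && a.time.isBefore(b.time)))
    // Lines sharing a partition are adjacent, so drop windows that straddle two lines
    sorted.iterator.sliding(2).collect { case Seq(a, b) if a.line == b.line => (a, b) }.toList
  }
}
```

With a single partition every line shares it, which is exactly the case where the guard matters; with more partitions the result should be the same set of pairs.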
      

Thanks zero323! Awesome answer – Hejwo