2016-11-17 56 views
1

WindowAssigner中,元素被分配給一個或多個TimeWindow實例。在滑動事件時間窗口的情況下,這發生在SlidingEventTimeWindows#assignWindowsApache Flink:窗口函數和時間開始

在具有size=5slide=1,與timestamp=0的元素被分配到下面的窗口的窗口的情況下:

  1. 窗口(開始= 0,結束= 5)
  2. 窗口(開始= -1 ,端= 4)
  3. 窗口(開始= -2,端= 3)
  4. 窗口(開始= -3,端= 2)
  5. 窗口(開始= -4,端= 1)

在一個畫面:

      +-> Beginning of time 
          | 
          | 
+----------------------------------------------+ 
|  size = 5    +--+ element  | 
| slide = 1    |     | 
|       v     | 
| t=[ 0,5[ Window 1   XXXXX    | 
| t=[-1,4[ Window 2  XXXXX    | 
| t=[-2,3[ Window 3  XXXXX    | 
| t=[-3,2[ Window 4  XXXXX     | 
| t=[-4,1[ Window 5  XXXXX     | 
|            | 
| time(-4 to +4)  ----     | 
|      4321| 
+---------------------------+------------------+ 
          | 
          | 
          | 
          + 

有沒有辦法告訴弗林克,有時間的開始和以前一樣,有沒有窗戶?如果沒有,從哪裏開始尋找改變?在上述情況下,Flink應該只有一個窗口(t=[4,8[ Window 1)作爲第一個元素。像這樣:

      +-> Beginning of time 
          | 
          | 
+-----------------------------------------------+ 
|  size = 5    +--+ element  | 
| slide = 1    |     | 
|       v     | 
| t=[ 0,5[ Window 1   XXXXX    | 
| t=[ 1,6[ Window 2   XXXXX    | 
| t=[ 2,7[ Window 3   XXXXX    | 
| t=[ 3,8[ Window 4   XXXXX   | 
| t=[ 4,9[ Window 5    XXXXX   | 
|            | 
| time(-4 to +8)  ----     | 
|      4321| 
+---------------------------+-------------------+ 
          | 
          | 
          | 
          + 

一旦窗口數量達到並超過窗口大小,這將不再有效果。然後,在上述情況下,所有元素都在5個窗口內。


腳註:

  1. org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows

回答

1

目前沒有辦法指定弗林克作業的有效時間間隔。考慮到您也可能希望將您的工作應用於歷史數據,這也可能有點問題。

你能做什麼,不過,是濾除剛開始的時候了手動啓動前窗:

val env = StreamExecutionEnvironment.getExecutionEnvironment 

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

val startTime = 1 
val windowLength = 2 
val slide = 1 

val input = env.fromElements((1,1), (2,2), (3,3)).assignAscendingTimestamps(x => x._2) 

val windowed = input.timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide)).apply{ 
    (window, iterable, collector: Collector[Int]) => 
    if (window.getStart >= startTime) { 
     collector.collect(iterable.map(_._1).reduce(_ + _)) 
    } else { 
     // discard early windows 
    } 
} 

windowed.print() 

env.execute() 
+0

這適用於現在作爲「解決方法」。我猜它不是那麼受歡迎的請求:) –

+0

即使作爲解決方法,此解決方案也可能不起作用,因爲您可能需要從流中讀取「startTime」(如果是事件時間)。而且似乎沒有簡單的方法可以在你自己的'WindowAssigner'中實現這一點,因爲沒有簡單的方法(或者我沒有找到任何方法)來存儲讀取的第一個元素的時間,這對分配器來說是可訪問的。 – cvb

0

我可能會發現這個問題更好的解決方法。 這個想法是將水印設置到將來足夠遠的位置,以便您的窗口將有足夠的數據。早期的窗戶仍然會在那裏,但它們將被丟棄。

這裏是概念證明爲AssignerWithPeriodicWatermarks[T]

class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] { 
    var t: Option[Long] = None 
    var firstTime = true 

    override def extractTimestamp(el: T, prevTs: Long): Long = { 
     t = Some(prevTs) 
     prevTs 
    } 

    override def getCurrentWatermark(): Watermark = (t, firstTime) match { 
     case (None, _) => return null 
     case (Some(v), false) => new Watermark(v) 
     case (Some(v), true) => { 
     firstTime = false 
     new Watermark(v + wait) 
     } 
    } 
    } 

wait是你的第一個窗口的大小。

似乎工作正常,但我不明白flink足以肯定。