2016-10-10 137 views
0

我是Apache Flink的新手,並且試圖瞭解有關使用Kafka將Flink流式處理作業進行縮放的一些最佳實踐。有些問題我無法找到合適的答案,包括:Apache Flink流縮放

  1. 您需要運行多少個流式作業?運行過多的流是否存在可伸縮性問題?多少太多了?
  2. 如果我們確實運行了2000個流來滿足業務需求,那麼管理這些流的最佳方法是什麼?
  3. 從一個流向另一個流讀取流數據的首選方法是什麼?我們可以加入流,執行連續查詢等嗎?

如果這些問題看起來有點基本,請提前致謝,並表示歉意,但我試圖更好地處理這項技術。我已經閱讀了大部分文檔,但是由於缺乏這方面的經驗,可以肯定的是可能不會把一些概念放在一起。謝謝你的幫助!

回答

1

- >上有流的數目沒有限制,弗林克將根據作業管理器/任務管理器,所使用的並行化的存儲器/ CPU和時隙數的規模。我使用YARN來管理資源。如果連接的數據流量很高,那麼我們需要謹慎一點,因爲某些任務管理器並不是全部/大部分處理都會發生,因爲這會降低處理速度。由於某些任務管理器負荷過重,肯定會出現這種情況,並且需要對此進行預防性檢查,因此可能會出現卡夫卡流本身滯後或內部滯後現象。

- >連續查詢的支持已建成的最新版本弗林克部分,你可以檢查弗林克文檔了。

- >如果通過讀取一個數據流到另一個你的意思是連接在弗林克術語兩個流那麼我們就可以將它們連接起來的一個公共密鑰和保持價值狀態。請注意,值狀態在任務管理器中維護,不會在任務管理器之間共享。否則,如果您隱含兩個或更多流的聯合,那麼我們就可以構建flatmap函數,以使來自這些流的數據具有標準格式。工會

實施例: VAL流1:的數據流中[UserBookingEvent] = BookingClosure.getSource(runmode).getSource(ENV) .MAP(新ClosureMapFunction)

VAL流2:的數據流中[UserBookingEvent] = BookingCancel.getSource (runmode).getSource(ENV) .MAP(新CancelMapFunction)

VAL unionStream:的數據流中[UserBookingEvent] = stream1.union(流2)

import org.apache.flink.api.common.functions.MapFunction 
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _} 
import org.json4s.native.JsonMethods._ 
import org.slf4j.{Logger, LoggerFactory} 

class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] { 
    override def map(in: String): Option[UserBookingEvent] = { 
    val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction]) 
    try { 
     implicit lazy val formats = org.json4s.DefaultFormats 

     val json = parse(in) 
     .............. 
    } catch { 
     case e: Exception => { 
     LOG.error("Could not parse Cancel Event= " + in + e.getMessage) 
     None 
     } 
    }