- >上有流的數目沒有限制,弗林克將根據作業管理器/任務管理器,所使用的並行化的存儲器/ CPU和時隙數的規模。我使用YARN來管理資源。如果連接的數據流量很高,那麼我們需要謹慎一點,因爲某些任務管理器並不是全部/大部分處理都會發生,因爲這會降低處理速度。由於某些任務管理器負荷過重,肯定會出現這種情況,並且需要對此進行預防性檢查,因此可能會出現卡夫卡流本身滯後或內部滯後現象。
- >連續查詢的支持已建成的最新版本弗林克部分,你可以檢查弗林克文檔了。
- >如果通過讀取一個數據流到另一個你的意思是連接在弗林克術語兩個流那麼我們就可以將它們連接起來的一個公共密鑰和保持價值狀態。請注意,值狀態在任務管理器中維護,不會在任務管理器之間共享。否則,如果您隱含兩個或更多流的聯合,那麼我們就可以構建flatmap函數,以使來自這些流的數據具有標準格式。工會
實施例: VAL流1:的數據流中[UserBookingEvent] = BookingClosure.getSource(runmode).getSource(ENV) .MAP(新ClosureMapFunction)
VAL流2:的數據流中[UserBookingEvent] = BookingCancel.getSource (runmode).getSource(ENV) .MAP(新CancelMapFunction)
VAL unionStream:的數據流中[UserBookingEvent] = stream1.union(流2)
import org.apache.flink.api.common.functions.MapFunction
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _}
import org.json4s.native.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}
class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] {
override def map(in: String): Option[UserBookingEvent] = {
val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction])
try {
implicit lazy val formats = org.json4s.DefaultFormats
val json = parse(in)
..............
} catch {
case e: Exception => {
LOG.error("Could not parse Cancel Event= " + in + e.getMessage)
None
}
}