2017-03-24 28 views
0

與Apache Flink一起新增,並且採用流處理框架的一般方式,我有幾個關於它的問題,特別是與並行性有關的問題。關於在YARN集羣上使用Flink進行並行性的問題

首先這是我的代碼:

object KafkaConsuming { 

    def main(args: Array[String]) { 

    // **** CONFIGURATION & PARAMETERS **** 
    val params: ParameterTool = ParameterTool.fromArgs(args) 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
     env.setParallelism(8) 
     env.getConfig.setGlobalJobParameters(params) 

    // **** Kafka CONNECTION **** 
    val properties = new Properties(); 
    properties.setProperty("bootstrap.servers", params.get("server")); 

    // **** Get KAFKA source **** 
    val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties)) 

    // **** PROCESSING **** 
    val logs: DataStream[MinifiedLog] = stream.map(x => LogParser2.parse(x)) 

    val sessions = logs.map { x => (x.timestamp, x.bytesSent, 1l, 1)} 

    val sessionCnt: DataStream[(Long, Long, Long, Int)] = sessions 
     .keyBy(3).window(TumblingProcessingTimeWindows.of(Time.seconds(10))) 
     .reduce((x: (Long, Long, Long, Int), y: (Long, Long, Long, Int)) => (x._1, x._2 + y._2, x._3 + y._3, x._4)) 
     .map { z => (z._1, z._2/10, z._3/10, z._4)} 


    // **** OUTPUT **** 
    val output: DataStream[String] = sessionCnt.map(x => (x.toString() + "\n")) 
    output.writeToSocket("X.X.X.X", 3333, new SimpleStringSchema) 

    env.execute("Kafka consuming") 

    } 
} 

當我想要我的集羣上運行它,我運行此命令:

./bin/flink run -m yarn-cluster -yn 8 /directories/myjar.jar --server X.X.X.X --topic mytopic 

這是工作的罰款。現在,這裏是我的問題:

我在弗林克的Web UI得到這個:

Flink web UI 1

1.爲什麼收到的記錄永遠的發送,而數據量的一半記錄是一樣的嗎?

然後,如果我去到了窗口的細節:

Flink web UI 2 顯然,所有的過程是在我的奴隸和4只在一個線程中完成的!來源也一樣。只有一個線程用於接收數據。

2.爲什麼Flink沒有使用該步驟可能的所有線程?

我注意到源,窗口和接收器是由不同的從屬進程處理的,但我仍然希望該進程在羣集上並行完成。

我在這篇文章上閱讀:https://stackoverflow.com/a/32329010/5035392,如果卡夫卡源只有一個分區(這是我的情況),Flink不能共享不同節點上的任務。但是,我的窗口處理應該可以做到嗎?

如果這些都是微不足道的問題,我很抱歉。我不確定我做錯了什麼是Flink或我的集羣配置。謝謝。

回答

1

Ad。 2同一個鍵的所有值都在一個TaskManager上處理。 在你的案例中,sessions.keyBy(3)流的每個元素具有相同的密鑰 - >1,因此所有計算都在單個任務槽中執行。

+0

謝謝你的回答。 – ImbaBalboa

+0

你有第一個問題的想法嗎? – ImbaBalboa