與Apache Flink一起新增,並且採用流處理框架的一般方式,我有幾個關於它的問題,特別是與並行性有關的問題。關於在YARN集羣上使用Flink進行並行性的問題
首先這是我的代碼:
object KafkaConsuming {
def main(args: Array[String]) {
// **** CONFIGURATION & PARAMETERS ****
val params: ParameterTool = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(8)
env.getConfig.setGlobalJobParameters(params)
// **** Kafka CONNECTION ****
val properties = new Properties();
properties.setProperty("bootstrap.servers", params.get("server"));
// **** Get KAFKA source ****
val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties))
// **** PROCESSING ****
val logs: DataStream[MinifiedLog] = stream.map(x => LogParser2.parse(x))
val sessions = logs.map { x => (x.timestamp, x.bytesSent, 1l, 1)}
val sessionCnt: DataStream[(Long, Long, Long, Int)] = sessions
.keyBy(3).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce((x: (Long, Long, Long, Int), y: (Long, Long, Long, Int)) => (x._1, x._2 + y._2, x._3 + y._3, x._4))
.map { z => (z._1, z._2/10, z._3/10, z._4)}
// **** OUTPUT ****
val output: DataStream[String] = sessionCnt.map(x => (x.toString() + "\n"))
output.writeToSocket("X.X.X.X", 3333, new SimpleStringSchema)
env.execute("Kafka consuming")
}
}
當我想要我的集羣上運行它,我運行此命令:
./bin/flink run -m yarn-cluster -yn 8 /directories/myjar.jar --server X.X.X.X --topic mytopic
這是工作的罰款。現在,這裏是我的問題:
我在弗林克的Web UI得到這個:
1.爲什麼收到的記錄永遠的發送,而數據量的一半記錄是一樣的嗎?
然後,如果我去到了窗口的細節:
顯然,所有的過程是在我的奴隸和4只在一個線程中完成的!來源也一樣。只有一個線程用於接收數據。
2.爲什麼Flink沒有使用該步驟可能的所有線程?
我注意到源,窗口和接收器是由不同的從屬進程處理的,但我仍然希望該進程在羣集上並行完成。
我在這篇文章上閱讀:https://stackoverflow.com/a/32329010/5035392,如果卡夫卡源只有一個分區(這是我的情況),Flink不能共享不同節點上的任務。但是,我的窗口處理應該可以做到嗎?
如果這些都是微不足道的問題,我很抱歉。我不確定我做錯了什麼是Flink或我的集羣配置。謝謝。
謝謝你的回答。 – ImbaBalboa
你有第一個問題的想法嗎? – ImbaBalboa