我在Apache flink中有一個小的用例,它是一個批處理系統。我需要處理文件的收集。每個文件的處理必須由一臺機器來處理。我有以下代碼。一直只有一個任務槽被佔用,並且文件被一個接一個地處理。我有6個節點(所以有6個任務管理器),並在每個節點中配置了4個任務槽。所以,我預計每次處理24個文件。flink作業沒有跨機器分佈
class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
override def mapPartition(
myfiles: java.lang.Iterable[java.io.File],
out:org.apache.flink.util.Collector[Int])
: Unit = {
var temp = myfiles.iterator()
while(temp.hasNext()){
val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
val file = new File(temp.next().toURI)
Process(
"/bin/bash ./run.sh " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
new File(fp1.getAbsoluteFile.getParent))
.lines
.foreach{println}
out.collect(1)
}
}
}
我發起弗林克爲./bin/start-cluster.sh命令和Web用戶界面會顯示它有6個任務管理器,任務24個插槽。
該文件夾包含約49個文件。當我在這個集合上創建mapPartition時,我期望跨越49個並行進程。但是,在我的基礎設施中,它們都是一個接一個地處理的。這意味着只有一臺機器(一個任務管理器)處理所有49個文件名。我想要的是,如每個插槽配置2個任務,我期望同時處理24個文件。
任何指針肯定會在這裏幫助。我有這些參數在flink-conf.yaml文件中
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
在此先感謝。有人能夠告訴我我要去哪裏嗎?
嘗試添加** mapParallelism(49)** ** mapPartition(新的MyMapPartitionFunction())**之後**。 ** env.fromCollection()**將創建並行度爲1的操作符(即使您已將flink-conf.yaml中的作業並行度配置爲24,因爲它使用** NonParallelInput **輸入格式)。如果不設置並行性,*分區映射*操作符將從源代碼繼承其並行性。 – David