2017-05-04 75 views
9

我在Apache flink中有一個小的用例,它是一個批處理系統。我需要處理文件的收集。每個文件的處理必須由一臺機器來處理。我有以下代碼。一直只有一個任務槽被佔用,並且文件被一個接一個地處理。我有6個節點(所以有6個任務管理器),並在每個節點中配置了4個任務槽。所以,我預計每次處理24個文件。flink作業沒有跨機器分佈

class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] { 
    override def mapPartition(
     myfiles: java.lang.Iterable[java.io.File], 
     out:org.apache.flink.util.Collector[Int]) 
    : Unit = { 
    var temp = myfiles.iterator() 
    while(temp.hasNext()){ 
     val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh") 
     val file = new File(temp.next().toURI) 
     Process(
     "/bin/bash ./run.sh " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv", 
     new File(fp1.getAbsoluteFile.getParent)) 
     .lines 
     .foreach{println} 
     out.collect(1) 
    } 
    } 
} 

我發起弗林克爲./bin/start-cluster.sh命令和Web用戶界面會顯示它有6個任務管理器,任務24個插槽。

該文件夾包含約49個文件。當我在這個集合上創建mapPartition時,我期望跨越49個並行進程。但是,在我的基礎設施中,它們都是一個接一個地處理的。這意味着只有一臺機器(一個任務管理器)處理所有49個文件名。我想要的是,如每個插槽配置2個任務,我期望同時處理24個文件。

任何指針肯定會在這裏幫助。我有這些參數在flink-conf.yaml文件中

jobmanager.heap.mb: 2048 
taskmanager.heap.mb: 1024 
taskmanager.numberOfTaskSlots: 4 
taskmanager.memory.preallocate: false 
parallelism.default: 24 

在此先感謝。有人能夠告訴我我要去哪裏嗎?

+1

嘗試添加** mapParallelism(49)** ** mapPartition(新的MyMapPartitionFunction())**之後**。 ** env.fromCollection()**將創建並行度爲1的操作符(即使您已將flink-conf.yaml中的作業並行度配置爲24,因爲它使用** NonParallelInput **輸入格式)。如果不設置並行性,*分區映射*操作符將從源代碼繼承其並行性。 – David

回答

2

正如David所述,問題是env.fromCollection(Iterable[T])創建DataSource與非並行InputFormat。因此,DataSource執行的並行性爲1。隨後的運營商(mapPartition)從源頭繼承了這種並行性,以便它們可以鏈接(這節省了我們的一個網絡洗牌)。

來解決這個問題的方法是明確地通過重新平衡源DataSet

env.fromCollection(folders).rebalance() 

,或者顯式地設置在隨後的操作者(mapPartition)所希望的平行度:

env.fromCollection(folders).mapPartition(...).setParallelism(49) 
+0

非常感謝Rohrmann和David。重新平衡()看起來更乾淨,它也工作了! – Bala