2017-06-22 167 views
0

我想在Spark2中運行一個批處理作業,它需要一個巨大的列表作爲輸入並迭代列表來執行處理。該程序執行罰款約8000名記錄的名單,然後打破例外:spark(java) - 太多的打開的文件

WARN Lost task 0.0 in stage 421079.0 (TID 996338, acusnldlenhww4.cloudapp.net, executor 1): java.io.FileNotFoundException: /data/1/hadoop/yarn/local/usercache/A2159537-MSP01/appcache/application_1497532405817_0072/blockmgr-73dc563c-8ea5-4f2d-adfe-6c60cf3e3968/0d/shuffle_145960_0_0.index.cfb6d5ea-8c7b-41a1-acc3-2c840e7f8998 (Too many open files) 
     at java.io.FileOutputStream.open0(Native Method) 
     at java.io.FileOutputStream.open(FileOutputStream.java:270) 
     at java.io.FileOutputStream.<init>(FileOutputStream.java:213) 
     at java.io.FileOutputStream.<init>(FileOutputStream.java:162) 
     at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144) 
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
    (org.apache.spark.scheduler.TaskSetManager) 

neo4j數據庫用作輸入。我從neo4j讀取300k節點作爲輸入,並在輸入rdd上執行for循環。

在SparkConf中嘗試設置spark.shuffle.consolidateFilestrue。但那並不奏效。

回答

1

如果可能,增加ulimit - 以克服這一點。

要麼減少每個節點使用的reducer或core的數量。但它對你的工作有一定的性能影響。

在一般情況下,如果你的集羣有:

assigned cores = `n`; 

,並在運行工作與:

reducers = `k` 

然後星火將並行打開n * k文件,並開始寫作。

默認ulimit爲:1024這對於大規模應用來說太低。

使用ulimit -a查看打開文件的當前最大數量。

我們可以暫時更改打開文件的數量;通過更新系統配置文件。

看到這些文件是相同的:

/etc/sysctl.conf 
/etc/security/limits.conf