
We run the Mesos External Shuffle Service on every agent node via Marathon. When we submit a Spark job through the dcos CLI in coarse-grained mode without dynamic allocation, everything works as expected. But when we submit the same job with dynamic allocation, it fails. How do we run Spark + Cassandra + Mesos (DCOS) with dynamic resource allocation?

16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches 
java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file:/tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index 
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234) 
... 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
... 
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory) 

Full setup:

  • We installed Mesos (DCOS) with Marathon via the Azure portal.
  • Via Universe packages we installed Cassandra, Spark and Marathon-LB.
  • We generated test data in Cassandra.
  • I installed the DCOS CLI on my laptop.

When I submit the job as follows, everything works as expected:

./dcos spark run --submit-args="--properties-file coarse-grained.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar" 
Run job succeeded. Submission id: driver-20161208185927-0043 

success

cqlsh:sp> select count(*) from product_model_per_alerts_by_date ; 

count 
------- 
476 

coarse-grained.conf:

spark.cassandra.connection.host 10.32.0.17 
spark.serializer org.apache.spark.serializer.KryoSerializer 
spark.executor.cores 1 
spark.executor.memory 1g 
spark.executor.instances 2 
spark.submit.deployMode cluster 
spark.cores.max 4 

portal.spark.cassandra.app.ProductModelPerNrOfAlerts:

package portal.spark.cassandra.app 

import org.apache.spark.sql.{SQLContext, SaveMode} 
import org.apache.spark.{SparkConf, SparkContext} 

object ProductModelPerNrOfAlerts {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf(true)
      .setAppName("cassandraSpark-ProductModelPerNrOfAlerts")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "asset_history", "keyspace" -> "sp"))
      .load()
      .select("datestamp", "product_model", "nr_of_alerts")

    val dr = df
      .groupBy("datestamp", "product_model")
      .avg("nr_of_alerts")
      .toDF("datestamp", "product_model", "nr_of_alerts")

    dr.write
      .mode(SaveMode.Overwrite)
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "product_model_per_alerts_by_date", "keyspace" -> "sp"))
      .save()

    sc.stop()
  }
}
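
As a quick sanity check one could read the aggregate table back and count it before stopping the context; a minimal sketch reusing the connector options above (the written value name and the println are illustrative additions, not part of the original job):

// Optional verification step, placed before sc.stop():
val written = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "product_model_per_alerts_by_date", "keyspace" -> "sp"))
  .load()
println(s"rows written: ${written.count()}")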

Dynamic allocation

Via Marathon we run the Mesos External Shuffle Service:

{
  "id": "spark-mesos-external-shuffle-service-tt",
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
      "network": "BRIDGE",
      "portMappings": [
        { "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
      ],
      "forcePullImage": true,
      "volumes": [
        {
          "containerPath": "/tmp",
          "hostPath": "/tmp",
          "mode": "RW"
        }
      ]
    }
  },
  "instances": 9,
  "cpus": 0.2,
  "mem": 512,
  "constraints": [["hostname", "UNIQUE"]]
}

The Dockerfile for jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1:

FROM mesosphere/spark:1.0.4-2.0.1 
WORKDIR /opt/spark/dist 
ENTRYPOINT ["./bin/spark-class", "org.apache.spark.deploy.mesos.MesosExternalShuffleService"] 
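
The image just wraps the stock mesosphere/spark distribution and starts the shuffle service when the container launches. A smoke-test sketch, assuming Docker is available locally and reusing the port and volume mapping from the Marathon app above:

docker run --rm -p 7337:7337 -v /tmp:/tmp \
    jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1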

Now, when I submit the job with dynamic allocation, it fails:

./dcos spark run --submit-args="--properties-file dynamic-allocation.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar" 
Run job succeeded. Submission id: driver-20161208191958-0047 

failure

select count(*) from product_model_per_alerts_by_date ; 

count 
------- 
5 

dynamic-allocation.conf:

spark.cassandra.connection.host 10.32.0.17 
spark.serializer org.apache.spark.serializer.KryoSerializer 
spark.executor.cores 1 
spark.executor.memory 1g 
spark.submit.deployMode cluster 
spark.cores.max 4 

spark.shuffle.service.enabled true 
spark.dynamicAllocation.enabled true 
spark.dynamicAllocation.minExecutors 2 
spark.dynamicAllocation.maxExecutors 5 
spark.dynamicAllocation.cachedExecutorIdleTimeout 120s 
spark.dynamicAllocation.schedulerBacklogTimeout 10s 
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 20s 
spark.mesos.executor.docker.volumes /tmp:/tmp:rw 
spark.local.dir /tmp 
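
For reference, the dynamic-allocation block above could also be set programmatically on the SparkConf instead of a properties file; a minimal sketch with the same keys and values (Spark 2.0.x config names):

import org.apache.spark.SparkConf

val conf = new SparkConf(true)
  .set("spark.shuffle.service.enabled", "true")     // required for dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "5")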

Logs:

16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 18.0 KB, free 366.0 MB) 
16/12/08 19:20:42 INFO TorrentBroadcast: Reading broadcast variable 7 took 21 ms 
16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 38.6 KB, free 366.0 MB) 
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them 
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:45422) 
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Got the output locations 
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 58 blocks 
16/12/08 19:20:42 INFO TransportClientFactory: Successfully created connection to /10.32.0.11:7337 after 2 ms (0 ms spent in bootstraps) 
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 13 ms 
16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index 
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234) 
... 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
... 
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory) 

Logs from the Marathon app spark-mesos-external-shuffle-service-tt:

... 
16/12/08 19:20:29 INFO MesosExternalShuffleBlockHandler: Received registration request from app 704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047 (remote address /10.32.0.4:49710, heartbeat timeout 120000 ms). 
16/12/08 19:20:31 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047, execId=2} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager} 
16/12/08 19:20:38 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8157825166903585542 
java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index 
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234) 
... 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index (No such file or directory) 
... 

But the file exists on the given agent box:

$ ls -l /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index 
-rw-r--r-- 1 root root 1608 Dec 8 19:20 /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index 


stat shuffle_0_55_0.index 
    File: 'shuffle_0_55_0.index' 
    Size: 1608  Blocks: 8   IO Block: 4096 regular file 
    Device: 801h/2049d Inode: 1805493  Links: 1 
    Access: (0644/-rw-r--r--) Uid: ( 0/ root) Gid: ( 0/ root) 
    Access: 2016-12-08 19:20:38.163188836 +0000 
    Modify: 2016-12-08 19:20:38.163188836 +0000 
    Change: 2016-12-08 19:20:38.163188836 +0000 
    Birth: - 

Answers

There was an error in the Marathon configuration for the external shuffle service: instead of the path container.docker.volumes, we should use the container.volumes path.

The correct configuration:

{
  "id": "mesos-external-shuffle-service-simple",
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
      "network": "BRIDGE",
      "portMappings": [
        { "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
      ],
      "forcePullImage": true
    },
    "volumes": [
      {
        "containerPath": "/tmp",
        "hostPath": "/tmp",
        "mode": "RW"
      }
    ]
  },
  "instances": 9,
  "cpus": 0.2,
  "mem": 512,
  "constraints": [["hostname", "UNIQUE"]]
}
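
The only difference from the failing app definition is where the volumes array lives: declared under container.docker it is not applied, so the shuffle-service container saw an empty /tmp while the shuffle files existed on the host. Schematically (fragments, not complete JSON):

"container": { "docker": { ..., "volumes": [...] } }   // wrong: mount never happens
"container": { "docker": { ... }, "volumes": [...] }   // correct: host /tmp is mounted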
I'm not familiar with DCOS, Marathon and Azure; however, I do use dynamic resource allocation (the Mesos external shuffle service) on Mesos and Aurora with Docker. Some things to check:

  • Does every Mesos agent node have its own external shuffle service (i.e., one external shuffle service per Mesos agent)?
  • Is spark.local.dir set to exactly the same string on both sides and pointing to the same directories? Your spark.local.dir for the shuffle service is /tmp, though; I don't know the DCOS setup.
  • Is the spark.local.dir directory readable and writable for both? If the Mesos agent and the external shuffle service are launched by Docker, the host's spark.local.dir MUST be mounted into both containers (see the sketch after this list).
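
A concrete sketch of matched settings, built from the values already shown in the question (the job side and the shuffle-service side must agree on the same host path):

# job side: dynamic-allocation.conf
spark.local.dir /tmp
spark.mesos.executor.docker.volumes /tmp:/tmp:rw

# shuffle-service side: the Marathon app definition mounts the same host path
"volumes": [ { "containerPath": "/tmp", "hostPath": "/tmp", "mode": "RW" } ]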

EDIT

  • If the SPARK_LOCAL_DIRS (mesos or standalone) environment variable is set, it overrides spark.local.dir.
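
An illustrative Scala sketch of that precedence (a hypothetical helper, not Spark's actual code):

// Resolve local dirs as described above: the SPARK_LOCAL_DIRS environment
// variable, if present, wins over spark.local.dir; Spark falls back to /tmp
// when neither is set. Multiple dirs may be given comma-separated.
def resolveLocalDirs(sparkLocalDir: Option[String], env: Map[String, String]): Seq[String] =
  env.get("SPARK_LOCAL_DIRS")
    .orElse(sparkLocalDir)
    .getOrElse("/tmp")
    .split(",")
    .map(_.trim)
    .toSeq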