
In our Spark application we create two Spark contexts, and the Spark job fails. The two contexts are used:

1) To read data from the file system.

2) To connect to Cassandra and load the data into it.

Since only one Spark context can run in an application at a time, we stop the first one and then start the second.

I am getting the following errors.

Error 1) 16/03/10 05:40:44 ERROR Utils: Uncaught exception in thread  Thread-2 
java.io.IOException: Target log file already exists  (hdfs:///var/log/spark/apps/application_1457586850134_0001_2) 
    at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:225) 

This happens because when the job runs with the first context, the event log directory/file does not exist yet, but by the time the second context runs, the application's log file already exists in HDFS, hence the above error.

I resolved the problem by setting spark.eventLog.overwrite=true.
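
For reference, this setting can go either on the SparkConf in code or as --conf spark.eventLog.overwrite=true on spark-submit (my spark-submit options are listed further below). A minimal sketch of the in-code form:

    SparkConf conf = new SparkConf()
        .setAppName("Spark-Cassandra Integration")
        .set("spark.eventLog.overwrite", "true"); // allow overwriting an existing event log file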


Error 2) WARN executor.CoarseGrainedExecutorBackend: An unknown (ip-10-93-141-13.ec2.internal:48849) driver disconnected. 16/03/10 06:47:37               ERROR executor.CoarseGrainedExecutorBackend: Driver 10.93.141.13:48849disassociated! Shutting down. 

I tried increasing:

spark.yarn.driver.memoryOverhead = 1024

spark.yarn.executor.memoryOverhead = 1024

but the problem still persists.


Error 3)

Exception in thread "main" java.io.IOException: Failed to connect to /10.93.141.13:46008 
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216) 
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) 
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:200) 
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187) 
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:183) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

I checked on the core node, and it is not listening on port 46008.


Error 4)

WARN YarnAllocator: Container marked as failed:    container_1457586850134_0006_01_000006 on host: ip-10-164-169-   46.ec2.internal. Exit status: 1. Diagnostics: Exception from container- launch. 
Container id: container_1457586850134_0006_01_000006 
Exit code: 1 
Stack trace: ExitCodeException exitCode=1: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:545) 
at org.apache.hadoop.util.Shell.run(Shell.java:456) 
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722) 
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211) 
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) 
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) 
at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 


Container exited with a non-zero exit code 1 

16/03/10 06:47:17 WARN ApplicationMaster: Reporter thread fails 1 time(s) in a row. 
java.lang.IllegalStateException: RpcEnv already stopped. 
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159) 
    at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:131) 
    at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:192) 
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:516) 
    at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1$$anonfun$apply$7.apply(YarnAllocator.scala:531) 
    at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1$$anonfun$apply$7.apply(YarnAllocator.scala:512) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1.apply(YarnAllocator.scala:512) 
    at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1.apply(YarnAllocator.scala:442) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at org.apache.spark.deploy.yarn.YarnAllocator.processCompletedContainers(YarnAllocator.scala:442) 
    at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:242) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:368) 

The above error appears because the containers fail.


The Spark issues above seem to be caused by running multiple contexts, as raised on the mailing list.

https://mail-archives.apache.org/mod_mbox/spark-issues/201602.mbox/%[email protected]A%3E

sparkContext.stop() does not free up the resources.


I run with the following options (the full spark-submit command assembled from them is sketched after the list):

--class com.mobi.vserv.driver.Query5kPids        
    --conf spark.eventLog.overwrite=true 
    --conf spark.yarn.executor.memoryOverhead=1024      
    --conf spark.yarn.driver.memoryOverhead=1024        
    --num-executors 4     
    --executor-memory 3g    
    --executor-cores 2     
    --driver-memory 3g 
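
Put together, the full spark-submit invocation looks roughly like this; the JAR name and the trailing input argument are placeholders, not taken from my actual command:

    spark-submit \
      --class com.mobi.vserv.driver.Query5kPids \
      --conf spark.eventLog.overwrite=true \
      --conf spark.yarn.executor.memoryOverhead=1024 \
      --conf spark.yarn.driver.memoryOverhead=1024 \
      --num-executors 4 \
      --executor-memory 3g \
      --executor-cores 2 \
      --driver-memory 3g \
      query5kpids.jar <input-file>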

I am running on EMR with a master and 2 slave nodes; the master has 8 cores and 16 GB of memory, and each slave has 4 cores and 5120 MB of available memory.

Below is my code.

public class Query5kPids implements Serializable { 

    static List<UserSetGet> ListFromS3 = new ArrayList<UserSetGet>(); 

    private final SparkConf conf; 

    // Constructor implied by the calls in main(); it stores the SparkConf used by run() and run1(). 
    public Query5kPids(SparkConf conf) { 
        this.conf = conf; 
    } 

    public static void main(String[] args) throws JSONException, IOException, InterruptedException, URISyntaxException { 


    SparkConf conf = new SparkConf(); 
    conf.setAppName("Spark-Cassandra Integration"); 
    conf.setMaster("yarn-cluster"); 
    conf.set("spark.cassandra.connection.host", "12.16.193.19"); 
    conf.set("spark.cassandra.connection.port", "9042"); 


    SparkConf conf1 = new SparkConf().setAppName("SparkAutomation").setMaster("yarn-cluster"); 

    Query5kPids app1 = new Query5kPids(conf1); 
    app1.run1(file); 

    Query5kPids app = new Query5kPids(conf); 
    System.out.println("Both RDD has been generated"); 
    app.run(); 

    } 

    private void run() throws JSONException, IOException, InterruptedException { 

    JavaSparkContext sc = new JavaSparkContext(conf); 
    query(sc); 
    sc.stop(); 
    } 

    private void run1(File file) throws JSONException, IOException,  InterruptedException { 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    getData(sc,file); 
    sc.stop(); 

    } 

    private void getData(JavaSparkContext sc, File file) { 

    JavaRDD<String> Data = sc.textFile(file.toString()); 
    System.out.println("RDD Count is " + Data.count()); 

    // Other map operations to convert to UserSetGet RDD. 
    ListFromS3 = Data.collect(); 

    } 
    private void query(JavaSparkContext sc) { 

    System.out.println("RDD Count is " + ListFromS3.size()); 
    //This gets printed. 
    //Which means the application is coming to the second part of the program. 

    StringBuilder sb = new StringBuilder(); // 'sb' is not declared anywhere in the original post 
    for (int i = 0; i < ListFromS3.size(); i++) { 

    sb.append(ListFromS3.get(i).getApnid()); 
    sb.append(','); 
    } 
    sb.setLength(sb.length() - 3); 

    JavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable("dmp", "user_profile_spark_test").select  ("app_day_count", "app_first_seen","app_last_seen", "app_usage_count", "total_day_count", "total_usage_count") 
.where("apnid IN ('" + sb + "')"); 

    if(cassandraRDD.isEmpty()){ 

    JavaRDD<UserSetGet> rddFromGz = sc.parallelize(ListFromS3); 

    CassandraJavaUtil.javaFunctions(rddFromGz).writerBuilder("dmp", "user_profile_spark_test", mapToRow(UserSetGet.class)).saveToCassandra(); 
     logger.info("DataSaved"); 
    } 
    } 

    } 

Below is my POM.

<dependencies> 
    <dependency> 
    <groupId>org.apache-extras.cassandra-jdbc</groupId> 
    <artifactId>cassandra-jdbc</artifactId> 
    <version>1.2.5</version> 
    </dependency> 

    <dependency> 
    <groupId>junit</groupId> 
    <artifactId>junit</artifactId> 
    <version>3.8.1</version> 
    <scope>test</scope> 
    </dependency> 
    <dependency> 
    <groupId>org.codehaus.jettison</groupId> 
    <artifactId>jettison</artifactId> 
    <version>1.3.7</version> 
    </dependency> 
    <dependency> 
    <groupId>log4j</groupId> 
    <artifactId>log4j</artifactId> 
    <version>1.2.17</version> 
    </dependency> 


    <dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.6.0</version> 
    </dependency> 

    <dependency> 
    <groupId>com.datastax.spark</groupId> 
    <artifactId>spark-cassandra-connector_2.10</artifactId> 
    <version>1.5.0-M1</version> 
    </dependency> 

    <dependency> 
    <groupId>com.datastax.cassandra</groupId> 
    <artifactId>cassandra-driver-core</artifactId> 
    <version>2.1.6</version> 
    </dependency> 

    <dependency> 
    <groupId>com.datastax.spark</groupId> 
    <artifactId>spark-cassandra-connector-java_2.10</artifactId> 
    <version>1.5.0-M3</version> 
    </dependency> 

    <dependency> 
    <groupId>org.apache.commons</groupId> 
    <artifactId>commons-collections4</artifactId> 
    <version>4.1</version> 
    </dependency> 
    </dependencies> 

Answers


We run our tests with a local Spark context and use the following "hack" to work around the conflict:

sc.stop() // to avoid Akka re-binding to the same port, since it does not unbind immediately on shutdown

System.clearProperty("spark.driver.port")

Is there any reason you are using 2 different Spark contexts? Why can't you just use 1?
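
In the Java code from the question, the hack would roughly slot in like this (only a sketch, assuming the two contexts are kept; the simpler route is still a single context for both steps):

    // Sketch (not from the original post): stop the first context inside run1(),
    // then clear the driver port before creating the second context.
    Query5kPids app1 = new Query5kPids(conf1);
    app1.run1(file);                            // run1() calls sc.stop() on the first context

    System.clearProperty("spark.driver.port");  // the "hack" from this answer, applied after the stop

    Query5kPids app = new Query5kPids(conf);
    app.run();                                  // second context does the Cassandra work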


Thanks for the answer. Should I call System.clearProperty("spark.driver.port") before or after sc.stop()? And sorry, but why will System.clearProperty pick up spark.driver.port? I also used two Spark contexts because the first context reads data from the file system and the second one works with Cassandra. –


The reason I asked: I use the default driver port and have not specified it explicitly. Just to add, is it fine if I set it explicitly? –


I explicitly set the driver port for both contexts, but now it gives "Failed to connect to /10.230.132.121:51324", which is the IP of one of the core nodes. The containers still fail. –