
I am trying to create a JavaRDD that contains a series of other RDDs. In other words: a Java Spark RDD inside another RDD?

RDDMachine.foreach(machine -> startDetectionNow()). Inside it, each machine starts a query to ES and gets back another RDD. I collect all of those hits (about 1200), convert them to a list, and the machine then works on that list.

First of all: is it even possible to do this? If not, what could I try instead?

Let me show what I am trying to do:

 SparkConf conf = new SparkConf().setAppName("Algo").setMaster("local"); 
    conf.set("es.index.auto.create", "true"); 
    conf.set("es.nodes", "IP_ES"); 
    conf.set("es.port", "9200"); 
    sparkContext = new JavaSparkContext(conf); 

    MyAlgoConfig config_algo = new MyAlgoConfig(Detection.byPrevisionMerge); 

    Machine m1 = new Machine("AL-27", "IP1", config_algo); 
    Machine m2 = new Machine("AL-20", "IP2", config_algo); 
    Machine m3 = new Machine("AL-24", "IP3", config_algo); 
    Machine m4 = new Machine("AL-21", "IP4", config_algo); 

    ArrayList<Machine> machines = new ArrayList<>(); 
    machines.add(m1); 
    machines.add(m2); 
    machines.add(m3); 
    machines.add(m4); 

    JavaRDD<Machine> machineRDD = sparkContext.parallelize(machines); 

    machineRDD.foreach(machine -> machine.startDetectNow()); 

I want to start my algorithm on each machine, and each machine has to learn from data stored in Elasticsearch.


public boolean startDetectNow() { 

    // Big ELK query 
    JavaRDD dataForLearn = Elastic.loadElasticsearch( 
      Algo.sparkContext 
      , "logstash-*/Collector" 
      , Elastic.req_AvgOfCall( 
        getIP() 
        , "hour" 
        , "2016-04-16T00:00:00" 
        , "2016-06-10T00:00:00")); 

    JavaRDD<Hit> RDD_hits = Elastic.mapToHit(dataForLearn); 
    List<Hit> hits = Elastic.RddToListHits(RDD_hits); 
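
For context: Elastic.loadElasticsearch, Elastic.mapToHit and Elastic.RddToListHits are my own helpers around elasticsearch-hadoop (that is what the es.* settings above configure). Only as an assumed sketch, with a placeholder query instead of the one built by Elastic.req_AvgOfCall, the load amounts to something like this:

    // Assumed sketch only: roughly what Elastic.loadElasticsearch(...) boils down to
    // if it wraps elasticsearch-hadoop's JavaEsSpark API. The query JSON is a placeholder.
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

    public class ElasticSketch {
        public static JavaPairRDD<String, String> loadHits(JavaSparkContext sc, String ip) {
            String query = "{\"query\":{\"bool\":{\"must\":["
                    + "{\"term\":{\"ip\":\"" + ip + "\"}},"
                    + "{\"range\":{\"@timestamp\":{\"gte\":\"2016-04-16T00:00:00\",\"lte\":\"2016-06-10T00:00:00\"}}}"
                    + "]}}}";
            // Each Elasticsearch hit comes back as a (documentId, documentJson) pair
            return JavaEsSpark.esJsonRDD(sc, "logstash-*/Collector", query);
        }
    }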

So I am trying to have each "Machine" query all of its own data. My question is: is this possible with Spark, or do I need another approach? When I run it in Spark, it seems to lock up as soon as the code reaches the second RDD.

And the error message is:

16/08/17 00:17:13 INFO SparkContext: Starting job: collect at Elastic.java:94
16/08/17 00:17:13 INFO DAGScheduler: Got job 1 (collect at Elastic.java:94) with 1 output partitions
16/08/17 00:17:13 INFO DAGScheduler: Final stage: ResultStage 1 (collect at Elastic.java:94)
16/08/17 00:17:13 INFO DAGScheduler: Parents of final stage: List()
16/08/17 00:17:13 INFO DAGScheduler: Missing parents: List()
16/08/17 00:17:13 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106), which has no missing parents
16/08/17 00:17:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 7.7 KB)
16/08/17 00:17:13 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 10.2 KB)
16/08/17 00:17:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46356 (size: 2.5 KB, free: 511.1 MB)
16/08/17 00:17:13 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/08/17 00:17:13 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106)
16/08/17 00:17:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
^C16/08/17 00:17:22 INFO SparkContext: Invoking stop() from shutdown hook
16/08/17 00:17:22 INFO SparkUI: Stopped Spark web UI at http://192.168.10.23:4040
16/08/17 00:17:22 INFO DAGScheduler: ResultStage 0 (foreach at GuardConnect.java:60) failed in 10.292 s
16/08/17 00:17:22 INFO DAGScheduler: Job 0 failed: foreach at GuardConnect.java:60, took 10.470974 s
Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
    at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1739)
    at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
    at com.seigneurin.spark.GuardConnect.main(GuardConnect.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(...)
16/08/17 00:17:22 INFO DAGScheduler: ResultStage 1 (collect at Elastic.java:94) failed in 9.301 s
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(...)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1471385842813,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
16/08/17 00:17:22 INFO DAGScheduler: Job 1 failed: collect at Elastic.java:94, took 9.317650 s
16/08/17 00:17:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
    at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1739)
    at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:339)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46)
    at com.seigneurin.spark.Elastic.RddToListHits(Elastic.java:94)
    at com.seigneurin.spark.OXO.prepareDataAndLearn(OXO.java:126)
    at com.seigneurin.spark.OXO.startDetectNow(OXO.java:148)
    at com.seigneurin.spark.GuardConnect.lambda$main$1282d8df$1(GuardConnect.java:60)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(1,1471385842814,JobFailed(org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down))
16/08/17 00:17:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/08/17 00:17:22 INFO MemoryStore: MemoryStore cleared
16/08/17 00:17:22 INFO BlockManager: BlockManager stopped
16/08/17 00:17:22 INFO BlockManagerMaster: BlockManagerMaster stopped
16/08/17 00:17:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/08/17 00:17:22 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0, ANY, 6751 bytes)
16/08/17 00:17:22 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task ... rejected from java.util.concurrent.ThreadPoolExecutor ... [Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:128)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:86)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:84)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:69)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/17 00:17:22 INFO SparkContext: Successfully stopped SparkContext
16/08/17 00:17:22 INFO ShutdownHookManager: Shutdown hook called
16/08/17 00:17:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157
16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/08/17 00:17:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157/httpd-6d3aeb80-808c-4749-8f8b-ac9341f990a7

Thank you if you can give me some advice.


We would need the inner exception to help. All this says is that there is a problem in your foreach. –


Hmm, maybe it is because I have a while(1) right after the RDD? I thought I could do this work with RDDs. I have added the full error message. –


An RDD of RDDs does not actually make sense; however, there is a way to trick the compiler into compiling it. – GameOfThrows

Answer


You cannot create an RDD inside another RDD, no matter the type of the RDDs; that is the first rule. This is because an RDD is an abstraction that points to your data, not the data itself.
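
If each machine still needs its own Elasticsearch data, one way around this (sketched below with the question's own helper names, whose signatures I am assuming) is to keep the loop over the machines on the driver, so every RDD is created through the driver-side SparkContext and only the collected hits are handed to the machine:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;

    // ... after sparkContext and the machines from the question have been created:
    List<Machine> machines = Arrays.asList(m1, m2, m3, m4);

    for (Machine machine : machines) {
        // The query runs through the driver-side SparkContext, so no RDD is ever
        // created inside another RDD's closure.
        JavaRDD dataForLearn = Elastic.loadElasticsearch(
                sparkContext,
                "logstash-*/Collector",
                Elastic.req_AvgOfCall(machine.getIP(), "hour",
                        "2016-04-16T00:00:00", "2016-06-10T00:00:00"));

        List<Hit> hits = Elastic.RddToListHits(Elastic.mapToHit(dataForLearn));

        // Hypothetical variant of startDetectNow() that receives its hits directly.
        machine.startDetectNow(hits);
    }

If the detection itself is heavy, you can still parallelize the work inside each iteration (for example on the hits RDD) before collecting the result.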

+0

OK, thank you for the advice ;) –