
Why does my Spark job get stuck on the Kafka stream after being submitted to the Spark cluster running on a Kubernetes cluster created by Minikube?

Output of foreachRDD at myfile.scala:

----------------- RUNNING ---------------------- 
[Stage 0:>               (0 + 0)/2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties 
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx 
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
xxxxxxxxxxxxxxxxxxxxx 
[Stage 0:>               (0 + 0)/2] 

Stage details from the Spark web UI (foreachRDD at myfile.scala:49, +details):

org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:625)
myfile.run(myfile.scala:49)
Myjob$.main(Myjob.scala:100)
Myjob.main(Myjob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My code:

println("----------------- RUNNING ----------------------"); 
    eventsStream.foreachRDD { rdd => 
     println("xxxxxxxxxxxxxxxxxxxxx") 
     //println(rdd.count()); 
    if(!rdd.isEmpty) 
    { 
     println("yyyyyyyyyyyyyyyyyyyyyyy") 
     val df = sqlContext.read.json(rdd); 
     df.registerTempTable("data"); 

     val rules = rulesSource.rules(); 
     var resultsRDD : RDD[(String,String,Long,Long,Long,Long,Long,Long)]= sc.emptyRDD; 
     rules.foreach { rule => 
     ... 
     } 

     sqlContext.dropTempTable("data") 
    } 
    else 
    { 
     println("-------"); 
     println("NO DATA"); 
     println("-------"); 
    } 
} 

Any ideas? Thanks.

UPDATE

My Spark job runs fine in a standalone Spark Docker container. But when it is submitted to the Spark cluster inside the Kubernetes cluster, it gets stuck on the Kafka stream. No idea why.

The YAML file for the Spark master is taken from https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml:

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      labels:
        name: spark-master
      name: spark-master
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            name: spark-master
        spec:
          containers:
          - name: spark-master
            image: spark-2.1.0-bin-hadoop2.6
            imagePullPolicy: "IfNotPresent"
            ports:
            - containerPort: 7077
              protocol: TCP
            command:
            - "/bin/bash"
            - "-c"
            - "--"
            args:
            - './start-master.sh ; sleep infinity'
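
For reference, a minimal sketch of how a streaming job might connect to the master exposed by this Deployment. The service name spark-master, the ZooKeeper address, the group id and the topic are assumptions for illustration, not values taken from the question:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object StreamingSketch {
      def main(args: Array[String]): Unit = {
        // Assumes a Kubernetes Service named "spark-master" in front of the Deployment above.
        val conf = new SparkConf().setAppName("Myjob").setMaster("spark://spark-master:7077")
        val ssc = new StreamingContext(conf, Seconds(10))

        // Receiver-based Kafka stream from the spark-streaming-kafka-0-8 connector;
        // the ZooKeeper quorum, group id and topic map are placeholders.
        val eventsStream = KafkaUtils.createStream(ssc, "zookeeper:2181", "xxx", Map("events" -> 1))

        eventsStream.map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }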

I have a similar problem. I am using the experimental Spark Streaming integration for Kafka broker 0.10. One task gets stuck and uses no memory; the other returns quickly. So the whole thing is stuck. –

Answer


Logs would help diagnose the problem.

Essentially, you cannot use another RDD inside an RDD operation, i.e. rdd1.map { rdd2.count() } is not valid.
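
For illustration, a minimal sketch of the difference; rdd1, rdd2 and sc are placeholder names, not taken from the question:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Invalid: referencing one RDD inside the closure of another RDD's operation
    // fails at runtime, because RDD operations can only be invoked from the driver.
    def invalid(rdd1: RDD[String], rdd2: RDD[String]): Unit =
      rdd1.map(x => (x, rdd2.count())).collect()

    // Valid: materialize the second RDD on the driver first (here via broadcast),
    // then use the plain value inside the closure.
    def valid(sc: SparkContext, rdd1: RDD[String], rdd2: RDD[String]): Unit = {
      val total = sc.broadcast(rdd2.count())
      rdd1.map(x => (x, total.value)).collect()
    }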

See below how the RDD is converted to a DataFrame after importing the implicit sqlContext conversions.

    import sqlContext.implicits._

    eventsStream.foreachRDD { rdd =>
      println("yyyyyyyyyyyyyyyyyyyyyyy")

      val df = rdd.toDF();
      df.registerTempTable("data");
      .... // Your logic here.
      sqlContext.dropTempTable("data")
    }
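
One caveat not spelled out above: rdd.toDF() only works when the RDD's element type is something the implicits can encode, such as a case class or a tuple. If the stream delivers raw JSON strings, sqlContext.read.json(rdd) as in the question is the usual route. A rough sketch, using a hypothetical Event case class (the real schema from the question is not shown):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    // Hypothetical event type, for illustration only.
    case class Event(id: String, value: Long)

    def buildDataFrames(sqlContext: SQLContext, events: RDD[Event], rawJson: RDD[String]) = {
      import sqlContext.implicits._                 // enables .toDF() on RDDs of case classes / tuples

      val fromCaseClass = events.toDF()             // schema derived from the Event fields
      val fromJson = sqlContext.read.json(rawJson)  // schema inferred from the JSON strings
      (fromCaseClass, fromJson)
    }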

My Spark job runs fine in a standalone Spark Docker container. But when it is submitted to the Spark cluster inside the Kubernetes cluster, it gets stuck on the Kafka stream. No idea why? – BAE


What do your Spark logs say? Can you see the Spark web UI? Does it give you any hints? After a batch duration has elapsed, check whether the Spark web UI shows a Streaming tab. – Manas