2016-11-10 263 views
2

免責聲明產生RDD後:我是新來的星火呼叫收集()不返回呼籲通過combineByKey功能

我有一個RDD它看起來像:

[(T,[Tina, Thomas]), (T,[Tolis]), (C,[Cory, Christine]), (J,[Joseph, Jimmy, James, Jackeline, Juan]), (J,[Jimbo, Jina])]

和我調用combineByKey並得到一個JavaPairRDD字符,整數>

這個調用似乎工作正確(控制流從這一點通過,並在調試器foo似乎有某種價值)

JavaPairRDD<Character, Integer> foo = rdd.combineByKey(createAcc, addAndCount, combine); 
System.out.println(foo.collect()); 

我的問題是,調用foo.collect()後程序沒有返回; 你有什麼想法嗎?我試着用Eclipse調試器來調試,但我有一點機會都沒有

我使用的Spark版本2.0.0和Java 8

編輯:由combineByKey調用的函數的代碼如下(這是顯然是一個僞代碼,因爲我是新來的火花,我與調用 combineByKey的目標是找到字符串beloning到每個鍵的列表)的總長度:

  Function<Iterable<String>, Integer> createAcc = 

      new Function<Iterable<String>, Integer>() { 

        public Integer call(Iterable<String> x) { 
          int counter = 0; 
          Iterator<String> it = x.iterator(); 
          while (it.hasNext()) { 
            counter++; 
          } 
          return counter; 
        } 
      }; 

      Function2<Integer, Iterable<String>, Integer> addAndCount = 

      new Function2<Integer,Iterable<String>, Integer>() { 

        public Integer call(Integer acc , Iterable<String> x) { 
          int counter = 0; 
          Iterator<String> it = x.iterator(); 
          while (it.hasNext()) { 
            counter++; 
          } 
          return counter + acc; 
        } 
      }; 

      Function2<Integer,Integer,Integer> combine = 

      new Function2<Integer,Integer, Integer>() { 

        public Integer call(Integer x, Integer y) { 
          return x+y; 
        } 
      }; 

UPDATE2:要求的記錄是以下

16/11/11 17:21:32 INFO SparkContext: Starting job: count at Foo.java:265 16/11/11 17:21:32 INFO DAGScheduler: Got job 9 (count at Foo.java:265) with 3 output partitions 16/11/11 17:21:32 INFO DAGScheduler: Final stage: ResultStage 20 (count at Foo.java:265) 16/11/11 17:21:32 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 19, ShuffleMapStage 18) 16/11/11 17:21:32 INFO DAGScheduler: Missing parents: List() 16/11/11 17:21:32 INFO DAGScheduler: Submitting ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264), which has no missing parents 16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 6.7 KB, free 1946.0 MB) 16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1946.0 MB) 16/11/11 17:21:32 INFO BlockManagerInfo: Added broadcast_12_piece0 in memory on xxx.xxx.xx.xx:55712 (size: 3.4 KB, free: 1946.1 MB) 16/11/11 17:21:32 INFO SparkContext: Created broadcast 12 from broadcast at DAGScheduler.scala:1012 16/11/11 17:21:32 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264) 16/11/11 17:21:32 INFO TaskSchedulerImpl: Adding task set 20.0 with 3 tasks 16/11/11 17:21:32 INFO TaskSetManager: Starting task 0.0 in stage 20.0 (TID 30, localhost, partition 0, ANY, 5288 bytes) 16/11/11 17:21:32 INFO Executor: Running task 0.0 in stage 20.0 (TID 30) 16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 3 blocks 16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

+0

什麼是數據的大小? 「foo.count()」的結果是什麼? – Yaron

+0

數據的大小非常小(我使用的rdd是過期的問題)。調用foo.count()似乎沒有返回(我假設它是出於同樣的原因)。 – XII

+0

combineByKey轉換僅在您調用某個操作時執行(例如,計數或收集)。問題可能在於combineByKey函數。你能提供更多細節嗎? – Marie

回答

1

這是一個簡單的Java問題:你的「while」循環永遠不會調用它.next,並且永遠不會結束。

改變他們

while (it.hasNext()) { 
     it.next(); 
     counter++; 
    } 
+0

似乎我是有史以來最大的noob – XII