免責聲明產生RDD後:我是新來的星火呼叫收集()不返回呼籲通過combineByKey功能
我有一個RDD它看起來像:
[(T,[Tina, Thomas]), (T,[Tolis]), (C,[Cory, Christine]), (J,[Joseph, Jimmy, James, Jackeline, Juan]), (J,[Jimbo, Jina])]
和我調用combineByKey並得到一個JavaPairRDD字符,整數>
這個調用似乎工作正確(控制流從這一點通過,並在調試器foo似乎有某種價值)
JavaPairRDD<Character, Integer> foo = rdd.combineByKey(createAcc, addAndCount, combine);
System.out.println(foo.collect());
我的問題是,調用foo.collect()後程序沒有返回; 你有什麼想法嗎?我試着用Eclipse調試器來調試,但我有一點機會都沒有
我使用的Spark版本2.0.0和Java 8
編輯:由combineByKey調用的函數的代碼如下(這是顯然是一個僞代碼,因爲我是新來的火花,我與調用 combineByKey的目標是找到字符串beloning到每個鍵的列表)的總長度:
Function<Iterable<String>, Integer> createAcc =
new Function<Iterable<String>, Integer>() {
public Integer call(Iterable<String> x) {
int counter = 0;
Iterator<String> it = x.iterator();
while (it.hasNext()) {
counter++;
}
return counter;
}
};
Function2<Integer, Iterable<String>, Integer> addAndCount =
new Function2<Integer,Iterable<String>, Integer>() {
public Integer call(Integer acc , Iterable<String> x) {
int counter = 0;
Iterator<String> it = x.iterator();
while (it.hasNext()) {
counter++;
}
return counter + acc;
}
};
Function2<Integer,Integer,Integer> combine =
new Function2<Integer,Integer, Integer>() {
public Integer call(Integer x, Integer y) {
return x+y;
}
};
UPDATE2:要求的記錄是以下
16/11/11 17:21:32 INFO SparkContext: Starting job: count at Foo.java:265 16/11/11 17:21:32 INFO DAGScheduler: Got job 9 (count at Foo.java:265) with 3 output partitions 16/11/11 17:21:32 INFO DAGScheduler: Final stage: ResultStage 20 (count at Foo.java:265) 16/11/11 17:21:32 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 19, ShuffleMapStage 18) 16/11/11 17:21:32 INFO DAGScheduler: Missing parents: List() 16/11/11 17:21:32 INFO DAGScheduler: Submitting ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264), which has no missing parents 16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 6.7 KB, free 1946.0 MB) 16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1946.0 MB) 16/11/11 17:21:32 INFO BlockManagerInfo: Added broadcast_12_piece0 in memory on xxx.xxx.xx.xx:55712 (size: 3.4 KB, free: 1946.1 MB) 16/11/11 17:21:32 INFO SparkContext: Created broadcast 12 from broadcast at DAGScheduler.scala:1012 16/11/11 17:21:32 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264) 16/11/11 17:21:32 INFO TaskSchedulerImpl: Adding task set 20.0 with 3 tasks 16/11/11 17:21:32 INFO TaskSetManager: Starting task 0.0 in stage 20.0 (TID 30, localhost, partition 0, ANY, 5288 bytes) 16/11/11 17:21:32 INFO Executor: Running task 0.0 in stage 20.0 (TID 30) 16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 3 blocks 16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
什麼是數據的大小? 「foo.count()」的結果是什麼? – Yaron
數據的大小非常小(我使用的rdd是過期的問題)。調用foo.count()似乎沒有返回(我假設它是出於同樣的原因)。 – XII
combineByKey轉換僅在您調用某個操作時執行(例如,計數或收集)。問題可能在於combineByKey函數。你能提供更多細節嗎? – Marie