2016-06-12 39 views
4

我已經使用sc.setCheckpointDir方法設置了檢查點目錄。RDD.checkpoint()不在檢查點目錄中存儲任何數據

/checkpointDirectory/ 

我然後創建了一個RDD的檢查點:rdd.checkpoint(),並在目錄中,我現在看到一個新的目錄代表新的關卡,在字母隨機字符串的形式。在那個目錄裏面什麼也沒有。

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty] 

然後做了幾個轉換之後,我跑rdd.checkpoint()一遍,仍然沒有什麼,最近創建的目錄

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty] 

我使用checkpoint()錯了嗎?我應該在目錄中看到什麼才能知道它的正常工作?

回答

4

checkpoint,與Spark中的許多其他操作一樣,是懶惰的。數據實際上是檢查點,當且僅當給定的RDD被實現時。您看到的空目錄是特定於應用程序的檢查點目錄。

如果您希望檢查點發生,您必須觸發一個操作來評估相應的RDD。舉例(本地模式):

import glob 
import os 
from urllib.parse import urlparse 

sc.setCheckpointDir("/tmp/checkpoints/") 
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*") 

rdd = sc.range(1000, 10) 
plus_one = rdd.map(lambda x: x + 1) 
plus_one.cache() 
plus_one.checkpoint() # No checkpoint dir here yet 

[os.path.split(x)[-1] for x in glob.glob(ch_dir)] 
## [] 
plus_one.isCheckpointed() 
## False 

# After count is executed you'll see rdd specific checkpoint dir 
plus_one.count() 
[os.path.split(x)[-1] for x in glob.glob(ch_dir)] 
## ['rdd-1'] 
plus_one.isCheckpointed() 
## True 

您還可以分析調試字符串之前:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated] 
## | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated] 

和行動後:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated] 
## |  CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B 
## | ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated] 

正如你可以看到RDD將被計算之前從零開始,但在count之後,您將獲得ReliableCheckpointRDD

0

檢查點將定期完成(即檢查點持續時間)。您需要將檢查點持續時間告訴給您的spark上下文。

相關問題