`checkpoint`, like many other operations in Spark, is lazy. The data is actually checkpointed if and only if the given RDD is materialized. The empty directory you see is the application-specific checkpoint directory.

If you want the checkpoint to take place, you have to trigger an action that evaluates the corresponding RDD. For example (local mode):
import glob
import os
from urllib.parse import urlparse
sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")
rdd = sc.range(1000)  # note: sc.range(1000, 10) would mean start=1000, end=10, i.e. an empty RDD
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint() # No checkpoint dir here yet
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False
# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True
You can also compare the debug string (`plus_one.toDebugString()`) before the action:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated]
and after it:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
## | ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated]
As you can see, before the action the RDD would be computed from scratch, but after `count` you get a `ReliableCheckpointRDD` in the lineage.
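The same mark-lazily, materialize-on-action behavior can be sketched in plain Python without Spark. The `LazyRDD` class below is purely illustrative (it is not how Spark is implemented); it only mimics the observable semantics: `checkpoint()` sets a flag, and the data is written and the lineage truncated only when an action such as `count()` runs.

```python
class LazyRDD:
    """Toy stand-in for an RDD: a source list plus deferred transformations.

    Illustrative only -- not Spark's actual implementation.
    """

    def __init__(self, source, lineage=()):
        self._source = source            # base data
        self._lineage = lineage          # tuple of pending map functions
        self._checkpoint_requested = False
        self._checkpointed_data = None   # set only on materialization

    def map(self, f):
        # Lazy: just extend the lineage, compute nothing yet.
        return LazyRDD(self._source, self._lineage + (f,))

    def checkpoint(self):
        # Like Spark's RDD.checkpoint(): only marks the RDD; nothing is written here.
        self._checkpoint_requested = True

    def is_checkpointed(self):
        return self._checkpointed_data is not None

    def count(self):
        # An action: materializes the data, which is what triggers the checkpoint.
        data = self._source
        for f in self._lineage:
            data = [f(x) for x in data]
        if self._checkpoint_requested:
            self._checkpointed_data = data  # stand-in for writing to reliable storage
            self._source = data
            self._lineage = ()              # lineage truncated, as with ReliableCheckpointRDD
        return len(data)


rdd = LazyRDD(list(range(10)))
plus_one = rdd.map(lambda x: x + 1)
plus_one.checkpoint()
print(plus_one.is_checkpointed())  # False -- no action has run yet
plus_one.count()
print(plus_one.is_checkpointed())  # True -- the action materialized and checkpointed it
```

The point mirrored here is the one above: calling `checkpoint()` alone changes nothing observable; only the action does.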