2017-03-02 100 views

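Migrating from Spark 1.6 to Spark 2.1: toLocalIterator throws an error

I have migrated my working codebase from Spark 1.6 to 2.1, and the code now fails at runtime. The error occurs where I call the toLocalIterator method on an RDD. I tried to find a clue in the error log, but that did not help.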

logger.info("Generating common words started!.") 
logger.info("Reading from mongo db") 
mongo_db = MongoDBConnector() 
logger.info("Connecting to mongo database.") 
db = mongo_db.connect_database() 
logger.info("Update lda_vector of all_documents to default value -1.") 
mongo_db.update_lda_vector(db, passive_article) 
logger.info("Getting documents from mongo database to train.") 
records = mongo_db.get_documents(db, passive_article, years=0, days=2) 
logger.info("Parallel data for spark execution.") 
rdd_records = sparkContext.parallelize(records) 
logger.info("Total number of records: " + str(rdd_records.count())) 
logger.info("Spark map and parse documents") 
rdd_cached_docs = rdd_records.map(parse_required_data).map(tokenize_text).map(remove_stopwords).map(
    remove_common_words).map(filter_token_using_pos_tgs) 
rdd_cached_list = rdd_cached_docs.map(lambda data_tuple: data_tuple[1]).flatMap(
    lambda data: [porter_stemmer.value.stem(i) for i in data]) 
logger.info("Generating the word list from tokenized text") 
logger.info("Total word list count "+str(rdd_cached_list.count())) 
word_list = list(rdd_cached_list.toLocalIterator()) 

Error log

    17/03/02 18:03:45 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 8, localhost, executor driver, partition 0, PROCESS_LOCAL, 580886 bytes) 
    17/03/02 18:03:45 INFO Executor: Running task 0.0 in stage 2.0 (TID 8) 
    Traceback (most recent call last): 
     File "/home/st701am/projects/recs/spark/lda_builder.py", line 257, in <module> 
     pre_process_documents(passive_article) 
     File "/home/st701am/projects/recs/spark/lda_builder.py", line 108, in pre_process_documents 
     word_list = list(rdd_cached_list.toLocalIterator()) 
     File "/opt/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 140, in _load_from_socket 
     File "/opt/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 144, in load_stream 
     File "/opt/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 161, in _read_with_length 
     File "/opt/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 555, in read_int 
     File "/opt/anaconda3/lib/python3.5/socket.py", line 575, in readinto 
     return self._sock.recv_into(b) 
    socket.timeout: timed out 
    17/03/02 18:03:48 INFO SparkContext: Invoking stop() from shutdown hook 
    17/03/02 18:03:48 INFO SparkUI: Stopped Spark web UI at http://172.28.20.110:4040 
    17/03/02 18:03:48 INFO DAGScheduler: Job 2 failed: toLocalIterator at /home/st701am/projects/recs/spark/lda_builder.py:108, took 3.813652 s 
    17/03/02 18:03:48 INFO DAGScheduler: ResultStage 2 (toLocalIterator at /home/st701am/projects/recs/spark/lda_builder.py:108) failed in 3.806 s due to Stage cancelled because SparkContext was shut down 
    17/03/02 18:03:48 ERROR PythonRDD: Error while sending iterator 
    org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) 
      at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) 

Answer


I had the same problem with toLocalIterator on 2.0, and I found that the timeout goes away if I cache the RDD first. Try replacing

logger.info("Total word list count "+str(rdd_cached_list.count())) 
word_list = list(rdd_cached_list.toLocalIterator()) 

with

rdd_cached_list.cache() 
logger.info("Total word list count "+str(rdd_cached_list.count())) 
word_list = list(rdd_cached_list.toLocalIterator()) 

But why are you using list(iterator) at all? Wouldn't it be easier to just collect() the RDD?
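
For reference, the two options discussed here could be wrapped in a small helper. This is only a sketch under assumptions: materialize_words and its use_collect flag are hypothetical names that are not part of PySpark, and rdd is assumed to be an ordinary pyspark RDD such as rdd_cached_list from the question. It has not been checked against the asker's setup, so treat the cache-then-count step as the workaround reported above rather than a guaranteed fix.

```python
def materialize_words(rdd, use_collect=False):
    """Return all elements of `rdd` as a Python list on the driver.

    Hypothetical helper: `rdd` is assumed to be a pyspark RDD. Only its
    cache(), count(), collect() and toLocalIterator() methods are used.
    """
    if use_collect:
        # collect() fetches every partition to the driver and already
        # returns a list, so no iterator is involved.
        return rdd.collect()

    # Workaround from the answer: mark the RDD for caching, then force
    # evaluation with count() so the partitions are computed before the
    # local iterator starts reading them.
    rdd.cache()
    rdd.count()
    return list(rdd.toLocalIterator())
```

In the question's code this would be called as word_list = materialize_words(rdd_cached_list), or with use_collect=True to try the collect() route. Either way the full word list ends up in driver memory; toLocalIterator only saves memory when the items are consumed one at a time instead of being wrapped in list(...).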