2016-06-11 60 views
4

這裏我粘貼我在火花上運行的python代碼,以便對數據執行一些分析。我可以在少量的數據集上運行以下程序。但是當大型數據集出現時,它說「第一階段包含一個非常大的任務(17693 KB),建議的最大任務大小爲100 KB」。如何解決:火花中的大尺寸任務

import os 
import sys 
import unicodedata 
from operator import add 

try: 
    from pyspark import SparkConf 
    from pyspark import SparkContext 
except ImportError as e: 
    print ("Error importing Spark Modules", e) 
    sys.exit(1) 

def tokenize(text): 
    resultDict = {} 
    text = unicodedata.normalize('NFKD', text).encode('ascii','ignore') 

    str1= text[1] 
    str2= text[0] 

    arrText= text.split(str1) 

    ss1 = arrText[0].split("/") 

    docID = ss1[0].strip() 

    docName = ss[1].strip() 

    resultDict[docID+"_"+docName] = 1 

    return resultDict.iteritems() 

sc=SparkContext('local') 
textfile = sc.textFile("path to my data") 
fileContent = textfile.flatMap(tokenize) 
rdd = sc.parallelize(fileContent.collect()) 
rdd= rdd.map(lambda x: (x[0], x[1])).reduceByKey(add) 
print rdd.collect() 
#reduceByKey(lambda a,b: a+b) 
rdd.coalesce(1).saveAsTextFile("path to result") 

在這裏,我發佈了一個更多的警告:在此之後的工作沒有運行。有人可以幫我弄這個嗎。

16/06/10 19:19:58 WARN TaskSetManager: Stage 1 contains a task of very large size (17693 KB). The maximum recommended task size is 100 KB. 
16/06/10 19:19:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 5314, localhost, partition 0,PROCESS_LOCAL, 18118332 bytes) 
16/06/10 19:19:58 INFO Executor: Running task 0.0 in stage 1.0 (TID 5314) 
16/06/10 19:43:00 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:43480 in memory (size: 3.9 KB, free: 511.1 MB) 
16/06/10 19:43:00 INFO ContextCleaner: Cleaned accumulator 2 
+3

這個警告基本上說,你的一個轉換函數太大了。既然你說這個工作不會在這之後繼續下去,我會尋找一些增加記憶足跡的inf循環。 –

+0

你的意思是說我們需要在使用flag --executor-memory進行編譯時增加內存。 –

回答

1

當Spark序列化任務時,它會遞歸地序列化完整的閉包上下文。在這種情況下,邏輯罪魁禍首似乎是您在tokenize中使用的unicodedata。我可能是錯的,但在代碼中沒有看到其他重要的數據結構。 (注意,我通常在Spark中使用Scala,而我的Python是生鏽的。)我想知道庫是否受到執行程序節點上不可用的大量數據結構的支持。

處理這些類型的問題的典型模式是:

  1. 確保所有圖書館都可以執行人節點上。

  2. 使用廣播變量將重型數據結構分發給執行者。

無關,除非你正在使用這個作爲調試工具,你用collect做的所有數據的許多不必要的集合恢復到驅動程序。轉化可以鏈接:

sc.textFile(...).flatMap(...).map(...).reduceByKey(add).coalesce(1).saveAsTextFile(...) 
+0

它對我來說很有意義...我沒有'收集'就運行它。來到編碼,我必須編碼大文本才能訪問它。順便說一下,這項工作完成了所有的地圖功能。它只是停在reduceByKey上...所以我懷疑'unicodedata'是否有問題。 –

+0

順便說一句,我在單個節點上運行它......會是這個問題的原因...... –

+0

你說得對,那麼問題不應該是'unicodedata'。它應該是「添加」,這沒有多大意義。以本地模式運行僅僅意味着您可能會有額外的RAM限制,但例外情況表明,該問題特別與基於閉包的序列化大小相關,而與RAM內處理大小相反。 – Sim