2016-08-24 125 views
0

我是新來激發使用星火1.6.1兩名工人各有內存1GB和分配5芯,運行在一個33MB文件的代碼。Pyspark錯誤Java堆空間錯誤

此代碼用於在火花中索引單詞。

from textblob import TextBlob as tb 
from textblob_aptagger import PerceptronTagger 
import numpy as np 
import nltk.data 
import Constants 
from pyspark import SparkContext,SparkConf 
import nltk 

TOKENIZER = nltk.data.load('tokenizers/punkt/english.pickle') 

def word_tokenize(x): 
    return nltk.word_tokenize(x) 

def pos_tag (s): 
    global TAGGER 
    return TAGGER.tag(s) 

def wrap_words (pair): 
    ''' associable each word with index ''' 
    index = pair[0] 
    result = [] 
    for word, tag in pair[1]: 
    word = word.lower() 
    result.append({ "index": index, "word": word, "tag": tag}) 
    index += 1 
    return result 

if __name__ == '__main__': 

    conf = SparkConf().setMaster(Constants.MASTER_URL).setAppName(Constants.APP_NAME) 
    sc = SparkContext(conf=conf) 
    data = sc.textFile(Constants.FILE_PATH) 

    sent = data.flatMap(word_tokenize).map(pos_tag).map(lambda x: x[0]).glom() 
    num_partition = sent.getNumPartitions() 
    base = list(np.cumsum(np.array(sent.map(len).collect()))) 
    base.insert(0, 0) 
    base.pop() 
    RDD = sc.parallelize(base,num_partition) 
    tagged_doc = RDD.zip(sent).map(wrap_words).cache() 

對於較小的文件< 25MB的代碼工作正常,但給錯誤的文件,其尺寸較大的是25MB。
幫我解決這個問題或提供替代這個問題?

回答

0

這是因爲.collect()。當你將rdd轉換成經典的python變量(或np.array)時,你會失去所有的東西,所有的數據都被收集在同一個地方。

+0

你能否提出一個替代解決方案呢? – arjun045

+0

你會解釋你在做什麼,這是一個不同的問題。無論如何,如果你使用spark,collect只是爲了調試,堅持rdd操作。有很多方法可以在pyspark中寫出累計總和,如果你在某個地方停留一段時間後再進行一些討論,然後再進行一些研究。 – marmouset