2016-07-30 55 views
0

我是新來的Apache Spark和一張簡單的地圖功能實現爲PySpark地圖不工作

from pyspark import SparkContext 
sc = SparkContext('local', 'pyspark') 

f = open("Tweets_tokenised.txt") 
tokenised_tweets = f.readlines() 

f = open("positive.txt") 
pos_words=f.readlines() 
f = open("negative.txt") 
neg_words=f.readlines() 
def sentiment(line): 
    global pos_words 
    global neg_words 
    pos = 0 
    neg = 0 

    for word in line.split(): 
     if word in pos_words: 
      pos=pos+1 

     if word in neg_words: 
      neg=neg+1 

    if(pos > neg): 
     return 1 
    else: 
     return 0 
dist_tweets=sc.textFile("Tweets_tokenised.txt").map(sentiment) 
#(lambda line: sentiment(line)) 
dist_tweets.saveAsTextFile("RDD.txt") 

基本上我讀文件(含標記化和去梗鳴叫),然後做一個簡單的正負字數在它的map函數中(第3行),但RDD.txt沒有任何內容。函數的情緒根本沒有被調用。 有人能指出錯誤

回答

2

不能在Apache Spark改變map轉型中的全局變量的值來實現這一點,你需要一個Accumulator,使用起來我認爲這是不正確的做法然而,即使使用。

在你的情況下,如果你的pos_wordsneg_words不是那麼大,你可以將它們定義爲Broadcast列表,然後通過sentiment來計數。

喜歡的東西:

pos = sc.broadcast(["good", "gold", "silver"]) 
neg = sc.broadcast(["evil", "currency", "fiat"]) 

# I will suppose that every record is a different tweet and are stored in tuples. 
tweets = sc.parallelize([("banking", "is", "evil"), ("gold", "is", "good")]) 

(tweets 
.flatMap(lambda x: x) 
.map(lambda x: (1 if x in pos.value else -1 if x in neg.value else 0, 1)) 
.reduceByKey(lambda a, b: a + b).take(3)) 

# notice that I count neutral words. 
# output -> [(0, 3), (1, 2), (-1, 1)] 

注意,你可以檢查例子權here

PD:如果您的想法是計算每條消息的正面和負面詞,則該方法會略有不同。

+0

謝謝..你對改變全局變量是正確的...我得到了錯誤... – Solo