我使用pyspark與Kafka接收器來處理推文流。我的應用程序的其中一個步驟包括致電Google Natural Language API以獲取每條推文的情緒分數。但是,我看到API每次處理的推文都會接到幾個電話(我在Google雲端控制檯中看到了電話號碼)。另外,如果我打印tweetIDs(映射函數內),我會得到相同的ID 3或4次。在我的應用程序結束時,推文被髮送到卡夫卡的另一個主題,我得到了正確的推文數(沒有重複的ID),所以原則上一切正常,但我不知道如何避免調用Google API每次推文不止一次。火花流媒體重複網絡電話
這是否與Spark或Kafka中的一些配置參數有關?
這裏是我的控制檯輸出的一個例子:
TIME 21:53:36: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:36: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:36: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
TIME 21:53:37: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:37: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:37: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
但在卡夫卡接收我只得到4周處理的tweet(這是接受,因爲他們只有4個獨特的鳴叫正確的事)。
執行此代碼是:
def sendToKafka(rdd,topic,address):
publish_producer = KafkaProducer(bootstrap_servers=address,\
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
records = rdd.collect()
msg_dict = defaultdict(list)
for rec in records:
msg_dict["results"].append(rec)
publish_producer.send(resultTopic,msg_dict)
publish_producer.close()
kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1})
dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\
.map(lambda post: add_normalized_text(post))\
.map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\
.filter(lambda post: post["keywords"] == True)\
.map(lambda post: googleNLP.complementTweetFeatures(post,job_id))
dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS))
你已經做了什麼?你可以把你的代碼粘貼到這個問題上嗎? –
我用代碼更新了問題。 googleNLP.complementTweetFeatures()向Google API發出一個請求並返回響應。 –