3
我正在嘗試執行以下操作,在將創建隊列的單獨線程中啓動流偵聽器,稍後將對這些隊列進行處理...但是Storm
不會在執行任何操作後執行任何操作線。它卡在那裏。線程在Storm Cluster中不工作
我的代碼如下所示:
import os, sys, traceback, random, StringIO, time
import random
from uuid import uuid4
from select import select
from subprocess import Popen,PIPE
import pyinotify
import simplejson, pycurl
import sys, signal
import twitter
import tweepy
import Queue
import threading
try:
import simplejson as json
except ImportError:
import json
import storm
queue = Queue.Queue()
class MyModelParser(tweepy.parsers.ModelParser):
def parse(self, method, payload):
result = super(MyModelParser, self).parse(method, payload)
result._payload = json.loads(payload)
return result
class CustomStreamListener(tweepy.StreamListener):
''' Handles data received from the stream. '''
def __init__(self, api, q):
self.api = api
self.queue = q
self.queue.put('lalala')
def on_status(self, status):
self.queue.put('%s' % status.author.screen_name)
self.queue.task_done()
def on_error(self, status_code):
return True # To continue listening
def on_timeout(self):
return True # To continue listening
class Starter():
def __init__(self,q):
self.queue = q
hashtag = ['justinbieber','snooki','daddy_yankee','MikeTyson','iamdiddy','lala']
auth = self.t_auth()
api = tweepy.API(auth, parser=MyModelParser())
stream = tweepy.streaming.Stream(auth,CustomStreamListener(api,queue))
stream.filter(follow=None, track=hashtag)
def t_auth(self):
consumer_key=""
consumer_secret=""
access_key = ""
access_secret = ""
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
return auth
class TwitterSpout(storm.Spout):
SPOUT_NAME = "TwitterSpout"
queue = queue
def initialize(self, conf, context):
self.pid = os.getpid()
try:
t = threading.Thread(target=Starter(self.queue))
t.daemon=True
t.start()
except KeyboardInterrupt, e:
self.log('\n\nStopping')
raise
不知道這麼多蟒蛇。仍然在問,如果你只是試圖推進隊列,然後從那裏進行處理,它是否有效? – abhi 2013-05-04 10:07:53