2013-04-26 62 views
3

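Thread not working in Storm cluster

I am trying to start a stream listener in a separate thread that fills a queue, and the queue is processed later. However, Storm does not execute anything after the line that creates the thread. It gets stuck there.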

My code looks like this:

import os, sys, traceback, random, StringIO, time 
import random 
from uuid import uuid4 
from select import select 
from subprocess import Popen,PIPE 
import pyinotify 
import simplejson, pycurl 
import sys, signal 
import twitter 
import tweepy 
import Queue 
import threading 
try: 
    import simplejson as json 
except ImportError: 
    import json 

import storm 
queue = Queue.Queue() 

class MyModelParser(tweepy.parsers.ModelParser):
    def parse(self, method, payload):
        result = super(MyModelParser, self).parse(method, payload)
        result._payload = json.loads(payload)
        return result

class CustomStreamListener(tweepy.StreamListener):
    ''' Handles data received from the stream. '''
    def __init__(self, api, q):
        self.api = api
        self.queue = q
        self.queue.put('lalala')

    def on_status(self, status):
        self.queue.put('%s' % status.author.screen_name)
        self.queue.task_done()

    def on_error(self, status_code):
        return True # To continue listening

    def on_timeout(self):
        return True # To continue listening

class Starter():
    def __init__(self,q):
        self.queue = q
        hashtag = ['justinbieber','snooki','daddy_yankee','MikeTyson','iamdiddy','lala']
        auth = self.t_auth()
        api = tweepy.API(auth, parser=MyModelParser())
        stream = tweepy.streaming.Stream(auth,CustomStreamListener(api,queue))
        stream.filter(follow=None, track=hashtag)

    def t_auth(self):
        consumer_key=""
        consumer_secret=""
        access_key = ""
        access_secret = ""

        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_key, access_secret)

        return auth

class TwitterSpout(storm.Spout):
    SPOUT_NAME = "TwitterSpout"
    queue = queue

    def initialize(self, conf, context):
        self.pid = os.getpid()
        try:
            t = threading.Thread(target=Starter(self.queue))
            t.daemon=True
            t.start()

        except KeyboardInterrupt, e:
            self.log('\n\nStopping')
            raise

I don't know much Python. Still, let me ask: if you just push to the queue and then process from there, does it work? – abhi 2013-05-04 10:07:53
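One likely cause of the hang, offered as a reading of the code above rather than a confirmed diagnosis: `threading.Thread(target=Starter(self.queue))` calls `Starter(...)` right away in the calling thread, and its constructor runs the blocking `stream.filter(...)`, so `t.start()` is never reached. The sketch below uses only the standard library. `blocking_listener` and `start_listener` are hypothetical names, with `blocking_listener` standing in for the tweepy stream. The callable and its arguments are passed separately so that the work runs in the new thread.

```python
import threading
import time

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2


def blocking_listener(q, items):
    """Hypothetical stand-in for stream.filter(): pushes items, then returns."""
    for item in items:
        q.put(item)
        time.sleep(0.01)  # simulate waiting on the network


def start_listener(q, items):
    # Pass the function itself plus its args; do NOT call it here.
    # Thread(target=blocking_listener(q, items)) would run the listener
    # in the current thread before the Thread object is even created.
    t = threading.Thread(target=blocking_listener, args=(q, items))
    t.daemon = True
    t.start()
    return t


q = queue.Queue()
worker = start_listener(q, ["alice", "bob"])
# start_listener() returns without waiting for the listener to finish.
# The join() below exists only so this demo can inspect the queue;
# a spout would not join, it would read the queue from next_tuple.
worker.join()
received = [q.get_nowait() for _ in range(q.qsize())]
```

In the question's code, the equivalent change would be to move the blocking `stream.filter(...)` call out of the constructor into a method and pass that method as `target`, which is an assumption about the intended design.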

Answer

0

Use pyleus (https://github.com/Yelp/pyleus). Your spout implementation should have a next_tuple(self) method, which should emit the output fields, as in the example below:

from pyleus.storm import Spout 


class DummySpout(Spout): 

    OUTPUT_FIELDS = ['sentence', 'name'] 

    def initialize(self):
        pass

    def next_tuple(self):
        self.emit(("This is a sentence.", "spout",))


if __name__ == '__main__': 
    DummySpout().run() 

Then write your bolt:

from pyleus.storm import SimpleBolt 


class DummyBolt(SimpleBolt):

    OUTPUT_FIELDS = ['sentence']

    def process_tuple(self, tup):
        sentence, name = tup.values
        new_sentence = "{0} says, \"{1}\"".format(name, sentence)
        self.emit((new_sentence,), anchors=[tup])


if __name__ == '__main__': 
    DummyBolt().run() 

You can also take a look at how I use it: https://github.com/Yelp/pyleus/issues/140
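To tie this back to the question, next_tuple could drain the queue that the listener thread fills. The sketch below is not pyleus code: `QueueSpoutSketch` is a hypothetical plain class that mimics only the next_tuple/emit shape of the spout example above, so it runs without a Storm cluster. The `screen_name` field is also an assumption. It reads with `get_nowait()` so that next_tuple returns promptly when the queue is empty instead of blocking.

```python
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2


class QueueSpoutSketch(object):
    """Hypothetical stand-in for a spout; not a pyleus class."""

    OUTPUT_FIELDS = ['screen_name']

    def __init__(self, q):
        self.queue = q
        self.emitted = []  # records tuples instead of sending them to Storm

    def emit(self, values):
        # A real spout would hand the tuple to Storm here.
        self.emitted.append(values)

    def next_tuple(self):
        try:
            name = self.queue.get_nowait()
        except queue.Empty:
            return  # nothing to emit this round
        self.emit((name,))


q = queue.Queue()
q.put('some_user')
spout = QueueSpoutSketch(q)
spout.next_tuple()  # one item is waiting, so one tuple is emitted
spout.next_tuple()  # queue is empty, so nothing is emitted
```

In a real pyleus spout the same try/except body would go inside next_tuple of a class derived from Spout, with the listener thread started from initialize.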