2017-04-04

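Spout logic error

I need your help with this. I read that a spout is responsible for reading data, or preparing it, for processing in a bolt. So I wrote some code in the spout that opens a file and reads it line by line: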

import storm


class SimSpout(storm.Spout):
    # Not much to do here for such a basic spout
    def initialize(self, conf, context):
        # Open the file read-only
        self.f = open('data.txt', 'r')
        self._conf = conf
        self._context = context
        storm.logInfo("Spout instance starting...")

    # Process the next tuple
    def nextTuple(self):
        # Emit every remaining line of the file, each as its own tuple
        for line in self.f.readlines():
            storm.logInfo("Emitting %s" % line)
            storm.emit([line])


# Start the spout when it's invoked
SimSpout().run()

Is this correct?

Answer


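You are writing a spout, whose job in Storm is to emit tuples for downstream bolts to process.

A spout's nextTuple is expected to emit one event each time it is called. Your code emits every line of the file in a single call. If each tuple is meant to be a single line, keep an offset into the file: read the line at the offset, emit it, then update offset = offset + 1, similar to the following: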

import storm


class SimSpout(storm.Spout):

    # Not much to do here for such a basic spout
    def initialize(self, conf, context):
        # Read the file once; self._offset marks the next line to emit
        with open('data.txt', 'r') as f:
            self._lines = f.readlines()
        self._conf = conf
        self._context = context
        self._offset = 0
        storm.logInfo("Spout instance starting...")

    # Process the next tuple: emit at most one line per call
    def nextTuple(self):
        # Past the last line there is nothing to emit, so return quietly
        if self._offset >= len(self._lines):
            return
        line = self._lines[self._offset]
        storm.logInfo("Emitting %s" % line)
        storm.emit([line])
        self._offset = self._offset + 1


# Start the spout when it's invoked
SimSpout().run()
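To make the difference concrete without a running topology, here is a minimal storm-free sketch. The class names `AllLinesSpout` and `OffsetSpout` are hypothetical, and a plain list (`emitted`) stands in for `storm.emit`. The first class mirrors the question's loop, the second the offset approach:

```python
import io


class AllLinesSpout:
    """Mimics the question's nextTuple: drains the whole file in one call."""

    def __init__(self, f):
        self.f = f
        self.emitted = []  # stands in for storm.emit

    def next_tuple(self):
        # readlines() consumes everything, so later calls find nothing left
        for line in self.f.readlines():
            self.emitted.append([line])


class OffsetSpout:
    """Mimics the offset approach: one line per call."""

    def __init__(self, f):
        self.lines = f.readlines()  # read once, then index with the offset
        self.offset = 0
        self.emitted = []

    def next_tuple(self):
        # Past the last line: return without emitting anything
        if self.offset >= len(self.lines):
            return
        self.emitted.append([self.lines[self.offset]])
        self.offset += 1


data = "first\nsecond\nthird\n"

batch = AllLinesSpout(io.StringIO(data))
batch.next_tuple()
print(len(batch.emitted))  # 3: every line from a single call

stepwise = OffsetSpout(io.StringIO(data))
stepwise.next_tuple()
print(len(stepwise.emitted))  # 1: one line per call
```

Emitting one tuple per call keeps each nextTuple call short, which matters because Storm invokes nextTuple repeatedly in a loop.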

Many thanks for the answer. I get this: "with open(self.f) as f: TypeError: coercing to Unicode: need string or buffer, file found" – user3188912
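That `TypeError` comes from passing `self.f`, which is already an open file object, to `open()`, which expects a path. The quoted message is Python 2's wording; Python 3 reports `expected str, bytes or os.PathLike object` instead. A small standalone sketch, where a temporary file is only a stand-in for `data.txt`:

```python
import os
import tempfile

# A throwaway file stands in for data.txt
fd, path = tempfile.mkstemp(suffix=".txt")
with os.fdopen(fd, "w") as out:
    out.write("first line\nsecond line\n")

f = open(path, "r")  # like self.f in initialize(): a file object, not a path

raised_type_error = False
try:
    open(f)  # passing the file object to open() is the bug
except TypeError as exc:
    raised_type_error = True
    print("TypeError:", exc)

# A file object keeps the path it was opened with in its .name attribute
with open(f.name) as again:
    first = again.readline()
print(first, end="")  # first line

f.close()
os.remove(path)
```

In a spout it is simpler still to open the file once in initialize and never reopen it.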


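I fixed that error (I was opening the file twice), but now I get another one: AttributeError: 'SimSpout' object has no attribute '_offset'. I searched for a fix but didn't find anything helpful. – user3188912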
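This `AttributeError` means nextTuple read `self._offset` on an instance where `self._offset = 0` never ran. In Storm's multilang `storm.py`, `run()` calls `initialize` before it starts dispatching nextTuple, so the likeliest cause (an assumption, since the full code isn't shown) is that the assignment never made it into `initialize`, for example if only the new nextTuple was copied over. The sketch below uses hypothetical storm-free classes to reproduce the error and show a class-level default as a defensive alternative to simply adding `self._offset = 0` to `initialize`:

```python
class SpoutWithoutOffset:
    def initialize(self, conf, context):
        self._conf = conf  # _offset is never set here

    def nextTuple(self):
        self._offset = self._offset + 1  # reads _offset before it exists


class SpoutWithDefault:
    _offset = 0  # class-level default, used until the instance sets its own

    def initialize(self, conf, context):
        self._conf = conf

    def nextTuple(self):
        # The first read falls back to the class attribute, then the
        # assignment creates an instance attribute that shadows it
        self._offset = self._offset + 1


error_message = None
broken = SpoutWithoutOffset()
broken.initialize({}, {})
try:
    broken.nextTuple()
except AttributeError as exc:
    error_message = str(exc)
print(error_message)

fixed = SpoutWithDefault()
fixed.initialize({}, {})
fixed.nextTuple()
fixed.nextTuple()
print(fixed._offset)  # 2
```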