3

我developped風暴拓撲來接收從hortonworks卡夫卡經紀人JSONArray數據,我KafkaSpout不消耗從卡夫卡經紀人的消息在HDP

我不知道爲什麼我的kafkaSpout不消耗從卡夫卡經紀人的消息HDP,但風暴拓撲成功submited,但當我形象化拓撲:0%的數據已被消耗!

topology visualisation

這是我的計劃類:

public class ClientInfosSheme implements Scheme{ 
private static final long serialVersionUID = -2990121166902741545L; 
private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class); 
public String codeBanque; 
public String codeAgence; 
public String codeGuichet; 
public String devise; 
public String numCompte; 
public String codeClient; 
public String codeOperation; 
public String sensOperation; 
public String montantOperation; 
public String dateValeur; 
public String dateComptable; 
public String utilisateur; 

public static final String CODEBANQUE="codeBanque"; 
public static final String CODEAGENCE="codeAgence"; 
public static final String CODEGUICHET="codeGuichet"; 
public static final String DEVISE="devise"; 
public static final String NUMCOMPTE="numCompte"; 
public static final String CODECLIENT="codeClient"; 
public static final String CODEOPERATION="codeOperation"; 
public static final String SENSOPERATION="sensOperation"; 
public static final String MONTANTOPERATION="montantOperation"; 
public static final String DATEVALEUR="dateValeur"; 
public static final String DATECOMPTABLE="dateComptable"; 
public static final String UTILISATEUR="utilisateur"; 

public List<Object> deserialize(byte[] bytes) { 

     try{ 
      String clientInfos = new String(bytes, "UTF-8"); 
       JSONArray JSON = new JSONArray(clientInfos); 
       for(int i=0;i<JSON.length();i++) { 
        JSONObject object_clientInfos=JSON.getJSONObject(i); 
       try{  

        //Récupérations des données 

         this.codeBanque=object_clientInfos.getString("codeBanque"); 
         this.codeAgence=object_clientInfos.getString("codeAgence"); 
         this.codeGuichet=object_clientInfos.getString("codeGuichet"); 
         this.devise=object_clientInfos.getString("devise"); 
         this.numCompte=object_clientInfos.getString("numCompte"); 
         this.codeClient=object_clientInfos.getString("codeClient"); 
         this.codeOperation=object_clientInfos.getString("codeOperation"); 
         this.sensOperation=object_clientInfos.getString("sensOperation"); 
         this.montantOperation=object_clientInfos.getString("montantOperation"); 
         this.dateValeur=object_clientInfos.getString("dateValeur"); 
         this.dateComptable=object_clientInfos.getString("dateComptable"); 
         this.utilisateur=object_clientInfos.getString("utilisateur"); 

        } 
        catch(Exception e) 
           { 
            e.printStackTrace(); 
           } 


    }// End For Loop 



     } catch (JSONException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } catch (UnsupportedEncodingException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } 
     return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte, codeClient, codeOperation, sensOperation, 
       montantOperation,dateValeur, dateComptable,utilisateur); 

}// End Function deserialize 

public Fields getOutputFields() { 
     return new Fields(CODEBANQUE,CODEAGENCE,CODEGUICHET,DEVISE,NUMCOMPTE, 
       CODECLIENT,CODEOPERATION, SENSOPERATION,MONTANTOPERATION,DATEVALEUR,DATECOMPTABLE,UTILISATEUR); 
    } 


} 

和屬性文件:

#Broker host 
kafka.zookeeper.host.port=sandbox.hortonworks.com 

#Kafka topic to consume. 
kafka.topic=INFOCLIENT 

#Location in ZK for the Kafka spout to store state. 
kafka.zkRoot=/client_infos_sprout 

#Kafka Spout Executors. 
spout.thread.count=1 

當我用另一消費者在卡夫卡經紀人storted數據,如:

[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..}, 
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..}, 
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}] 

所以我的問題爲什麼它不消費來自卡夫卡經紀人的消息?

請我需要幫助

+0

你有沒有仔細檢查正確的主題名稱,IP /主機名等?您是否檢查Storm和Kafka日誌以獲取錯誤消息? –

+0

嗨@ MatthiasJ.Sax我加倍檢查,我發現當我改變'#Broker主機':''kafka.zookeeper.host.port = 192.168.1.78:2181'我得到這個問題:_java.lang.RuntimeException:java .lang.IllegalArgumentException:a || b || c || calculCleRib(a,b,c)不存在於backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)_ –

+0

嗨,當我檢查暴風雨用戶界面時,我看到mssgs被髮射和傳輸,但沒有acked!我得到了這個消息: 在完成確認之前明確失敗或熄滅的元組數量。預計值0不會完成 –

回答

1

當你在日誌中已經發現,你的壺嘴沒有「消費」的消息,因爲該拓撲結構錯誤,不響應的元組 - 因此壺嘴將重播他們。這是按照設計工作的。

一旦你的拓撲結構穩定,你將觀察到增加的偏移量。在此之前,Spout會將消息發送到拓撲中,但您將無法觀察結果。

沒有看到calculCleRib方法,以及它如何集成到拓撲中,我們無法幫助您調試該方面。