2015-12-07 83 views
-1

我已經編寫了一個代碼來使用kafka獲取twitter推文,它的工作正常,但它不適用於分區。我想創建一個話題3個分區..如何將值傳遞給分區類..任何地方我做錯了Kafka使用java API創建分區

public class kafkaSpoutFetchingRealTweets { 


private String consumerKey; 
private String consumerSecret; 
private String accessToken; 
private String accessTokenSecret; 
private TwitterStream twitterStream; 

/** 
* @param contxt 
*/ 
void start(final Context context) { 

    /** Producer properties **/ 
    Properties props = new Properties(); 
    props.put("metadata.broker.list", 
      context.getString(Constant.BROKER_LIST)); 
    props.put("partitioner.class","SimplePartitioner"); 
    props.put("serializer.class", context.getString(Constant.SERIALIZER)); 
    props.put("request.required.acks", 
      context.getString(Constant.REQUIRED_ACKS)); 
    props.put("producer.type", "async"); 
    // props.put("partitioner.class", context.getClass()); 
    ProducerConfig config = new ProducerConfig(props); 

    final Producer<String, String> producer = new Producer<String, String>(
      config); 

    /** Twitter properties **/ 
    consumerKey = context.getString(Constant.CONSUMER_KEY_KEY); 
    consumerSecret = context.getString(Constant.CONSUMER_SECRET_KEY); 
    accessToken = context.getString(Constant.ACCESS_TOKEN_KEY); 
    accessTokenSecret = context.getString(Constant.ACCESS_TOKEN_SECRET_KEY); 

    ConfigurationBuilder cb = new ConfigurationBuilder(); 
    cb.setOAuthConsumerKey(consumerKey); 
    cb.setOAuthConsumerSecret(consumerSecret); 
    cb.setOAuthAccessToken(accessToken); 
    cb.setOAuthAccessTokenSecret(accessTokenSecret); 
    cb.setJSONStoreEnabled(true); 
    cb.setIncludeEntitiesEnabled(true); 

    twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); 

    /** Twitter listener **/ 
    StatusListener listener = new StatusListener() { 
     // The onStatus method is executed every time a new tweet comes 
     // in. 
     public void onStatus(Status status) { 


      if(("en".equals(status.getLang())) && ("en".equals(status.getUser().getLang()))){ 

       KeyedMessage<String, String> data = new KeyedMessage<String, String>(
         context.getString(Constant.data), 
         DataObjectFactory.getRawJSON(status)); 
       producer.send(data); 
       System.out.println(DataObjectFactory.getRawJSON(status)); 

      } 
     } 
     } 


     public void onDeletionNotice(
       StatusDeletionNotice statusDeletionNotice) { 
     } 

     public void onTrackLimitationNotice(int numberOfLimitedStatuses) { 
     } 

     public void onScrubGeo(long userId, long upToStatusId) { 
     } 

     public void onException(Exception ex) { 
      ex.printStackTrace(); 
      logger.info("Shutting down Twitter sample stream..."); 
      twitterStream.shutdown(); 
     } 

     public void onStallWarning(StallWarning warning) { 
      System.out.println("stallWarning"); 
     } 
    }; 


    String[] lang = { "en" }; 
    fq.language(lang); 
    twitterStream.addListener(listener); 
    twitterStream.sample(); 

} 

public static void main(String[] args) { 
    try { 

     Context context = new Context(args[0]); 
     kafkaSpoutFetchingRealTweets tp = new kafkaSpoutFetchingRealTweets(); 
     tp.start(context); 

    } catch (Exception e) { 
     e.printStackTrace(); 
     logger.info(e.getMessage()); 
    } 

} 

}

+0

你上哪兒去閱讀,您可以通過Java API的創建分區?我讀過的所有內容都是無法通過API創建分區。 – morganw09dev

+0

@ morganw09dev ...(http://fbi.wf/dir/Books/Tech/Learning%20Apache%20Kafka,%20Second%20Edition%20by%20Nishant%20Garg.pdf)...你可以在這裏找到.. 。我無法執行它 – Anji

+0

我不打算閱讀整個PDF以幫助解決您的問題。通過Kafka協議,https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol,似乎不支持通過Java API創建分區。 – morganw09dev

回答

0

建議,以便有幾個問題。

  • 你的問題和代碼不匹配。你的問題是關於用3個分區創建一個主題。但是,您提供的代碼和示例說明了如何確定將消息發送到哪個分區,因爲您已經創建了包含3個分區的主題。
  • 如果您實際上想要創建包含3個分區的主題,則需要使用命令行客戶端。可以在這裏找到一個示例,http://kafka.apache.org/documentation.html#quickstart
  • 如果您確實想確定您需要發送數據的分區。您需要提供有關您遇到的實際問題的更多信息?他們都去了同一個分區嗎?然後你需要看看你在你的config中指定的SimplePartitioner類中如何計算分區。什麼是SimplePartitioner類?

    props.put("partitioner.class","SimplePartitioner"); 
    
+0

@Matthias J. Sax ...哦..看起來我很困惑你..其實我想創建一個kafkaSpout,它將Twitter推文3個分區。所以在暴風雨中,我可以將噴口Parallelism定義爲3.實際上,我在暴風雨噴口中有大量的延遲。爲了在一篇文章中讀到的最佳性能調整增加了噴口的並行性。所以我想創建一個帶有3個分區的kafkaSpout以便我可以從風暴中的這些分區中獲取推文。 – Anji

+0

什麼?我不是Matthias J. Sax?是的,你讓我感到困惑。 – morganw09dev

+0

我知道了......這意味着什麼...... 1.創建上面的kafkaSpout 2.從命令行創建我們需要的分區的主題名稱3.執行kafkaSpout jar。 – Anji