標題:Kafka 使用 Java API 創建分區。我已經編寫了一段代碼,使用 Kafka 獲取 Twitter 推文。它可以正常工作,但分區沒有生效。我想創建一個包含 3 個分區的話題(topic)。要如何將值傳遞給分區類(partitioner)?我是不是哪裡做錯了?
public class kafkaSpoutFetchingRealTweets {
private String consumerKey;
private String consumerSecret;
private String accessToken;
private String accessTokenSecret;
private TwitterStream twitterStream;
/**
* @param contxt
*/
void start(final Context context) {
/** Producer properties **/
Properties props = new Properties();
props.put("metadata.broker.list",
context.getString(Constant.BROKER_LIST));
props.put("partitioner.class","SimplePartitioner");
props.put("serializer.class", context.getString(Constant.SERIALIZER));
props.put("request.required.acks",
context.getString(Constant.REQUIRED_ACKS));
props.put("producer.type", "async");
// props.put("partitioner.class", context.getClass());
ProducerConfig config = new ProducerConfig(props);
final Producer<String, String> producer = new Producer<String, String>(
config);
/** Twitter properties **/
consumerKey = context.getString(Constant.CONSUMER_KEY_KEY);
consumerSecret = context.getString(Constant.CONSUMER_SECRET_KEY);
accessToken = context.getString(Constant.ACCESS_TOKEN_KEY);
accessTokenSecret = context.getString(Constant.ACCESS_TOKEN_SECRET_KEY);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setOAuthConsumerKey(consumerKey);
cb.setOAuthConsumerSecret(consumerSecret);
cb.setOAuthAccessToken(accessToken);
cb.setOAuthAccessTokenSecret(accessTokenSecret);
cb.setJSONStoreEnabled(true);
cb.setIncludeEntitiesEnabled(true);
twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
/** Twitter listener **/
StatusListener listener = new StatusListener() {
// The onStatus method is executed every time a new tweet comes
// in.
public void onStatus(Status status) {
if(("en".equals(status.getLang())) && ("en".equals(status.getUser().getLang()))){
KeyedMessage<String, String> data = new KeyedMessage<String, String>(
context.getString(Constant.data),
DataObjectFactory.getRawJSON(status));
producer.send(data);
System.out.println(DataObjectFactory.getRawJSON(status));
}
}
}
public void onDeletionNotice(
StatusDeletionNotice statusDeletionNotice) {
}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
}
public void onScrubGeo(long userId, long upToStatusId) {
}
public void onException(Exception ex) {
ex.printStackTrace();
logger.info("Shutting down Twitter sample stream...");
twitterStream.shutdown();
}
public void onStallWarning(StallWarning warning) {
System.out.println("stallWarning");
}
};
String[] lang = { "en" };
fq.language(lang);
twitterStream.addListener(listener);
twitterStream.sample();
}
public static void main(String[] args) {
try {
Context context = new Context(args[0]);
kafkaSpoutFetchingRealTweets tp = new kafkaSpoutFetchingRealTweets();
tp.start(context);
} catch (Exception e) {
e.printStackTrace();
logger.info(e.getMessage());
}
}
}
你是在哪裡讀到可以通過 Java API 創建分區的?我讀過的所有資料都說無法通過 API 創建分區。 – morganw09dev
@morganw09dev 你可以在這裏找到:http://fbi.wf/dir/Books/Tech/Learning%20Apache%20Kafka,%20Second%20Edition%20by%20Nishant%20Garg.pdf 。但我無法執行它。 – Anji
我不打算為了幫您解決問題而閱讀整個 PDF。根據 Kafka 協議指南(https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol ),似乎不支持通過 Java API 創建分區。 – morganw09dev