在一個批處理中,您有一個RDD包含所有狀態的間隔爲2秒。然後你可以單獨處理這些狀態。這裏是簡單的例子:
JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters);
inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){
@Override
public Void call(JavaRDD<Status> status, Time time) throws Exception {
List<Status> statuses=status.collect();
for(Status st:statuses){
System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());
//Process and store status somewhere
}
return null;
}});
ctx.start();
ctx.awaitTermination();
}
我希望我沒有誤解你的問題。
卓然
謝謝。如果我將狀態單獨存儲在列表中,是否可以應用列表中的所有RDD轉換或像reduceByKey(),countByValue這樣的操作?雖然我是Scala的新手,但我需要在Scala中完成。 – Naren
我剛剛給你列出了一個例子,告訴你可以訪問各個狀態,但是如果你想使用spark來進一步處理它,你不應該收集狀態列表。例如,您可以實現inputDStream.mapToPair函數,該函數將通過某些鍵返回狀態,例如用戶ID或任何你需要的。那麼你可以減少BIOS密鑰。不幸的是,我只有Scala的基本知識,不能給你舉例,但是你可以在Java中做的所有事情,你也可以在Scala中做。 –
我認爲可能是我可以將特定批次的狀態存儲在列表中,並使用parallelize()將該列表轉換爲RDD,以便我可以應用Spark轉換和操作。 – Naren