我得到一個json流,並且我想要計算機中每秒有狀態爲「待定」的項目數量。我怎麼做?到目前爲止,我的代碼如下:1)我不確定它是否正確。 2)它給我返回一個Dstream,但我的目標是每秒存儲一個數字給cassandra或隊列,或者你可以想象有功能public void store(Long number){}
。如何統計火花流中每秒的物品數量?
// #1
jsonMessagesDStream
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
JsonParser parser = new JsonParser();
JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
if (jsonObj != null && jsonObj.has("status")) {
return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
}
return false;
}
}).countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
@Override
public void call(JavaPairRDD<String, Long> stringLongJavaPairRDD) throws Exception {
store(stringLongJavaPairRDD.count());
}
});
試過以下:仍然沒有工作,因爲它始終打印零不知道它是否正確?堆棧跟蹤
16/09/10 17:51:39 INFO SparkContext: Starting job: count at Consumer.java:88
16/09/10 17:51:39 INFO DAGScheduler: Got job 17 (count at Consumer.java:88) with 4 output partitions
16/09/10 17:51:39 INFO DAGScheduler: Final stage: ResultStage 17 (count at Consumer.java:88)
16/09/10 17:51:39 INFO DAGScheduler: Parents of final stage: List()
16/09/10 17:51:39 INFO DAGScheduler: Missing parents: List()
16/09/10 17:51:39 INFO DAGScheduler: Submitting ResultStage 17 (MapPartitionsRDD[35] at filter at Consumer.java:72), which has no missing parents
BAR被打印,但不是foo
//Debug code
jsonMessagesDStream
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
System.out.println("****************FOO******************");
JsonParser parser = new JsonParser();
JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
if (jsonObj != null && jsonObj.has("status")) {
return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
}
return false;
}
}).foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> stringJavaRDD) throws Exception {
System.out.println("*****************BAR******************");
store(stringJavaRDD.count());
}
});
嗨感謝您的迴應!我實際上想寫一條消息給隊列(哪個btw不是Kafka),但你能想象有一個公共無效存儲(Long number){},它將寫入一個單獨的隊列隊列。那麼你能告訴我,如果我每秒鐘都要調用這個存儲函數,代碼的外觀如何? – user1870400
您可以在流中使用foreach並調用lambda塊內的函數。 –
但是這個DStream(一系列RDD)代表了它在一秒鐘之後積累的數據,所以我不應該指望整個DStream嗎?另外,當你說每一個你認爲對於ERDRDD是對的嗎? – user1870400