2016-09-10 56 views
0

我得到一個json流,並且我想要計算機中每秒有狀態爲「待定」的項目數量。我怎麼做?到目前爲止,我的代碼如下:1)我不確定它是否正確。 2)它給我返回一個Dstream,但我的目標是每秒存儲一個數字給cassandra或隊列,或者你可以想象有功能public void store(Long number){}如何統計火花流中每秒的物品數量?

// #1 
jsonMessagesDStream 
     .filter(new Function<String, Boolean>() { 
     @Override 
     public Boolean call(String v1) throws Exception { 
      JsonParser parser = new JsonParser(); 
      JsonObject jsonObj = parser.parse(v1).getAsJsonObject(); 
      if (jsonObj != null && jsonObj.has("status")) { 
       return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending"); 
      } 
      return false; 
     } 
    }).countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() { 
     @Override 
     public void call(JavaPairRDD<String, Long> stringLongJavaPairRDD) throws Exception { 
      store(stringLongJavaPairRDD.count()); 
     } 
    }); 

試過以下:仍然沒有工作,因爲它始終打印零不知道它是否正確?堆棧跟蹤

16/09/10 17:51:39 INFO SparkContext: Starting job: count at Consumer.java:88 
16/09/10 17:51:39 INFO DAGScheduler: Got job 17 (count at Consumer.java:88) with 4 output partitions 
16/09/10 17:51:39 INFO DAGScheduler: Final stage: ResultStage 17 (count at Consumer.java:88) 
16/09/10 17:51:39 INFO DAGScheduler: Parents of final stage: List() 
16/09/10 17:51:39 INFO DAGScheduler: Missing parents: List() 
16/09/10 17:51:39 INFO DAGScheduler: Submitting ResultStage 17 (MapPartitionsRDD[35] at filter at Consumer.java:72), which has no missing parents 

BAR被打印,但不是foo

//Debug code 
jsonMessagesDStream 
     .filter(new Function<String, Boolean>() { 
     @Override 
     public Boolean call(String v1) throws Exception { 
      System.out.println("****************FOO******************"); 
      JsonParser parser = new JsonParser(); 
      JsonObject jsonObj = parser.parse(v1).getAsJsonObject(); 
      if (jsonObj != null && jsonObj.has("status")) { 
       return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending"); 
      } 
      return false; 
     } 
    }).foreachRDD(new VoidFunction<JavaRDD<String>>() { 
     @Override 
     public void call(JavaRDD<String> stringJavaRDD) throws Exception { 
      System.out.println("*****************BAR******************"); 
      store(stringJavaRDD.count()); 
     } 
    }); 

回答

1

既然你已經過濾結果集的

 // #2 
    jsonMessagesDStream 
     .filter(new Function<String, Boolean>() { 
     @Override 
     public Boolean call(String v1) throws Exception { 
      JsonParser parser = new JsonParser(); 
      JsonObject jsonObj = parser.parse(v1).getAsJsonObject(); 
      if (jsonObj != null && jsonObj.has("status")) { 
       return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending"); 
      } 
      return false; 
     } 
    }).foreachRDD(new VoidFunction<JavaRDD<String>>() { 
     @Override 
     public void call(JavaRDD<String> stringJavaRDD) throws Exception { 
      store(stringJavaRDD.count()); 
     } 
    }); 

部分,你可能只是做一個數()在DStream/RDD上。

另外我不認爲你會需要在這裏開窗,如果你每秒鐘從源頭閱讀。當微量批處理間隔與聚合頻率不匹配時,需要窗口化。您是否正在尋找小於一秒的微批次頻率?

它返回我DSTREAM,但我的目標是要存儲號碼,每秒卡桑德拉或隊列

星火的工作原理是,它提供了DSTREAM每次在現有DSTREAM做了計算時間的方式。這樣你可以輕鬆地將功能鏈接在一起。您還應該意識到Spark中轉換和操作之間的區別。像filter(),count()等函數是轉換,就是說它們在DStream上運行並給出一個新的DStream。但是,如果您需要副作用(如打印,推送到數據庫等),則應該查看Spark操作。

如果您需要將DStream推送到cassandra,您應該查看cassandra連接器,這些連接器將顯示可用於將數據推送到cassandra中的函數(Spark術語中的操作)。

+0

嗨感謝您的迴應!我實際上想寫一條消息給隊列(哪個btw不是Kafka),但你能想象有一個公共無效存儲(Long number){},它將寫入一個單獨的隊列隊列。那麼你能告訴我,如果我每秒鐘都要調用這個存儲函數,代碼的外觀如何? – user1870400

+0

您可以在流中使用foreach並調用lambda塊內的函數。 –

+0

但是這個DStream(一系列RDD)代表了它在一秒鐘之後積累的數據,所以我不應該指望整個DStream嗎?另外,當你說每一個你認爲對於ERDRDD是對的嗎? – user1870400

1

無論批處理間隔如何,您都可以使用1秒的滑動窗口以及reduceByKey函數。一旦您選擇了1秒的幻燈片間隔時間,您將會每秒收到一次商店電話事件。

+0

您的意思是.window(新的Duration(1000),新的Duration(1000))。reduceByKey()?沒有過濾器 ?或者你的意思是.window(新的持續時間(1000),新的持續時間(1000))。filter()。reduceByKey()? – user1870400

+0

不知道這是怎麼回事? – user1870400

+0

@ user1870400,忘掉reduceByKeyAndWindow功能,當你只需要計算值,那麼你可以使用countByValueAndWindow功能,如下所示: 'filteredDstream.countByValueAndWindow(Durations.seconds(5), Durations.seconds(1) ).foreachRDD(新 VoidFunction >(){ @Override 公共無效呼叫(JavaPairRDD <字符串,龍> stringLongJavaPairRDD)拋出異常{ 存儲(stringLongJavaPairRDD.count());} });' – Hokam