2015-06-28 63 views
5

我是Spark和Spark Streaming的新手。我正在研究Twitter流式傳輸數據。我的任務涉及獨立處理每個Tweet,如計算每條推文中的單詞數。從我讀到的,每個輸入批處理在Spark Streaming的RDD上形成。因此,如果我給出2秒的批處理間隔,那麼新的RDD包含所有推文兩秒鐘,並且所應用的任何轉換將應用於整整兩秒的數據,並且在兩秒鐘內無法處理單個推文。我的理解是否正確?或者每條推文形成一個新的RDD?我有點困惑...Spark Streaming中的批量大小

回答

1

在一個批處理中,您有一個RDD包含所有狀態的間隔爲2秒。然後你可以單獨處理這些狀態。這裏是簡單的例子:

JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters); 

     inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){ 
      @Override 
      public Void call(JavaRDD<Status> status, Time time) throws Exception { 
       List<Status> statuses=status.collect(); 
       for(Status st:statuses){ 
        System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());      
       //Process and store status somewhere 
       } 
       return null; 
      }});   
    ctx.start(); 
     ctx.awaitTermination();  
} 

我希望我沒有誤解你的問題。

卓然

+0

謝謝。如果我將狀態單獨存儲在列表中,是否可以應用列表中的所有RDD轉換或像reduceByKey(),countByValue這樣的操作?雖然我是Scala的新手,但我需要在Scala中完成。 – Naren

+0

我剛剛給你列出了一個例子,告訴你可以訪問各個狀態,但是如果你想使用spark來進一步處理它,你不應該收集狀態列表。例如,您可以實現inputDStream.mapToPair函數,該函數將通過某些鍵返回狀態,例如用戶ID或任何你需要的。那麼你可以減少BIOS密鑰。不幸的是,我只有Scala的基本知識,不能給你舉例,但是你可以在Java中做的所有事情,你也可以在Scala中做。 –

+0

我認爲可能是我可以將特定批次的狀態存儲在列表中,並使用parallelize()將該列表轉換爲RDD,以便我可以應用Spark轉換和操作。 – Naren