2017-07-11 38 views
0

我有一個關於Ignite Streaming部分的問題。apache點燃數據流

我的理解是,它是將數據導入緩存的方式,但我也看到我們可以配置流接收器來應用其他一些自定義邏輯。

所以我嘗試創建一個類與接收器和一個類到數據流注入數據流(所以2個主要和2個Ignite實例在服務器模式),但我「只」有數據到流的緩存(沒有任何定製邏輯被處理到接收器中)。所以,我在問我是否錯過了一些東西,或者我不清楚什麼是Streams into Ignite。

如果我把發送器部分放入接收器中,我就完成了打印。

有誰知道我在做什麼(或理解)錯誤?

接收器類:

public class Receiver { 
    public static void main(String[] args){ 
     IgniteConfiguration igniteConfig = new IgniteConfiguration(); 
     CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream"); 


     igniteConfig.setCacheConfiguration(cacheConfig); 


     Ignite ignite = Ignition.getOrStart(igniteConfig); 

     IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream"); 

     streamer.receiver(StreamVisitor.from((cacheLambda, e) -> { 
      System.out.println("Value : " + e.getValue()); 
     })); 
    } 
} 

發件人類:

public class Sender { 
    public static void main(String[] args){ 
     IgniteConfiguration igniteConfig = new IgniteConfiguration(); 
     CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream"); 

     igniteConfig.setCacheConfiguration(cacheConfig); 

     Ignite ignite = Ignition.getOrStart(igniteConfig); 

     IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream"); 

     for(int i = 0 ; i < 10 ; i++){ 
      streamer.addData("key-"+i, "value-"+i); 
     } 
     streamer.flush(); 
    } 
} 

問候

回答

1

ignite.dataStreamer( 「CacheStream」)不返回您之前創建相同的數據流光,它每次都會創建新的數據播放器。

因此,就您而言,您配置了2個不同的數據播放器,並且您使用沒有配置接收器的流式傳輸器上傳數據。

+0

感謝您的快速回答。這就是我在想什麼......沒關係,我將使用消息傳遞部分。是否有使用Streams而不是其他功能的架構原因?我認爲Streams是緩存和計算或/和消息傳遞的組合,可以構建一個抽象層。它可以提供一種簡單的分佈式協調器,Ignite只需要這麼做...... –

+0

DataStreamer用於將大量連續的數據流注入Ignite緩存。它將在內部正確地將密鑰批量處理,並將這些批處理與將要緩存數據的節點進行搭配。但是,如果您想使用Receiver,仍然可以使用它,它將直接在數據將被緩存的節點上調用。 –

+0

謝謝你的回覆:)。我認爲我不太瞭解接收器的作用以及它們的工作原理,我將繼續深入文檔。再次感謝你。 –