2017-02-14 108 views
1

我是Hazelcast的新手,我試圖用它來存儲地圖上的數據,該地圖比可能放在一臺機器上的地圖大得多。Hazelcast keySet串流?

我需要實現的一個過程是遍歷地圖中的每個值,並對它們做一些事情 - 不累積或聚合,我不需要一次查看所有數據,所以在那裏對此沒有記憶關注。

我的微不足道的實現是使用IMap.keySet(),然後迭代所有的鍵來輪流獲取每個存儲的值(並允許在處理後對值進行GC處理),但是我擔心會有這樣的情況發生系統中的大量數據甚至只是獲取密鑰列表都足以對系統造成不必要的壓力。

我一直希望有一個流式API,我可以通過這種方式讓本地節點不必在本地緩存整個集合 - 但未能找到任何看起來相關的東西在我的文檔中。

我希望你能提出任何建議。謝謝。

+0

我的反應對你有意義嗎? –

+0

嗨Guss。你有任何問題嗎?否則,請接受我的回答。謝謝 –

+0

感謝您的迴應,最終這不是我們一起工作的,但是由於該項目也被剔除,我沒有高度投資於:-)。對於未來的項目,我想看看Jet,你提供的介紹非常有用。 – Guss

回答

1

Hazelcast Jet提供j.u.s的分佈式版本,並增加了«流»功能到IMap。 它允許在Hazelcast集羣上執行Java Streams API。

import com.hazelcast.jet.JetInstance; 
import com.hazelcast.jet.stream.DistributedCollectors; 
import com.hazelcast.jet.stream.IStreamMap; 
import com.hazelcast.jet.stream.IStreamList; 

import static com.hazelcast.jet.stream.DistributedCollectors.toIList; 

    final IStreamMap<String, Integer> streamMap = instance1.getMap("source"); 
    // stream of entries, you can grab keys from it 
    IStreamList<String> counts = streamMap.stream() 
        .map(entry -> entry.getKey().toLowerCase()) 
        .filter(key -> key.length() >= 5) 
        .sorted() 
        // this will store the result on cluster as well 
        // so there is no data movement between client and cluster 
        .collect(toIList()); 

請找到噴氣here和更多的例子here更多信息。

乾杯, 維克

+0

條目流更好。問題:'.flatMap(o - > Stream.of(o.getSingleVal()))'似乎是多餘的?當你寫「在客戶端和集羣之間沒有數據移動」時,你的意思是什麼? - 是不是所有條目都流向我們運行該進程的客戶端? – Guss

+0

客戶端將發送與您的'j.u.s'管道描述的計算任務到我們的羣集。它將在羣集中執行,並將結果寫入羣集(檢查'toIList'收集方法) –

+0

我使用導入更新了片段 –

0

雖然Hazelcast急流實現看起來令人印象深刻,我沒有很多的時間在尋找升級到Hazelcast噴氣(投資於我們幾乎沼澤標準vert.x建立)。相反,我使用了IMap.executeOnEntries,這似乎與@Vik Gamov針對Hazelcast Jet詳細描述的內容大致相同,除了語法更煩人。

我的例子:

myMap.executeOnEntries(new EntryProcessor<String,MyEntity>(){ 
    private static final long serialVersionUID = 1L; 
    @Override 
    public Object process(Entry<String, MyEntity> entry) { 
     entry.getValue().fondle(); 
     return null; 
    } 
    @Override 
    public EntryBackupProcessor<String, MyEntity> getBackupProcessor() { 
     return null; 
    }}); 

正如你所看到的,語法是很煩人:

  1. 我們需要創建一個實際的對象,可序列化到集羣 - 沒有華麗的lambda表達式在這裏(不要使用我的序列號,如果你複製&粘貼 - 它的設計破壞)。
  2. 它不能作爲lambda的一個原因是界面不起作用 - 你需要另一種方法來處理備份副本(或者至少聲明你不想處理它們,就像我一樣),當我使用acknolwedge它的重要性,它在任何時候都不重要,我猜想它在極少數情況下是唯一重要的。
  3. 顯然你不能(或者至少它不是微不足道的)從流程中返回數據 - 這對我來說並不重要,但依然如此。