2017-03-27 40 views
0

在hazelcast Jet上工作我的DAG時,我偶然發現了一個奇怪的問題。爲了檢查這個錯誤,我完全忽略了我的方法,並且:根據教程,似乎邊緣沒有工作。如何讓簡單的DAG在Hazelcast Jet中工作?

下面的代碼幾乎和它一樣簡單。兩個頂點(一個源,一個接收器),一個邊。

源代碼正在讀取地圖,接收器應放入地圖。

data.addEntryListener正確告訴我該地圖由另一個應用程序填充了100個列表(每個列表有400個字節的25個對象)......然後什麼也沒有。地圖填滿了,但是dag根本不與它互動。

任何想法在哪裏尋找問題?

package be.andersch.clusterbench; 

import com.fasterxml.jackson.databind.ObjectMapper; 
import com.hazelcast.config.Config; 
import com.hazelcast.config.SerializerConfig; 
import com.hazelcast.core.EntryEvent; 
import com.hazelcast.jet.*; 
import com.hazelcast.jet.config.JetConfig; 
import com.hazelcast.jet.stream.IStreamMap; 
import com.hazelcast.map.listener.EntryAddedListener; 
import be.andersch.anotherpackage.myObject; 

import java.util.List; 
import java.util.concurrent.ExecutionException; 

import static com.hazelcast.jet.Edge.between; 
import static com.hazelcast.jet.Processors.*; 

/** 
* Created by abernard on 24.03.2017. 
*/ 
public class Analyzer { 
    private static final ObjectMapper mapper = new ObjectMapper(); 
    private static JetInstance jet; 
    private static final IStreamMap<Long, List<String>> data; 
    private static final IStreamMap<Long, List<String>> testmap; 

    static { 
     JetConfig config = new JetConfig(); 
     Config hazelConfig = config.getHazelcastConfig(); 
     hazelConfig.getGroupConfig().setName("name").setPassword("password"); 
     hazelConfig.getNetworkConfig().getInterfaces().setEnabled(true).addInterface("my_IP_range_here"); 
     hazelConfig.getSerializationConfig().getSerializerConfigs().add(
       new SerializerConfig(). 
         setTypeClass(myObject.class). 
         setImplementation(new OsamKryoSerializer())); 
     jet = Jet.newJetInstance(config); 
     data = jet.getMap("data"); 
     testmap = jet.getMap("testmap"); 
    } 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 

     DAG dag = new DAG(); 
     Vertex source = dag.newVertex("source", readMap("data")); 
     Vertex test = dag.newVertex("test", writeMap("testmap")); 

     dag.edge(between(source, test)); 

     jet.newJob(dag).execute()get(); 

     data.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> { 
      System.out.println("Got data: " + entryEvent.getKey() + " at " + System.currentTimeMillis() + ", Size: " + jet.getHazelcastInstance().getMap("data").size()); 
     }, true); 

     testmap.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> { 
      System.out.println("Got test: " + entryEvent.getKey() + " at " + System.currentTimeMillis()); 
     }, true); 

     Runtime.getRuntime().addShutdownHook(new Thread(() -> Jet.shutdownAll())); 
    } 
} 

回答

1

在甚至創建條目偵聽器之前,Jet作業已在線條jet.newJob(dag).execute().get()處完成。這意味着作業在空白地圖上運行。也許你的困惑是關於這個工作的性質:這是一個批處理工作,而不是一個無限的流處理。 Jet版本0.3尚不支持無限流處理。

+0

你也有一個建議如何解決它?我有一個帶有scheduleExcetor的IStreamMap來完成這項工作,但速度很慢。尼爾提出了一個DAG(這是有道理的),這就是我試圖這樣做的原因。 –

+1

可能會有一些微配料方案可以用於工作;否則團隊正在積極開發對真正的無限流處理的支持。 –

相關問題