2017-07-24 109 views
1

我剛開始學習Hazelcast Jet。 我的源代碼是UDP數據報。我想在Jet的某些節點處並行處理它,並通過'domain'將它們重新發送到其他地址。 我想使用Hazelcast IMDG IMap加載器來通過'source ip'獲取'域'。Hazelcast Jet處理器使用Hazelcast IMap

DAG dag = new DAG();   
Vertex source = dag.newVertex("datagram-source", 
       UdpSocketP.supplier("0.0.0.0", 41813)); 
     source.localParallelism(1); 

     Vertex mapper = dag.newVertex("map", 
       map(new DomainMapper(instance.getMap("mysqlNas")))); 

     Vertex sink = dag.newVertex("sink", 
       Sinks.writeFile("logs")); 
     sink.localParallelism(1); 

但是當我嘗試在DistributedFunction使用IMAP我得到異常

Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable 
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185) 
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:101) 
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:78) 
    at com.hazelcast.jet.DAG.newVertex(DAG.java:79) 
    at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46) 
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator 

DomainMapper代碼:

package org.eltex.softwlc.sorm.replicator; 

import com.hazelcast.core.IMap; 
import com.hazelcast.jet.function.DistributedFunction; 

import java.io.Serializable; 
import java.net.DatagramPacket; 

/** 
* Created by mickey on 21.07.17. 
*/ 
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable { 

    private final IMap<String, NasValue> map; 

    public DomainMapper(IMap<String, NasValue> map) { 
     this.map = map; 
    } 

    @Override 
    public IpData apply(DatagramPacket datagramPacket) { 
     final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress()); 
     System.out.println(d); 

     final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress()); 
     if (nasValue!=null) { 
      d.setDomain(nasValue.getDomain()); 
     } 

     return d; 
    } 
} 

什麼是我的錯? 或Hazelcast Jet是我的目的錯誤的選擇。

回答

2

問題是你試圖序列化函數內部的整個IMap。直接修復的方法是編寫一個定製處理器,通過它的init()方法訪問Hazelcast Jet實例,並從中查找其IMap。由於init()代碼在目標成員上執行,所有反序列化後,這將工作。

但是,在更一般的層面上,您的目標似乎是「數據豐富」類型。我們想要在Jet中支持這一點的方式是通過「散列連接」操作,該操作目前不是一流的;但是有一個代碼示例顯示了這種方法。您可以將整個IMap內容彙集到一個頂點,然後將它變成一個普通的HashMap並分發給所有富集處理器,或者您可以準備將​​由富集處理器直接使用的Hazelcast ReplicatedMap

第一種方法意味着您使用IMap的快照;在第二個中,您可以在作業正在運行時繼續更新ReplicatedMap

最好去檢查樣品:HashMapEnrichmentReplicatedMapEnrichment

+0

我實現了'DomainMapper擴展AbstractProcessor'並覆蓋'tryProcess'。 dag.newVertex(「map」,DomainMapper.supplier());工作正確。 – MGaidamak

+1

請注意,'IMap.get()'是一個阻塞調用(可能與遠程成員進行通信),所以通常在合作處理器中調用它會破壞它的合作性。您可以聲明處理器不合作,但性能仍會受損(與我在答案中提到的方法相反)。 –