我剛開始學習Hazelcast Jet。 我的源代碼是UDP數據報。我想在Jet的某些節點處並行處理它,並通過'domain'將它們重新發送到其他地址。 我想使用Hazelcast IMDG IMap加載器來通過'source ip'獲取'域'。Hazelcast Jet處理器使用Hazelcast IMap
DAG dag = new DAG();
Vertex source = dag.newVertex("datagram-source",
UdpSocketP.supplier("0.0.0.0", 41813));
source.localParallelism(1);
Vertex mapper = dag.newVertex("map",
map(new DomainMapper(instance.getMap("mysqlNas"))));
Vertex sink = dag.newVertex("sink",
Sinks.writeFile("logs"));
sink.localParallelism(1);
但是當我嘗試在DistributedFunction使用IMAP我得到異常
Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator
DomainMapper代碼:
package org.eltex.softwlc.sorm.replicator;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;
import java.io.Serializable;
import java.net.DatagramPacket;
/**
* Created by mickey on 21.07.17.
*/
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {
private final IMap<String, NasValue> map;
public DomainMapper(IMap<String, NasValue> map) {
this.map = map;
}
@Override
public IpData apply(DatagramPacket datagramPacket) {
final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
System.out.println(d);
final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
if (nasValue!=null) {
d.setDomain(nasValue.getDomain());
}
return d;
}
}
什麼是我的錯? 或Hazelcast Jet是我的目的錯誤的選擇。
我實現了'DomainMapper擴展AbstractProcessor'並覆蓋'tryProcess'。 dag.newVertex(「map」,DomainMapper.supplier());工作正確。 – MGaidamak
請注意,'IMap.get()'是一個阻塞調用(可能與遠程成員進行通信),所以通常在合作處理器中調用它會破壞它的合作性。您可以聲明處理器不合作,但性能仍會受損(與我在答案中提到的方法相反)。 –