0
初始化場我與Apache弗林克流API的一個問題。不能在自定義DataSink(弗林克CEP)
我可以設法建立整個CEP-環境與自定義數據源和源像使用標準的水槽時,「打印()」,一切工作正常。
這是我的片貌似現在:
@RequiredArgsConstructor
public class EventDataConsumer extends RichSinkFunction<EventData>{
private final transient Consumer<EventData> consumer;
@Override
public void invoke(EventData eventData) throws Exception {
consumer.accept(eventData);
}
}
我儘量做到IST,一個方法引用傳遞給該SinkFunction,這將在我的數據流中每個元素執行。
這是我如何初始化SinkFunction:
EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData);
outStream.addSink(consumer);
我的問題是,當我把我的自定義接收的「調用」方法中設置斷點,消費者似乎是,即使我叫空這個構造函數明確地指定了使用者。
謝謝,這確實有助於理解問題。任何想法如何使一個方法引用可序列化? –
我的意思是消費者領域,對不起。我試圖創建一個SerializableConsumer接口,它擴展了Consumer和Serializable接口,但似乎不起作用。 –
嗯它應該這樣工作。你確定你刪除了'transient'關鍵字嗎?還要確保該方法是真正可序列化的。請記住,lambda通常有一些也需要可序列化的閉包。在你的例子中'someService'也需要可序列化。 –