2017-08-24 65 views
0

初始化場我與Apache弗林克流API的一個問題。不能在自定義DataSink(弗林克CEP)

我可以設法建立整個CEP-環境與自定義數據源和源像使用標準的水槽時,「打印()」,一切工作正常。

這是我的片貌似現在:

@RequiredArgsConstructor 
public class EventDataConsumer extends RichSinkFunction<EventData>{ 

private final transient Consumer<EventData> consumer; 

    @Override 
    public void invoke(EventData eventData) throws Exception { 
     consumer.accept(eventData); 
    } 
} 

我儘量做到IST,一個方法引用傳遞給該SinkFunction,這將在我的數據流中每個元素執行。

這是我如何初始化SinkFunction:

EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData); 
outStream.addSink(consumer); 

我的問題是,當我把我的自定義接收的「調用」方法中設置斷點,消費者似乎是,即使我叫空這個構造函數明確地指定了使用者。

回答

1

作爲宿被分配給儘可能多的情況下,作爲接收它應該是可串行化的平行度。在集羣上執行時,Sink被序列化,發送到TaskManagers,在那裏進行反序列化。

在您的示例consumertransient,這就是爲什麼序列化之後變得null

+0

謝謝,這確實有助於理解問題。任何想法如何使一個方法引用可序列化? –

+0

我的意思是消費者領域,對不起。我試圖創建一個SerializableConsumer接口,它擴展了Consumer和Serializable接口,但似乎不起作用。 –

+0

嗯它應該這樣工作。你確定你刪除了'transient'關鍵字嗎?還要確保該方法是真正可序列化的。請記住,lambda通常有一些也需要可序列化的閉包。在你的例子中'someService'也需要可序列化。 –