您應該查看Flink Asyncio,這將使您能夠在流式應用程序中以異步方式查詢您的web服務。
需要注意的一件事是,Asyncio函數不被稱爲多線程,並且每個分區每個分區被連續調用一次,因此您的Web應用程序需要確定性地返回並且可能快速返回以便不被佔用。
而且,潛在的更高的分區數量,將有助於你的情況,但再次你的web服務需要滿足足夠快
示例代碼塊的請求從Flinks網站:
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
// issue the asynchronous request, receive a future for result
Future<String> resultFuture = client.query(str);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
resultFuture.thenAccept((String result) -> {
asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
});
}
}
// create the original stream (In your case the stream you are mappartitioning)
DataStream<String> stream = ...;
// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
編輯:
由於用戶想要創建大小爲100的批次,並且asyncio特定於Streaming API,因此最好的方法是創建大小爲100的countwindows。
此外,要清除最後一個窗口,可能沒有100個事件,自定義Triggers可與計數觸發器和基於時間的觸發器組合使用,以便在元素數或每隔幾分鐘後啓動觸發器。
一個很好的跟進,請在這裏Flink Mailing List,其中用戶「克斯特亞」創建一個自定義的觸發器,它可here
不幸的是這是在流API。我正在使用批處理API。我需要將這些項目收集到100個批次中 - 沒有看到在Streaming API中進行這種轉換的方法。 – Vineet
爲什麼不創建一個計數窗口,你的計數大小是100,並在那工作?更多信息在這裏:https://ci.apache.org/projects/flink/flink-docs-release-1。3/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#countWindow-long- –
最後一個問題 - 當我收集100個批次時,最後一批會有剩餘的元素小於100)。我怎麼能爲此開啓窗戶? – Vineet