2017-10-10 60 views
0

我已經與下列mapPartition功能的程序異步Web服務調用:阿帕奇弗林克:做正確的MapReduce內()

public void mapPartition(Iterable<Tuple> values, Collector<Tuple2<Integer, String>> out) 

我收集的100從輸入values &分批送他們到一個web服務進行轉換。結果我加回到out集合。

爲了加快這個過程,我通過使用Executors做了網絡服務調用async。這造成了問題,要麼我得到taskManager released exceptionAskTimeoutException。我增加了內存&超時,但它沒有幫助。有相當多的輸入數據。我相信這導致了很多工作排在了ExecutorService &之間,因此佔用了大量內存。

什麼是最好的方法呢?

我也在看taskManager vs taskSlot的配置,但對這兩者之間的區別有些困惑(我猜他們與進程vs線程相似?)。不知道我在什麼時候增加taskManagers vs taskSlots?例如如果我有三臺每臺機器有4cpus的機器,那麼我的taskManager=3應該是我的taskSlot=4

我還在考慮僅增加mapPartition的並行度來說10以獲得更多的線程來訪問web服務。意見或建議?

回答

1

您應該查看Flink Asyncio,這將使您能夠在流式應用程序中以異步方式查詢您的web服務。

需要注意的一件事是,Asyncio函數不被稱爲多線程,並且每個分區每個分區被連續調用一次,因此您的Web應用程序需要確定性地返回並且可能快速返回以便不被佔用。

而且,潛在的更高的分區數量,將有助於你的情況,但再次你的web服務需要滿足足夠快

示例代碼塊的請求從Flinks網站:

// This example implements the asynchronous request and callback with Futures that have the 
// interface of Java 8's futures (which is the same one followed by Flink's Future) 

/** 
* An implementation of the 'AsyncFunction' that sends requests and sets the callback. 
*/ 
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { 

/** The database specific client that can issue concurrent requests with callbacks */ 
private transient DatabaseClient client; 

@Override 
public void open(Configuration parameters) throws Exception { 
    client = new DatabaseClient(host, post, credentials); 
} 

@Override 
public void close() throws Exception { 
    client.close(); 
} 

@Override 
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception { 

    // issue the asynchronous request, receive a future for result 
    Future<String> resultFuture = client.query(str); 

    // set the callback to be executed once the request by the client is complete 
    // the callback simply forwards the result to the collector 
    resultFuture.thenAccept((String result) -> { 

     asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result))); 

    }); 
    } 
} 

// create the original stream (In your case the stream you are mappartitioning) 
DataStream<String> stream = ...; 

// apply the async I/O transformation 
DataStream<Tuple2<String, String>> resultStream = 
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100); 

編輯:

由於用戶想要創建大小爲100的批次,並且asyncio特定於Streaming API,因此最好的方法是創建大小爲100的countwindows。

此外,要清除最後一個窗口,可能沒有100個事件,自定義Triggers可與計數觸發器和基於時間的觸發器組合使用,以便在元素數或每隔幾分鐘後啓動觸發器。

一個很好的跟進,請在這裏Flink Mailing List,其中用戶「克斯特亞」創建一個自定義的觸發器,它可here

+0

不幸的是這是在流API。我正在使用批處理API。我需要將這些項目收集到100個批次中 - 沒有看到在Streaming API中進行這種轉換的方法。 – Vineet

+0

爲什麼不創建一個計數窗口,你的計數大小是100,並在那工作?更多信息在這裏:https://ci.apache.org/projects/flink/flink-docs-release-1。3/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#countWindow-long- –

+0

最後一個問題 - 當我收集100個批次時,最後一批會有剩餘的元素小於100)。我怎麼能爲此開啓窗戶? – Vineet

相關問題