I am trying to build a sample application with Apache Flink that uses values from a data stream to dynamically create a new streaming data source. The application does the following:
- Read a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka queue.
- For each symbol, perform a real-time lookup of the current price and stream the value for downstream processing.
*Update to the original post*

I moved the map function into a separate class and no longer get the runtime error message "The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields."

The issue I am facing now is that the Kafka topic "stockprices" that I am writing the prices to is not receiving them. I am trying to debug this and will post any updates.
public class RetrieveStockPrices {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "stocks");

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(
            new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));

        DataStream<String> stockPrice =
            streamOfStockSymbols
                // get unique keys
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String trend) throws Exception {
                        return trend;
                    }
                })
                // collect events over a window
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                // return the last event from the window... all elements are the same "symbol"
                .apply(new WindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
                        out.collect(input.iterator().next().toString());
                    }
                })
                .map(new StockSymbolToPriceMapFunction());

        streamExecEnv.execute("Retrieve Stock Prices");
    }
}
public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {

    @Override
    public String map(String stockSymbol) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);

        DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
        stockPrices.keyBy(new CustomKeySelector()).addSink(
            new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

        return "100000";
    }

    private static class CustomKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String arg0) throws Exception {
            return arg0.trim();
        }
    }
}
public class LookupStockPrice extends RichSourceFunction<String> {
    public String stockSymbol = null;
    public boolean isRunning = true;

    public LookupStockPrice(String inSymbol) {
        stockSymbol = inSymbol;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        isRunning = true;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        String stockPrice = "0";
        while (isRunning) {
            //TODO: query Google Finance API
            stockPrice = Integer.toString((new Random()).nextInt(100) + 1);
            ctx.collect(stockPrice);
            Thread.sleep(10000);
        }
    }
}
Is it possible to dynamically create streams in response to incoming data? –
You can implement a 'FlatMapFunction' that dynamically reads and emits data depending on the records that arrive. For example, if you have a stream of file names, the 'FlatMapFunction' could open those files and emit their data. However, the output type must be the same for all records. Also, getting event-time processing semantics right can be challenging, but that is more a general problem of dynamically adding sources. –
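To make that suggestion concrete, here is a minimal sketch of the per-record logic such a 'FlatMapFunction' could run. The class name, the `lookupPrice` helper, and the `List` standing in for Flink's `Collector` are all hypothetical placeholders so the logic is runnable standalone; in a real job the class would implement `FlatMapFunction<String, String>` (or extend `RichFlatMapFunction`) and emit via `out.collect(...)`.

```java
import java.util.List;
import java.util.Random;

// Sketch: for each incoming stock symbol, look up a price inline and
// emit "SYMBOL:price". No new StreamExecutionEnvironment or source is
// created per record; the lookup happens directly in user code.
class SymbolPriceLookup {
    private final Random rng = new Random();

    // Hypothetical stand-in for a real price lookup
    // (e.g. an HTTP call to a price service).
    String lookupPrice(String symbol) {
        return Integer.toString(rng.nextInt(100) + 1);
    }

    // In Flink this would be flatMap(String symbol, Collector<String> out);
    // here a List plays the Collector's role.
    public void flatMap(String symbol, List<String> out) {
        out.add(symbol.trim() + ":" + lookupPrice(symbol));
    }
}
```

The key design point is that the enrichment is plain user code running inside the operator, not a nested Flink job.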
@FabianHueske I am working on a similar use case. So if I have to use a FlatMapFunction, then we would have to read files using the plain file APIs in Scala/Java instead of Flink's readTextFile, because we cannot use the StreamExecutionEnvironment inside flatMap. Is my understanding correct? –