2016-12-18 34 views
5

我試圖建立使用Apache弗林克,做下面的一個示例應用程序流數據來源:阿帕奇弗林克 - 從數據流中使用的值來動態地創建

  1. 讀取股票符號流(例如, 'CSCO','FB')來自卡夫卡隊列。
  2. 對於每個符號執行當前價格的實時查詢併爲下游處理傳送值。

*更新到原來的職位*

我移動地圖功能到一個單獨的類並沒有得到運行時錯誤消息「的MapFunction的實現是不可序列化了。該對象可能包含或引用不可序列化的字段「。

我現在面臨的問題是,我試圖寫價格的卡夫卡話題「stockprices」沒有收到它們。我試圖解決問題,並會發布任何更新。

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "stocks"); 

     DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

     DataStream<String> stockPrice = 
      streamOfStockSymbols 
      //get unique keys 
      .keyBy(new KeySelector<String, String>() { 
       @Override 
       public String getKey(String trend) throws Exception { 
        return trend; 
       } 
       }) 
      //collect events over a window 
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
      //return the last event from the window...all elements are the same "Symbol" 
      .apply(new WindowFunction<String, String, String, TimeWindow>() { 
       @Override 
       public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
        out.collect(input.iterator().next().toString()); 
       } 
      }) 
      .map(new StockSymbolToPriceMapFunction()); 

     streamExecEnv.execute("Retrieve Stock Prices"); 
    } 
} 

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> { 
    @Override 
    public String map(String stockSymbol) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 
     System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol); 

     DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); 
     stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema())); 

     return "100000"; 
    } 

    private static class CustomKeySelector implements KeySelector<String, String> { 
     @Override 
     public String getKey(String arg0) throws Exception { 
      return arg0.trim(); 
     } 
    } 
} 


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
      stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
      isRunning = true; 
    } 


    @Override 
    public void cancel() { 
      isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
        throws Exception { 
      String stockPrice = "0"; 
      while (isRunning) { 
       //TODO: query Google Finance API 
       stockPrice = Integer.toString((new Random()).nextInt(100)+1); 
       ctx.collect(stockPrice); 
       Thread.sleep(10000); 
      } 
    } 
} 

回答

4

StreamExecutionEnvironment不縮進在流媒體應用程序的運營商內部使用。沒有打算的意思,這是沒有測試和鼓勵。它可能會工作並做某些事情,但很可能不會很好,可能會導致您的應用程序失效。

程序中的StockSymbolToPriceMapFunction爲每個傳入記錄指定一個全新的獨立新流應用程序。但是,由於您不調用streamExecEnv.execute(),因此程序未啓動,並且map方法無需執行任何操作就會返回。

如果呼叫streamExecEnv.execute(),該功能將開始在工人JVM一個新的本地弗林克集羣,啓動這個地方弗林克羣集上的應用程序。當地的Flink實例會佔用大量的堆空間,並且在幾個羣集啓動後,工作人員可能會因爲不想發生的OutOfMemoryError而死亡。

+0

是否有可能動態創建流以響應傳入數據? –

+0

您可以實現一個'FlatMapFunction',它根據到達的記錄動態讀取和發出數據。例如,如果您有一個帶有文件名的流,可以使用'FlatMapFunction'打開這些文件併發送它們的數據。但是,所有記錄的輸出類型必須相同。另外,獲得事件時處理語義也許很有挑戰性,但這更多的是動態添加源的一般問題。 –

+0

@FabianHueske我正在解決類似的用例。因此,如果我必須使用FlatMapFunction,那麼我們將不得不使用scala/Java中的普通File API來讀取文件,而不是使用Flink的readTextFile。原因是我們無法在flatMap中使用StreamExecutionEnvironment。我的理解是否正確? –

相關問題