2017-09-22 30 views
0

下面的簡單程序從kafka流中讀取並每隔5分鐘寫入一次CSV文件及其火花流。在「驅動程序」(不在執行程序中)中進行微批處理後,是否可以調用Java函數?在Spark流中的每個微批次之後調用java函數

我同意它不是一個很好的習慣來調用流中的任意代碼,但這是我們的數據量很低的特殊情況。請adivse。謝謝。

public static void main(String[] args) throws Exception { 

    if (args.length == 0) 
     throw new Exception("Usage program configFilename"); 
    String configFilename = args[0]; 

    addShutdownHook(); 

    ConfigLoader.loadConfig(configFilename); 
    sparkSession = SparkSession 
      .builder() 
      .appName(TestKafka.class.getName()) 
      .master(ConfigLoader.getValue("master")).getOrCreate(); 
    SparkContext context = sparkSession.sparkContext(); 
    context.setLogLevel(ConfigLoader.getValue("logLevel")); 

    SQLContext sqlCtx = sparkSession.sqlContext(); 
    System.out.println("Spark context established"); 

    DataStreamReader kafkaDataStreamReader = sparkSession.readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers")) 
      .option("group.id", ConfigLoader.getValue("groupId")) 
      .option("subscribe", ConfigLoader.getValue("topics")) 
      .option("failOnDataLoss", false); 
    Dataset<Row> rawDataSet = kafkaDataStreamReader.load(); 
    rawDataSet.printSchema(); 
    rawDataSet.createOrReplaceTempView("rawEventView1"); 

    rawDataSet = rawDataSet.withColumn("rawEventValue", rawDataSet.col("value").cast("string")); 
    rawDataSet.printSchema(); 
    rawDataSet.createOrReplaceTempView("eventView1"); 
    sqlCtx.sql("select * from eventView1") 
      .writeStream() 
      .format("csv") 
      .option("header", "true") 
      .option("delimiter", "~") 
      .option("checkpointLocation", ConfigLoader.getValue("checkpointPath")) 
      .option("path", ConfigLoader.getValue("recordsPath")) 
      .outputMode(OutputMode.Append()) 
      .trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTime")) 
        , TimeUnit.SECONDS)) 
      .start() 
      .awaitTermination(); 
} 
+0

您想運行哪種代碼?它是一種副作用,因爲它不返回任何值?爲什麼它必須在司機中發生? – raam86

+0

這可能類似於通過電子郵件通知微量批次完成, – Manjesh

回答

0

你應該能夠做到這一點的是這樣的:

kafkaDataStreamReader.map{value -> mySideEffect(); value} 

這將每次從卡夫卡收到微量分批時間調用函數mySideEffect,怎麼過的我不推薦這樣做,更好的方法是觀看保存CSV文件的文件夾,或者僅僅檢查網絡用戶界面,考慮到每隔幾秒發生一次微量批次,您最多隻會收到一封電子郵件。如果您想確保流式傳輸應用程序已啓動,您可以每隔幾秒查詢一次Spark REST API並確保它仍然運行。 https://spark.apache.org/docs/latest/monitoring.html

相關問題