2017-06-16 24 views
0

我想使用這些變量來選擇兩個變量值範圍之間的條目。我的SQL查詢是String sql = "Select count(*) FROM Records WHERE event_time <= UPPERTIME('1') AND event_time >= LOWERTIME('1')";。在此查詢UPPERTIME('1')LOWERTIME('1')是UDF有定義Spark 2.1.1:如何將變量綁定到結構化流式查詢

spark.udf().register("LOWERTIME", new UDF1 < String, String >() { 
     @Override public String call(String lowertime) { 
      System.out.println("lowerTime="+lowerTime.toString()); 
      return lowerTime.toString(); 
      } 
      }, DataTypes.StringType); 


spark.udf().register("UPPERTIME", new UDF1 < String, String >() { 
     @Override public String call(String uppertime) { 
      System.out.println("upperTime="+upperTime.toString()); 
       return upperTime.toString(); 
      } 
      }, DataTypes.StringType); 

傳遞給UDF的參數是假,我真的返回了全局變量「upperTime和lowerTime」。

當我運行上面的查詢時,它顯示了表中所有條目的計數,但根據條件它應該顯示與條目數相對應的計數落在給定範圍內。出了什麼問題?

回答

0

我的猜測是你在Driver中更改了這些全局變量,執行者看不到這些更改。執行程序進程通常位於不同的節點上,並且導致它無法訪問另一個節點中另一個進程中的普通變量。

一般來說,在Spark中使用全局變量是一個壞主意。 Spark提供broadcast variables以在執行者和驅動程序之間共享變量。

+0

謝謝@zsxwing,但我想與我的SQL查詢綁定的變量不是隻讀的,而是我想在滿足特定條件後通過某個常量值增加它們。 – kadsank

+0

我不認爲您可以在Spark工作期間廣播您的更改。另外,你似乎假設處理數據的順序?這通常是錯誤的。 – zsxwing

相關問題