1

DataFramesUDF避免競爭條件這裏的問題是你如何重新使用UDF的對象,但避免競爭狀態?星火1.5.2在對象重新使用

我使用的是UDF我的火花應用程序內和單元測試,由於競爭的條件似乎不確定性。有時,他們有時會通過他們失敗...

我試圖通過創建並將其傳遞給UDF求效益,以強制再利用的對象。然而,似乎共享相同的Spark Context和JVM的單獨「測試」正在使用這些對象並導致錯誤。

def reformatDate(input:String,sdfIn:SimpleDateFormat,sdfOut:SimpleDateFormat): String ={ 
    sdfOut.format(sdfIn.parse(input)) 
    } 

    val datePartitionFormat = new SimpleDateFormat("yyyyMMdd") 
    val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd") 
    val validDateFormat = new SimpleDateFormat("yyyy-MM-dd") 

    val partitionToDateUDF = udf(reformatDate(_:String,datePartitionFormat,validDateFormat)) 
    val dTStampToDateUDF= udf(reformatDate(_:String,dTStampFormat,validDateFormat)) 

有時,當我跑我的單元測試,我得到以下錯誤使用此項功能:

17/01/13 11時45分45秒ERROR執行人:異常的任務0.0舞臺2.0 (TID 2)java.lang.NumberFormatException:多點在 sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890) at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)at java.lang。 Double.parseDouble(Double.java:538) java.text.DigitList.getDouble(DigitList。 java:169) java.text.DecimalFormat.parse(DecimalFormat.java:2056)at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1867)at java.text.SimpleDateFormat.parse(SimpleDateFormat.java: 1514)在 java.text.DateFormat.parse(DateFormat.java:364)在 com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviouslySeenDomainsStartOfDayDF $ .reformatDate(mDnsPreviouslySeenDomainsStartOfDayDF.scala:22)

我使用的功能,像這樣:

val df = df2 
    .filter(
    datediff(
     to_date(partitionToDateUDF($"dt")) 
     ,to_date(dTStampToDate($"d_last_seen")) 
    ) < 90 
) 

並且在調試已發現了輸入「DF2」爲:

+-----------+--------+-------------------------+--------------------------------+ 
|d_last_seen|  dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")| 
+-----------+--------+-------------------------+--------------------------------+ 
| 2016/11/02|20161102|2016-11-02    |2016-11-02      | 
| 2016/11/01|20161102|2016-11-02    |2016-11-01      | 
+-----------+--------+-------------------------+--------------------------------+ 

我使用conf.setMaster(「本地[2]」),會不會是火花使用線程,並且因此共享相同的JVM運行時在本地,但是這種情況在部署時不會發生,因爲獨立的執行程序將擁有自己的JVM,因此它們擁有自己的對象實例?

回答

2

SimpleDateFormat是不是線程安全的(見例如Why is Java's SimpleDateFormat not thread-safe?)。這意味着,如果你在任何UDF使用它(即使是在一個星星之火工作),你可能會得到意想不到的結果,因爲火花將在幾個任務其在單獨線程多線程訪問它在結束了運行使用UDF同一時間。對於本地模式和實際分佈式集羣都是如此 - 單個副本將被每個執行程序上的多個線程使用。爲了克服這一點 - 只需使用不同的格式化程序,其中線程安全的,例如,喬達的DateTimeFormatter

+1

謝謝Tzach, 我想補充一點,這裏的整體問題的答案是: 由於多個任務在每個執行程序的多個線程上運行,因此您必須在Spark UDF中保持線程安全。 Tzach爲我提供的解決方案是線程安全的。 –