2017-06-18 79 views
1

當我使用sparkSql接口從HDFS讀取數據時,某些任務會拋出java.lang.ArrayIndexOutOfBoundsException。我認爲數據集中可能有一些不良記錄導致任務失敗。我怎樣才能得到不良記錄?或者當我使用Spark界面加載數據以使應用程序成功時,我該如何忽略不良記錄?如何解決spark unsafe.types.UTF8String.numBytesForFirstByte拋出java.lang.ArrayIndexOutOfBoundsException?

完整的錯誤日誌中的失敗的任務是貼在下面(這似乎是一些UTF8解碼錯誤):

17/06/17 23:02:19 ERROR Executor: Exception in task 42.0 in stage 0.0 (TID 42) 
java.lang.ArrayIndexOutOfBoundsException: 62 
    at org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:156) 
    at org.apache.spark.unsafe.types.UTF8String.numChars(UTF8String.java:171) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

我查找了utf8編碼和火花源代碼(貼在下面)。根據utf8編碼,utf8中一個字符的長度必須在1到6之間。所以最大可用編碼點是11111101b。所以Spark源代碼中的'offset'變量不能超過11111101b - 192 = 61。數據中應該有一些不合法的記錄,這對utf8編碼是非法的。

那麼我該如何選擇它們呢?或者我怎樣才能跳過壞記錄?

private static int[] bytesOfCodePointInUTF8 = {2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 
    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 
    3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
    4, 4, 4, 4, 4, 4, 4, 4, 
    5, 5, 5, 5, 
    6, 6}; 


    private static int numBytesForFirstByte(final byte b) { 
    final int offset = (b & 0xFF) - 192; 
    return (offset >= 0) ? bytesOfCodePointInUTF8[offset] : 1; 
    } 
+0

什麼是數據集?你做什麼聚合? –

回答

0

你似乎(猜測通過agg_doAggregateWithKeys)使用類型化的數據集API。

我建議使用Dataset.rdd訪問底層RDD[InternalRow],並直接使用UnsafeRows查看可能導致問題的字符串。

請勿觸摸任何可使用編碼器轉換數據集的方法(因此您可避免使用UTF8String進行轉換)。

0

如果您正在尋找過濾掉不良記錄,並避免java.lang.ArrayIndexOutOfBoundsException,那麼你可以使用trycatch塊捕獲exceptions做例外什麼。這應該過濾出不良記錄。

但是避免這樣的Exception是不好的做法。您應該始終處理代碼中的任何類型的異常。並且try catch是捕獲此類錯誤並應用適當操作的最佳方法。

相關問題