當我使用sparkSql接口從HDFS讀取數據時,某些任務會拋出java.lang.ArrayIndexOutOfBoundsException
。我認爲數據集中可能有一些不良記錄導致任務失敗。我怎樣才能得到不良記錄?或者當我使用Spark界面加載數據以使應用程序成功時,我該如何忽略不良記錄?如何解決spark unsafe.types.UTF8String.numBytesForFirstByte拋出java.lang.ArrayIndexOutOfBoundsException?
完整的錯誤日誌中的失敗的任務是貼在下面(這似乎是一些UTF8解碼錯誤):
17/06/17 23:02:19 ERROR Executor: Exception in task 42.0 in stage 0.0 (TID 42)
java.lang.ArrayIndexOutOfBoundsException: 62
at org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:156)
at org.apache.spark.unsafe.types.UTF8String.numChars(UTF8String.java:171)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我查找了utf8編碼和火花源代碼(貼在下面)。根據utf8編碼,utf8中一個字符的長度必須在1到6之間。所以最大可用編碼點是11111101b。所以Spark源代碼中的'offset'變量不能超過11111101b - 192 = 61。數據中應該有一些不合法的記錄,這對utf8編碼是非法的。
那麼我該如何選擇它們呢?或者我怎樣才能跳過壞記錄?
private static int[] bytesOfCodePointInUTF8 = {2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
4, 4, 4, 4, 4, 4, 4, 4,
5, 5, 5, 5,
6, 6};
private static int numBytesForFirstByte(final byte b) {
final int offset = (b & 0xFF) - 192;
return (offset >= 0) ? bytesOfCodePointInUTF8[offset] : 1;
}
什麼是數據集?你做什麼聚合? –