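ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
We are invoking a Spark SQL job from Spark Streaming. We are getting a ConcurrentModificationException, followed by an error saying that the Kafka consumer has already been closed. Here are the code and the exception details:
Kafka consumer code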
// Start reading messages from Kafka and get DStream
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, byte[]>Subscribe(SparkServiceConfParams.AIR.CONSUME_TOPICS,
sparkServiceConf.getKafkaConsumeParams()));
ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());
// Decode each binary message and generate JSON array
JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {}
..
// publish generated json gzip to kafka
decodedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
//Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
if(!jsonRdd4DF.isEmpty()) {
//JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
AIRDataSetBean processAIRData = airMainJsonProcessor.processAIRData(json, sparkSession);
Error details
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
Finally, the Kafka consumer is closed:
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException:
This consumer has already been closed.
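For readers unfamiliar with the two messages in the traces above, here is a minimal sketch of the kind of single-thread ownership check that produces them. It is a simplified model, not the Kafka client's code: `GuardedConsumer`, `poll(Runnable)`, and the helper names are hypothetical, and the real consumer tracks more state than this. It only shows that a second thread entering while another thread is mid-call gets the "not safe for multi-threaded access" error, and that any call after `close()` gets the "already been closed" error.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class SingleThreadGuardDemo {

    // Hypothetical stand-in for a consumer; NOT the real KafkaConsumer class.
    static class GuardedConsumer {
        private static final long NO_OWNER = -1L;
        private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
        private volatile boolean closed = false;

        // Claim the consumer for the calling thread, or fail if it is in use.
        private void acquire() {
            long me = Thread.currentThread().getId();
            if (!ownerThreadId.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException(
                        "KafkaConsumer is not safe for multi-threaded access");
            }
        }

        private void release() {
            ownerThreadId.set(NO_OWNER);
        }

        // Runs 'work' while holding the guard, like a long-running poll.
        void poll(Runnable work) {
            acquire();
            try {
                if (closed) {
                    throw new IllegalStateException("This consumer has already been closed.");
                }
                work.run();
            } finally {
                release();
            }
        }

        void close() {
            acquire();
            try {
                closed = true;
            } finally {
                release();
            }
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // A second thread calls poll() while the first is still inside poll().
    static String concurrentAccessError() {
        GuardedConsumer consumer = new GuardedConsumer();
        CountDownLatch insidePoll = new CountDownLatch(1);
        CountDownLatch finishPoll = new CountDownLatch(1);
        Thread first = new Thread(() -> consumer.poll(() -> {
            insidePoll.countDown();
            awaitQuietly(finishPoll);
        }));
        first.start();
        awaitQuietly(insidePoll); // the first thread now holds the guard
        try {
            consumer.poll(() -> { });
            return null;
        } catch (ConcurrentModificationException e) {
            return e.getMessage();
        } finally {
            finishPoll.countDown();
            try {
                first.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Any call after close() is rejected.
    static String useAfterCloseError() {
        GuardedConsumer consumer = new GuardedConsumer();
        consumer.close();
        try {
            consumer.poll(() -> { });
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(concurrentAccessError());
        System.out.println(useAfterCloseError());
    }
}
```

The sketch needs no Kafka or Spark dependency. It says nothing about which part of the streaming job causes two threads to reach the same consumer; that still has to be determined from the full stack traces.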
Where is the code that calls the Kafka consumer? –
@YuvalItzchakov I have added the code – Imran
What is your question? This looks like a bug in Spark... –