2

嘗試運行我的Flink流應用程序時出現以下錯誤。Apache Flink Kakfa XML流

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768) 
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of com.test.SwissProt: no suitable constructor found, can not deserialize from Object value (missing default constructor or creator, or perhaps need to add/enable type information?) 
at [Source: [[email protected]; line: 1, column: 12] 
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:261) 
at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1456) 
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1012) 
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1203) 
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:314) 
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148) 
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789) 
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2920) 
at com.test.SwissProtDeserializationSchema.deserialize(SwissProtDeserializationSchema.scala:17) 
at com.test.SwissProtDeserializationSchema.deserialize(SwissProtDeserializationSchema.scala:9) 
at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39) 
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227) 
at java.lang.Thread.run(Thread.java:745) 

我想在Scala中,當你創建一個case類時,默認的構造函數被創建了嗎?我不明白這個錯誤。請幫忙!

我有以下斯卡拉對象:

主要斯卡拉對象運行弗林克流

package com.test 

import java.util.Properties 

import org.apache.flink.api.scala._ 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 

object Run { 

    def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val properties = new Properties() 
    properties.setProperty("bootstrap.servers", "localhost:9092") 
    properties.setProperty("group.id", "test") 
    val rawStream = env.addSource(new FlinkKafkaConsumer09("XML", new SwissProtDeserializationSchema,properties)) 

    rawStream.print 
    env.execute() 
    } 

} 

案例類,它描述了輸入

package com.test 

case class SwissProt (name: String, 
         address: String, 
         phoneNumber: String, 
         cellPhoneNumber: String 
         ) { 

} 

最後反序列化提取卡夫卡事件的課程n要我的情況下,類對象

package com.test 

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema 
import com.fasterxml.jackson.dataformat.xml.XmlMapper 

class SwissProtDeserializationSchema extends AbstractDeserializationSchema[SwissProt]{ 
    private var xmlMapper: XmlMapper = null 

    override def deserialize(bytes: Array[Byte]): SwissProt = { 
    if (xmlMapper == null) { 
     xmlMapper = new XmlMapper() 
    } 

    xmlMapper.readValue(bytes, classOf[SwissProt]) 
    } 
} 
+2

編譯器合成的同伴對象中的「工廠」的方法,但我不認爲它也是合成一個默認的構造函數(Java中的,不帶參數的構造函數)。看起來你正在使用的庫需要遵守Bean約定,因此如果你想在Scala中編寫它,我建議你用'var's創建一個普通的類,用@BeanProperty(http:// www.scala-lang.org/files/archive/spec/2.11/11-annotations.html#java-beans- annotations)並添加一個默認構造函數(http://stackoverflow.com/questions/6874329/scala-constructor-沒有參數)。 – stefanobaghino

+1

這是信息。你讓我仔細看看jackson-dataformat-xml的註釋。原來,我需要爲我的幾個字段使用@JacksonXmlProperty。另外,我還需要一個額外的案例班才能進入我的原始案例班。我會在完全滿意的時候發佈最後一堂課。再次感謝! – Crackerman

回答

0

你只需要註冊DefaultScalaModule。 所以

xmlMapper.registerModule(DefaultScalaModule) 

The scala import : 
    import com.fasterxml.jackson.module.scala.DefaultScalaModule 

The maven dependency: 
      <dependency> 
       <groupId>com.fasterxml.jackson.module</groupId> 
       <artifactId>jackson-module-scala_2.11</artifactId> 
       <version>2.6.5</version> 
      </dependency>