2017-09-25 128 views
1

我有,我用我的項目的一些卡夫卡頻道層次:斯卡拉特質類型不匹配

我的基本特點是:

trait SendChannel[A, B] extends CommunicationChannel { 
    def send(data:A): B 
} 

現在我有一個共同的卡夫卡發送通道像

trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] { 
val channelProps: KafkaSendChannelProperties 
val kafkaProducer: Producer[String, B] 
override def close(): Unit = kafkaProducer.close() 
} 

現在有2個CommanKafkaSendChannel變體,一個是回調,一個是Future:

trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] { 
override def send(data: A): Future[RecordMetadata] = Future { 
    kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get 
} 
} 

KafkaSendChannelWithCallback定義:

object KafkaSendChannelWithCallback { 

def apply[A, B](oChannelProps: KafkaSendChannelProperties, 
       oKafkaProducer: Producer[String, B], 
       oCallback: Callback): KafkaSendChannelWithCallback[A, B] = 
new KafkaSendChannelWithCallback[A,B] { 
    override val channelProps: KafkaSendChannelProperties = oChannelProps 
    override val kafkaProducer: Producer[String, B] = oKafkaProducer 
    override val callback: Callback = oCallback 
} 
} 

trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] { 

    val callback: Callback 

override def send(data: A): Unit = 
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback) 
} 

現在基於所述配置值I選擇通道的上下面等運行時適當的類型。我創建右信道類型的演員,將數據發送到卡夫卡:

val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
    error => { 
     logger.error("Exception while instantiating the KafkaSendChannel") 
     throw error 
    }, 
    success => success 
) 

actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME) 

演員的定義:

object IngestionRouterActor { 
    def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props = 
Props(classOf[IngestionActor[V]], config, sendChannel) 
} 

問題是,當我使用KafkaSendChannelWithCallback我的代碼正常編譯但是當我使用KafkaSendChannelWithFuture它給我下面的錯誤actor =聲明:

[error] IngestionActor.scala:32:模式類型與預期類型不兼容; [錯誤]實測值:KafkaSendChannelWithFuture [字符串,V] [錯誤]需要:SendChannel [V,單位]

由於兩個信道定義是從SendChannel延長,此代碼應編譯沒有任何錯誤。我不確定它爲什麼不編譯。謝謝

回答

1

Props對於IngestionActor需要SendChannel[V, Unit]。將KafkaSendChannelWithCallback傳遞給此參數的原因是因爲它是SendChannel[V, Unit]。另一方面,是SendChannel[V, Future[RecordMetadata]]。 A SendChannel[V, Future[RecordMetadata]]而不是 a SendChannel[V, Unit]

一種選擇是重新定義Props採取SendChannel[V, Any],因爲Any既是UnitFuture的超類型:

def props[V](config: Config, sendChannel: SendChannel[V, Any]): Props = ??? 

此時,編譯器仍然不高興,因爲SendChannel,作爲一個泛型類型,默認情況下是不變的。換句話說,SendChannel[V, Unit]SendChannel[V, Future[RecordMetadata]]都不是SendChannel[V, Any]類型。

要改變這種狀況,就第2類參數SendChannel協變(在B前面添加+):

trait SendChannel[A, +B] extends CommunicationChannel { 
    def send(data: A): B 
} 
+0

嗨@chunjef,感謝您的答覆。你提到'因爲任何是單元和未來的超類型,這是嘗試爲'任何'是超類型,但爲什麼SendChannel [V,單元]和SendChannel [V,未來[記錄元數據]]的類型都是SendChannel [V,任何]'這是錯誤的? – Explorer

+0

我做了你所建議的修改,是否有任何需要修改'props'仍然有'SendChannel [V,Unit]'因爲我仍然得到相同的錯誤。 – Explorer

+0

@Explorer:請仔細閱讀我的回答。我在回答中解答了你的兩條評論。 – chunjef