2017-05-06 78 views
1

當我將生產者設置放入代碼中時,我一次又一次地遇到問題。當我沒有它時,一切正常。下面給出了它包含所有代碼的文件單個文件,我試圖將一個文件寫入一個kafka流。並得到這個編譯錯誤。Akka Kafka Producersettings:重載的方法值適用於替代方案:

package somePackage 

import java.nio.file.Paths 

import akka.Done 
import akka.actor.{Actor, ActorLogging, ActorSystem, Cancellable, Props} 
import akka.kafka.ProducerSettings 
import akka.serialization.ByteArraySerializer 
import akka.stream.{ActorMaterializer, Materializer} 
import akka.stream.scaladsl.{FileIO, Sink} 
import akka.util.ByteString 
import org.apache.kafka.clients.producer.ProducerRecord 
import org.apache.kafka.common.serialization.StringSerializer 

import scala.concurrent.Future 


// Lines Producer 
class LinesProducer (implicit mat: Materializer) extends Actor with ActorLogging { 
    import LinesProducerCompanion._ 

    override def preStart():Unit = { 
    log.info("Not doing anything in PreStart!") 
    } 

    def receive = { 
    case Start => 
     log.info("LinesProducer Started.") 
     val future:Future[Done] = FileIO.fromPath(Paths.get("C:\\Users\\tnkteja\\Documents\\GitHub\\scala-immersion-program\\miniproject-1\\1000-genomes.csv")) 
     .map(line => {new ProducerRecord[Array[Byte],ByteString]("genomes0", line)}) 
     .runWith(Sink.ignore) 
    } 

    override def postStop():Unit = { 
    log.info("Doing nothing in postStop!") 
    } 
} 

object LinesProducerCompanion { 
    val props = Props[LinesProducer] 
    case object Start 
    case object Stop 
} 

object Application extends App { 

    implicit val system:ActorSystem = ActorSystem("some") 
    implicit val materializer:ActorMaterializer= ActorMaterializer() 
    implicit val executor = system.dispatcher 
    val LinesProducer = system.actorOf(LinesProducerCompanion.props, "LinesProducer") 
    val producerSetting = ProducerSettings(system, new ByteArraySerializer(), new StringSerializer).withBootstrapServers("localhost:9092") 
    LinesProducer ! LinesProducerCompanion.Start 

    // This example app will ping pong 3 times and thereafter terminate the ActorSystem - 
    // see counter logic in PingActor 
    //system.awaitTermination() 
} 

和錯誤是

info] Resolving org.fusesource.jansi#jansi;1.4 ... 
[info] Done updating. 
[info] Set current project to project (in build file:/C:/Users/tnkteja/Documents/GitHub/scala-immersion-program/miniproject-1/) 
[info] Compiling 1 Scala source to C:\Users\tnkteja\Documents\GitHub\scala-immersion-program\miniproject-1\target\scala-2.12\classes... 
[error] C:\Users\tnkteja\Documents\GitHub\scala-immersion-program\miniproject-1\src\main\scala\Application.scala:29: overloaded method value apply with alternatives: 
[error] [K, V](config: com.typesafe.config.Config, keySerializer: org.apache.kafka.common.serialization.Serializer[K], valueSerializer: org.apache.kafka.common.serialization.Serializer[V 
])akka.kafka.ProducerSettings[K,V] <and> 
[error] [K, V](system: akka.actor.ActorSystem, keySerializer: org.apache.kafka.common.serialization.Serializer[K], valueSerializer: org.apache.kafka.common.serialization.Serializer[V])ak 
ka.kafka.ProducerSettings[K,V] <and> 
[error] [K, V](config: com.typesafe.config.Config, keySerializer: Option[org.apache.kafka.common.serialization.Serializer[K]], valueSerializer: Option[org.apache.kafka.common.serializati 
on.Serializer[V]])akka.kafka.ProducerSettings[K,V] <and> 
[error] [K, V](system: akka.actor.ActorSystem, keySerializer: Option[org.apache.kafka.common.serialization.Serializer[K]], valueSerializer: Option[org.apache.kafka.common.serialization.S 
erializer[V]])akka.kafka.ProducerSettings[K,V] 
[error] cannot be applied to (akka.actor.ActorSystem, akka.serialization.ByteArraySerializer, org.apache.kafka.common.serialization.StringSerializer) 
[error]  val producerSetting = ProducerSettings(system, new ByteArraySerializer(), new StringSerializer).withBootstrapServers("localhost:9092") 
[error]       ^
[error] one error found 
[error] (compile:compileIncremental) Compilation failed 
[error] Total time: 8 s, completed 6 May, 2017 10:42:15 AM 

回答

0

您是否嘗試過使用卡夫卡的ByteArraySerializer呢?

import org.apache.kafka.common.serialization.ByteArraySerializer 

Akka的ByteArraySerializer()實例構造函數已被棄用。

+0

謝謝你的答案。這段代碼解決了這個問題。 – tnkteja

0

此代碼解決了問題。

包com.miniproject1

import akka.Done 
import akka.actor.{Actor, ActorLogging, Props} 
import akka.kafka.ProducerSettings 
import akka.kafka.scaladsl.Producer 
import akka.stream.Materializer 
import akka.stream.scaladsl.Source 
import org.apache.kafka.clients.producer.ProducerRecord 
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer} 


// Futures need execution context to reuse allocated thread pools 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 

import scala.io.Source.fromFile 


class LinesProducer(implicit mat: Materializer) extends Actor with ActorLogging { 
    import LinesProducerCompanion._ 

    override def preStart(): Unit = { 
    super.preStart() 
    log.info("Not doing anything in PreStart!") 
    } 

    override def receive: Receive = { 
    case Start => { 

     val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer) 
     .withBootstrapServers("192.168.99.100:9092") 

     log.info("Initializing writer") 

     val kafkaSink = Producer.plainSink(producerSettings) 

     // 
     val done: Future[Done] = Source.fromIterator(() => fromFile("C:\\Users\\tnkteja\\Documents\\GitHub\\scala-immersion-program\\miniproject-1\\1000-genomes.csv").getLines().drop(1)) 
     .map(line => {new ProducerRecord[Array[Byte], String]("genomes0", line)}) 
     .runWith(kafkaSink) 

     done.onComplete({ 
     success => 
      log.info("Writing to kafka Complete!") 
      context.stop(self) 
     }) 

     done.onFailure { 
     case ex => 
      log.info("*********************Stopping********************") 
      context.stop(self) 
     } 
    } 
    } 
} 

object LinesProducerCompanion{ 
    val props = Props[LinesProducer] 
    case object Start 
} 
相關問題