2017-02-23 79 views

Akka cluster-aware router - sharing a redis instance with all routees

In the context of an Akka cluster application, I ran into a problem with a property that Akka expects: every (case) class and every message used must be serializable. My setup is the following: I want to consume data from a redis cluster, and for that I decided to use a cluster-aware router pool so that I can add nodes to get more workers. The workers read data from redis and store some metadata in mongodb. My first version looked like this:

object MasterWorkers { 

    def props 
    ( awsBucket : String, 
    gapMinValueMicroSec : Long, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) : Props = 
    Props(MasterWorkers(awsBucket, gapMinValueMicroSec, persistentCache, mongoURI, mongoDBName, mongoCollectioName)) 

    case class JobRemove(deviceId: DeviceId, from : Timestamp, to : Timestamp) 
} 

case class MasterWorkers 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

    val workerRouter = 
    context.actorOf(FromConfig.props(Props(classOf[Worker],awsBucket,gapMinValueMicroSec, self, persistentCache, mongoURI, mongoDBName, mongoCollectioName)), 
    name = "workerRouter") 
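Since the pool is created with FromConfig, the router itself is defined in configuration. A cluster-aware deployment section for it might look like the following sketch; the actor path, router type, and instance counts are assumptions, not taken from the question:

```hocon
akka.actor.deployment {
  "/masterWorkers/workerRouter" {
    router = round-robin-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 3
      allow-local-routees = on
    }
  }
}
```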

The Worker class:

object Worker { 

    def props 
    (
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) : Props = 
    Props(Worker(awsBucket, gapMinValueMicroSec, replyTo, persistentCache, mongoURI, mongoDBName, mongoCollectioName)) 

    case class JobDumpFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp) 
    case class JobDumpSuccess(deviceId : DeviceId, from: Timestamp, to: Timestamp) 

    case class JobRemoveFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp) 
} 

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

But this raised the following exception when I started two nodes:

[info] akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer]. 
[info] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61) 
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895) 
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895) 
[info] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
[info] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:894) 
[info] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:786) 
[info] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:761) 
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:497) 
[info] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452) 
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) 
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495) 
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[info] Caused by: java.io.NotSerializableException: akka.actor.ActorSystemImpl 

The Redis cache is a simple case class with a companion object, implementing the following interface:

To work around this, I then moved the redisCache into the worker instead of passing it in from the master node:

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

// redis cache here now 
val redisCache = ... 

But with this design, every routee creates its own new instance of the redis cache, which is not the intended behaviour. What I want is a single instance of my redis cache that is shared with all my routees, but in the context of a cluster application that seems impossible to achieve, so I don't know whether this is a design failure on my part or just missing experience with Akka. If anyone has faced a similar problem, I'm happy to take suggestions!

Answer

The problem is that your RedisCache is not that simple: it carries an ActorSystem, which cannot be serialized.
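The failure mode can be reproduced with plain Java serialization, a minimal sketch in which HeavyRuntime and NaiveCache are hypothetical stand-ins for ActorSystem and RedisCache:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins: HeavyRuntime plays the role of ActorSystem
// (not Serializable), NaiveCache the role of RedisCache.
class HeavyRuntime
case class NaiveCache(host: String, runtime: HeavyRuntime)

// Case classes are Serializable, but serialization walks every field,
// so a single non-serializable member fails the whole object graph.
def trySerialize(obj: AnyRef): Either[Throwable, Int] =
  try {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    Right(buffer.size)
  } catch {
    case e: NotSerializableException => Left(e)
  }
```

Here trySerialize("hello") succeeds, while trySerialize(NaiveCache("localhost", new HeavyRuntime)) fails with the same NotSerializableException as in the log above.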

I guess this is because it contains RedisClient instances from the rediscala library, and those require an ActorSystem.

You need to abstract away from the actor system and pass your workers only the bare details of the Redis cluster, i.e. the RedisServer objects.

The workers will then instantiate the RedisClient themselves, using their own context.system:

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    redisMaster: RedisServer, 
    redisSlaves: Seq[RedisServer], 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

    val masterSlaveClient = ??? //create from the RedisServer details 

} 
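Assuming the rediscala library, the ??? above could be filled in roughly like this; treat it as a sketch of the API rather than a definitive implementation:

```scala
import akka.actor.ActorSystem
import redis.{RedisClientMasterSlaves, RedisServer}

// inside the Worker actor body:
implicit val system: ActorSystem = context.system  // this node's own system

// one client per worker, built locally from the serializable RedisServer details
val masterSlaveClient: RedisClientMasterSlaves =
  RedisClientMasterSlaves(redisMaster, redisSlaves)
```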

This will let each worker establish its own connection to the redis cluster.

Alternatively, if you want to connect only once, in the master, and share that connection with the workers, you need to pass around the RedisClientActor (source) that embeds the connection. That is an ActorRef and can be shared remotely.

It can be obtained by calling client.redisConnection.

The workers can then build an ActorRequest around it, for example:

case class Worker 
    (
     awsBucket : String, 
     gapMinValueMicroSec : Long, 
     replyTo : ActorRef, 
     redisConnection: ActorRef, 
     mongoURI : String, 
     mongoDBName : String, 
     mongoCollectioName : String 
    ) extends Actor with ActorLogging with ActorRequest { 

     // you will need to implement the execution context that ActorRequest needs as well.. 

     send(redisCommand) 

    } 
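On the master side, the shared-connection variant might look like the following sketch, again assuming rediscala; the redis address is an assumption:

```scala
import akka.actor.{ActorRef, Props}
import akka.routing.FromConfig
import redis.RedisClient

// inside MasterWorkers: connect once, then share only the connection ActorRef
implicit val system = context.system

val client = RedisClient(host = "localhost", port = 6379)  // address assumed
val redisConnection: ActorRef = client.redisConnection     // the ActorRef behind the client

val workerRouter = context.actorOf(
  FromConfig.props(Props(classOf[Worker], awsBucket, gapMinValueMicroSec,
    self, redisConnection, mongoURI, mongoDBName, mongoCollectioName)),
  name = "workerRouter")
```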
Regarding this solution, does it mean that every worker will instantiate a Redis client? – alifirat

Yes. I've just added an alternative that shares a single connection, if that's what you're interested in. –

Yes, I hadn't thought of the alternative. I'll give it a try and let you know! – alifirat