2016-01-08 77 views
1

我正試圖用Scala實現映射條目偵聽器。實現映射條目偵聽器

理念:

  1. 我需要訂閱來自服務的地圖。
  2. 我需要在添加/更新特定密鑰的條目時通知所有訂戶。
  3. 我需要從其他服務訪問地圖來檢查輸入值。

我找不到這個現成的解決方案,所以我試圖用阿卡實現它:

class TrackingService(system: ActorSystem) extends LazyLogging { 
    private val trackingActor = system.actorOf(TrackingActor.props) 
    private val duration = Duration(15, TimeUnit.SECONDS) 
    private implicit val timeout = Timeout(duration) 

    def fireEvent(key: String): Unit = { 
    TrackingActor ! EventFired(key) 
    } 

    def eventHappened(key: String): Future[Boolean] = { 
    TrackingActor ? DoesEventHappened(key)).mapTo[Boolean] 
    } 

    def registerHiddenCardListener(key: String, data: MyData): Unit = { 
    TrackingActor ! Subscribe(key, data) 
    } 
} 

case class EventFired(key: String) 
case class EventHappened(key: String) 
case class EventHappenedResponse(happened: Boolean) 

case class Subscribe(key: String, data: Data) 
class TrackingActor extends Actor with LazyLogging { 
    var eventMap: Map[String, Boolean] = Map.empty[String, Boolean] 
    var dataMap: Map[String, List[Data]] = Map.empty[String, List[Data]] 

    def receive: Receive = { 
    case Subscribe(key, data)  => 
     val currentData: List[Data] = dataMap.getOrElse(key, Nil) 
     val newData = data :: currentData 
     dataMap = dataMap + (key -> newData) 
    case EventHappened(key)   => sender() ! EventHappenedResponse(eventMap.getOrElse(key, false)) 
    case [email protected](key)   => 
     eventMap = eventMap + (key -> true) 

     for { 
     dataOpt <- dataMap.get(key) 
     data <- dataOpt 
     } { 
     // do callback with data (e.g. send email) 
     } 
    case x => logger.warn(s"Received unknown message: $x") 
    } 
} 

object TrackingActor { 
    def props: Props = Props(classOf[TrackingActor]) 
} 

我並不在此解決方案喜歡什麼:我不喜歡問模式,但我需要訪問非演員類的條目。另外,我不喜歡有2張地圖,但我需要存儲某個地方的數據,這些數據應該用於回調。

關於如何改進這一點的任何想法?

回答

1

這裏有一個想法:

case class Subscribe[A, B](f: (A, B, NotifyingMap[A,B]) => Any) 

case class Event[A, B](key: A, value: B, originator: NotifyingMap[A,B]) 

case class RegisterObserver(actorRef: ActorRef) 

/** 
    * Subscribes to events 
    */ 
class Subscriber[A,B]{ 

    def register(actorSystem: ActorSystem) = { 
    val actor = actorSystem.actorOf(Props(classOf[Observer[A,B]])) 
    actor ! Subscribe(handleEvent) 
    } 

    def handleEvent(key: A, value: B, notifyingMap: NotifyingMap[A, B]) = { 
    println(s"Saw key $key with value $value") 
    } 
} 

/** 
    * Observer of events that will call a partial function when 
    * an event comes in. 
    */ 
class Observer[A, B] extends Actor{ 
    var f: (A,B,NotifyingMap[A,B]) => Any = _ 

    def receive = { 
    case x: Subscribe[A, B] => 
     f = x.f 
     Notifier() ! RegisterObserver(self) 
    case e: Event[A,B] => 
     f(e.key, e.value, e.originator) 
    } 
} 

/** 
    * Notifier that sends out the event to all registered observers. 
    */ 
class Notifier extends Actor { 
    var observers = List[ActorRef]() 

    def receive = { 
    case x: RegisterObserver => 
     observers = x.actorRef :: observers 
    case x: Event[_,_] => 
     observers.foreach(_ ! Event) 
    } 
} 

/** 
    * Singleton notifier. 
    */ 
object Notifier{ 

    var notifier: ActorRef = _ 

    def create(actorSystem: ActorSystem) = 
    actorSystem.actorOf(Props(classOf[Notifier])) 

    def apply(): ActorRef = notifier 
} 

/** 
    * Class that sends out an event when an item is put. Also allows for 
    * getting an item based on a key. 
    */ 
class NotifyingMap[A, B](){ 
    val map: TrieMap[A,B] = TrieMap[A,B]() 

    // Specific business logic here on when you publish the event. 
    def put(key: A, value: B) = { 
    map.put(key, value).foreach(v => Notifier() ! Event(key, v, this)) 
    } 

    def get(key: A) = map.get(key) 
} 

通過這樣做,你可以讓你的用戶非Actor類,同時仍允許其對事件作出反應。您也可以在您的NotifyingMap上調用簡單的舊方法,因爲它只是一個類,而不是Actor

我個人喜歡在消息中存儲回調信息。通常情況下,您可以通過在案例分類中設置另一個ActorRef來看到這一點。在這個例子中,我們在案例類中有NotifyingMap,所以我們知道事件來自哪裏,並且可以在那裏適當地調用get方法。

完全披露:我沒有運行任何此代碼。它編譯。