我正在斯卡拉建立一個基於演員的服務,消費者可以查詢客戶端是否被授權,也可以授權客戶端。斯卡拉演員:receiveWithin()沒有收到消息
如果消費者查詢客戶端的授權狀態並且該客戶端尚未獲得授權,則參與者應該在指定的超時時間內等待傳入的Authorize
消息,然後發送回覆。 IsAuthorized
應該能夠在消費者代碼中同步執行,以便阻止並等待答覆。像
service !? IsAuthorized(client) => {
case IsAuthorizedResponse(_, authorized) => // do something
}
然而receiveWithin()
在我的演員東西永遠不會收到一個消息,並始終跑入超時。
這裏是我的代碼
case object WaitingForAuthorization
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client])
case class IsAuthorized(client: Client)
case class IsAuthorizedResponse(client: Client, authorized: Boolean)
case class Authorize(client: Client)
class ClientAuthorizationService {
private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
def actor = Actor.actor {
loop {
react {
case IsAuthorized(client: Client) => reply {
if (authorized contains client) {
IsAuthorizedResponse(client, true)
} else {
waiting += client
var matched = false;
val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT)
while (!matched && Instant.now.isBefore(end)) {
// ERROR HERE: Never receives Authorize messages
receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
case Authorize(authorizedClient: Client) => {
authorizeClient(authorizedClient)
if (authorizedClient == client) matched = true
}
case TIMEOUT => // do nothing since we handle the timeout in the while loop
}
}
IsAuthorizedResponse(client, matched)
}
}
case Authorize(client: Client) => authorizeClient(client)
case WaitingForAuthorization => reply {
WaitingForAuthorizationResponse(immutable.Set() ++ waiting)
}
}
}
}
private def authorizeClient(client: Client) = synchronized {
authorized += client
waiting -= client
}
}
object ClientAuthorizationService {
val AUTH_TIMEOUT: Long = 60 * 1000;
}
當我發送Authorize
消息給演員,而它在receiveWithin塊由第二case語句抓住低於實際應該只捕獲這些消息時沒有消息當時正在等待答覆。
我的代碼有什麼問題?
更新:
下面是相關的代碼的簡化版本,實際上代表了一個更簡單,不同的邏輯,但也許更好地闡明瞭問題:
loop {
react {
case IsAuthorized(client: Client) => reply {
var matched = false
// In the "real" logic we would actually loop here until either the
// authorized client matches the requested client or the timeout is hit.
// For the sake of the demo we only take the first Authorize message.
receiveWithin(60*1000) {
// Although Authorize is send to actor it's never caught here
case Authorize(authorizedClient: Client) => matched = authorizedClient == client
case TIMEOUT =>
}
IsAuthorizedResponse(client, matched)
}
case Authorize(client: Client) => // this case is hit
}
}
更新2:
我終於解決了這個問題。我認爲問題在於,在前面的IsAuthorized
消息的回覆中嘗試收到Authorize
消息時,演員被阻止。
我重寫了代碼,以便在我們等待Authorized
時啓動匿名Actor。這是有興趣的人的代碼。 waiting
是Map[Client, Actor]
。
loop {
react {
case IsAuthorized(client: Client) =>
if (authorized contains client) {
sender ! IsAuthorizedResponse(client, true)
} else {
val receipient = sender
// Start an anonymous actor that waits for an Authorize message
// within a given timeout and sends a reply to the consumer.
// The actor will be notified by the parent actor below.
waiting += client -> Actor.actor {
val cleanup =() => {
waiting -= client
exit()
}
receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
case Authorize(c) =>
receipient ! IsAuthorizedResponse(client, true)
cleanup()
case TIMEOUT =>
receipient ! IsAuthorizedResponse(client, false)
cleanup()
}
}
}
case Authorize(client: Client) =>
authorized += client
waiting.get(client) match {
case Some(actor) => actor ! Authorize(client)
case None =>
}
case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
}
}
如果有更好的方法來解決這個問題,請讓我知道!
你能清理/縮短代碼併發布相關部分嗎? – Jus12
我已更新我的帖子。 –
爲什麼不使用'reactWithin'? – Jus12