我有問題了解Akka持久性查詢,尤其是方法eventsByTag,因爲它不像我所期望的那樣。爲什麼Akka Persisence Query Read Journal沒有檢索我的活動?
在我的主類中,我調用一個類來開始監聽使用某個標籤持久化的任何事件。
class CassandraJournal(implicit val system: ActorSystem) {
def engageStreaming = {
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
implicit val mat = ActorMaterializer()
readJournal.eventsByTag("account", Offset.noOffset)
.runForeach { event => println(event) }
}
}
每當我開始我的服務器和我的事件存儲是空的,我堅持我的第一個事件(通過調用HTTP服務,建於阿卡HTTP),該事件確實被打印出來。但是,當我重新啓動服務器並且事件存儲中已有事件時,新的持久事件將不會打印。
對此有解釋嗎?我很難弄清楚爲什麼會發生這種情況。
編輯
事件我使用的商店是卡桑德拉。這裏是PersistentActor(我不使用事件適配器來標記事件,只是包裝他們周圍一個標記())
class Account(id: UUID) extends PersistentActor {
override def receiveRecover: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
println("Creating checkings account")
}
override def receiveCommand: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
}
}
def updateState(evt: Event): Unit = {
}
override def persistenceId: String = s"account-$id"
}
使用事件適配器,持久角色以及您用於持久性的數據存儲的更新問題。 –