2017-04-02 56 views
2

我有問題了解Akka持久性查詢,尤其是方法eventsByTag,因爲它不像我所期望的那樣。爲什麼Akka Persisence Query Read Journal沒有檢索我的活動?

在我的主類中,我調用一個類來開始監聽使用某個標籤持久化的任何事件。

class CassandraJournal(implicit val system: ActorSystem) { 

def engageStreaming = { 
    val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    implicit val mat = ActorMaterializer() 

    readJournal.eventsByTag("account", Offset.noOffset) 
    .runForeach { event => println(event) } 
    } 
} 

每當我開始我的服務器和我的事件存儲是空的,我堅持我的第一個事件(通過調用HTTP服務,建於阿卡HTTP),該事件確實被打印出來。但是,當我重新啓動服務器並且事件存儲中已有事件時,新的持久事件將不會打印。

對此有解釋嗎?我很難弄清楚爲什麼會發生這種情況。

編輯

事件我使用的商店是卡桑德拉。這裏是PersistentActor(我不使用事件適配器來標記事件,只是包裝他們周圍一個標記())

class Account(id: UUID) extends PersistentActor { 

    override def receiveRecover: Receive = { 
    case createCheckingsAccount: CreateCheckingsAccount => 
     println("Creating checkings account") 
    } 

    override def receiveCommand: Receive = { 
    case createCheckingsAccount: CreateCheckingsAccount => 
     persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event => 
     val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated] 
     sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString) 
     } 

    } 

    def updateState(evt: Event): Unit = { 
    } 

    override def persistenceId: String = s"account-$id" 
} 
+0

使用事件適配器,持久角色以及您用於持久性的數據存儲的更新問題。 –

回答

2

隨着receiveRecover沒有做必要的狀態恢復工作,堅持不懈,就不能正常工作。我建議在receiveRecover中加入一些基本的狀態恢復邏輯,並讓你的updateState方法覆蓋標記的事件情況。

我在一個類似於下面的狀態恢復邏輯的應用中使用了eventsByTag,它在新的啓動和恢復方面都工作得很好。

def updateState(e: Any): Unit = e match { 
    case evt: Event => 
    state = state.updated(evt) 
    case Tagged(evt: Event, _) => 
    state = state.updated(evt) 
} 

... 

override def receiveRecover: Receive = { 
    case evt: Event => updateState(evt) 
    case taggedEvt: Tagged => updateState(taggedEvt) 
} 
+0

通過設置一些適當的恢復工作,我能夠得到它的工作。我並沒有意識到復甦將重新開始。謝謝你的幫助! –

相關問題