我有一個Java類型的actor,負責可能暫時不可用的外部資源上的過濾器/重試邏輯。演員的領域,常用的方法有:無限期地等待可能永遠不會到達的消息
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
演員有幾個操作所有或多或少相同的重試/緩衝邏輯;每個操作都有自己的[op]Wait
和[op]Buffer
字段。
- 父演員調用
void operation(OpInput opInput)
void operation(OpInput opInput, long currentWait)
使用DEFAULTWAIT
用於第二參數前述方法調用- 如果
currentWait
參數不等於opWait
然後輸入被存儲在opBuffer
,否則該輸入被髮送到外部資源。 - 如果外部資源返回成功,則
opWait
設置爲DEFAULTWAIT
,並且opBuffer
的內容通過operation(opInput)
方法發回。如果外部資源(或更可能是網絡)返回錯誤,則我使用opWait
毫秒的延遲更新opWait = updateWait(opWait)
和調度參與者系統調度器上的operation(opInput, opWait)
。
I.e.我使用actor系統調度器來實現指數退避;我正在使用currentWait
參數來標識正在重試的消息,並正在緩衝其他消息,直到主消息被外部資源成功處理。
問題是,如果預定的operation(opInput, currentWait)
消息丟失,那麼我將永遠緩衝消息,因爲currentWait == opWait
防護將對所有其他消息失敗。我可以使用像spring-retry這樣的實現指數回退,但我沒有看到合併操作的重試循環的方式,這意味着我可以在每個重試循環中使用一個線程(而使用actor系統的調度器不會放得太多系統上的應變)。
我正在尋找一種更容錯的方法來實現角色和外部資源之間的接口上的緩衝和指數退避,而不必爲任務分配太多資源。
「調用外部」我的意思是該方法被另一個actor調用,而「內部調用」方法只能通過'self.operation(...)調用。並感謝您使用未來的建議,而不是預定的消息,這大大簡化了代碼 –