2017-04-18 80 views
0

我有一個匕首單例包裝處理我的基本領域請求。其中一個看起來像這樣:領域 - 實現異步隊列

public void insertOrUpdateAsync(final List<RealmMessage> messages, @Nullable final OnInsertListener listener) { 
    Realm instance = getRealmInstance(); 
    instance.executeTransactionAsync(realm -> { 
       List<RealmMessage> newMessages = insertOrUpdateMessages(realm, messages); 
      }, 
      () -> success(listener, instance), 
      error -> error(listener, error, instance)); 
} 

private List<RealmMessage> insertOrUpdateMessages(@NonNull Realm realm, @NonNull final List<RealmMessage> messages) { 
    ... 
    return realm.copyToRealmOrUpdate(unattendedMessages); 
} 

這很好。

但是有一個角落案例 - 長話短說 - 我多次啓動insertOrUpdateAsynch()。而一些要求後,我得到這個:

Caused by: java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 17, active threads = 17, queued tasks = 100, completed tasks = 81]

我的問題是:我應該如何處理這種不重建整個應用程序流。 我的想法是通過RxJava對傳入的請求進行排隊。我對嗎?我應該考慮哪些運營商並對自己進行教育?

還是我以完全錯誤的方式接近這個? 從我的大多數谷歌搜索中,我發現主要是這個問題是在啓動像我的循環方法。我沒有使用任何。在我的情況下,問題是這種方法是由多個響應啓動的,並且由於當前的後端實現而進行更改是不可能的。

回答

0

如果你不想重新設計你的應用程序,你可以使用計數信號量。你會看到兩個線程會立即獲得該鎖。另一個線程將阻塞,直到某個呼叫釋放一個鎖。不建議在沒有超時的情況下使用acquire()。

爲了使用RxJava,你將不得不改變你的應用程序的設計,並且在RxJava中限速並不是那麼容易,因爲它全部是關於吞吐量的。

private final Semaphore semaphore = new Semaphore(2); 

@Test 
public void name() throws Exception { 
    Thread t1 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 
    Thread t2 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 
    Thread t3 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 

    t1.start(); 
    t2.start(); 
    t3.start(); 

    Thread.sleep(1500); 
} 

private void doNetworkStuff() { 
    try { 
     System.out.println("enter doNetworkStuff"); 

     semaphore.acquire(); 

     System.out.println("acquired"); 

     Thread.sleep(1000); 

    } catch (InterruptedException e) { 
     e.printStackTrace(); // Don't do this!! 
    } finally { 
     semaphore.release(); 
    } 
}