我上一個項目,我需要做下面的事情工作:如果未收到數據,請發送並重試數據?
- 發送特定的套接字到另一個系統某些數據。我必須在給定的套接字上發送特定的字節數組。每個字節數組都有一個唯一的長地址。
- 然後通過使用下面實現的
RetryStrategy
中的任何一個來繼續重試發送相同的數據。 - 啓動一個後臺輪詢器線程,它會告訴您在其他系統上發送的數據是否已收到。如果收到,我們會將它從
pending
隊列中刪除,這樣它就不會被重試,如果因爲任何原因沒有收到它,我們將重新嘗試使用我們使用的RetryStrategy重新發送相同的數據。
例如:如果我們派出byteArrayA
它具有獨特的長地址addressA
,如果它在其他系統recived,那麼我的輪詢線程將得到這個addressA
追溯到確認這意味着它已收到所以現在我們可以從待處理隊列中刪除此地址,以便它不會再次重試。
我有兩個RetryStrategy
實施ConstantBackoff
和。所以我想出了模擬上述流程的模擬器。
public class Experimental {
/** Return the desired backoff delay in millis for the given retry number, which is 1-based. */
interface RetryStrategy {
long getDelayMs(int retry);
}
public enum ConstantBackoff implements RetryStrategy {
INSTANCE;
@Override
public long getDelayMs(int retry) {
return 1000L;
}
}
public enum ExponentialBackoff implements RetryStrategy {
INSTANCE;
@Override
public long getDelayMs(int retry) {
return 100 + (1L << retry);
}
}
/** A container that sends messages with retries. */
private static class Sender {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(20);
private final ConcurrentMap<Long, Retrier> pending = new ConcurrentHashMap<>();
/** Send the given (simulated) data with given address on the given socket. */
void sendTo(long addr, byte[] data, int socket) {
System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket);
}
/** The state of a message that's being retried. */
private class Retrier implements Runnable {
private final RetryStrategy retryStrategy;
private final long addr;
private final byte[] data;
private final int socket;
private int retry;
private Future<?> future;
Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
this.retryStrategy = retryStrategy;
this.addr = addr;
this.data = data;
this.socket = socket;
this.retry = 0;
}
private synchronized void start() {
if (future == null) {
future = executorService.submit(this);
pending.put(addr, this);
}
}
private synchronized void cancel() {
if (future != null) {
future.cancel(true);
future = null;
}
}
private synchronized void reschedule() {
if (future != null) {
future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS);
}
}
@Override
synchronized public void run() {
sendTo(addr, data, socket);
reschedule();
}
}
/**
* Get a (simulated) verified message address. Just picks a pending
* one. Returns zero if none left.
*/
long getVerifiedAddr() {
System.err.println("Pending messages: " + pending.size());
Iterator<Long> i = pending.keySet().iterator();
long addr = i.hasNext() ? i.next() : 0;
return addr;
}
/** A polling loop that cancels retries of (simulated) verified messages. */
class CancellationPoller implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
long addr = getVerifiedAddr();
if (addr == 0) {
continue;
}
System.err.println("Verified message (to be cancelled) " + addr);
Retrier retrier = pending.remove(addr);
if (retrier != null) {
retrier.cancel();
}
}
}
}
private Sender initialize() {
executorService.submit(new CancellationPoller());
return this;
}
private void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
new Retrier(retryStrategy, addr, data, socket).start();
}
}
public static void main(String[] args) {
Sender sender = new Sender().initialize();
for (long i = 1; i <= 10; i++) {
sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42);
}
for (long i = -1; i >= -10; i--) {
sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37);
}
}
}
我想看看上面的代碼中是否有任何競態條件或任何線程安全問題?因爲在多線程中正確使用是很困難的。
讓我知道是否有更好或有效的方法來做同樣的事情。
我的建議是不要使用線程,除非你真的有太多...你可以使用一個事件循環一個線程中執行的一切,但仍然有異步執行 – AJC