這似乎是API中的一個限制,CompletableFuture
沒有提供直接使用PriorityBlockingQueue
的方法。幸運的是,我們可以在沒有太多麻煩的情況下繞過它。在Oracle的1.8 JVM,他們碰巧命名所有的內部類字段fn
的,所以提取我們的首要任務感知Runnable
S可沒有太多的麻煩來完成:
public class CFRunnableComparator implements Comparator<Runnable> {
@Override
@SuppressWarnings("unchecked")
public int compare(Runnable r1, Runnable r2) {
// T might be AsyncSupply, UniApply, etc., but we want to
// compare our original Runnables.
return ((Comparable) unwrap(r1)).compareTo(unwrap(r2));
}
private Object unwrap(Runnable r) {
try {
Field field = r.getClass().getDeclaredField("fn");
field.setAccessible(true);
// NB: For performance-intensive contexts, you may want to
// cache these in a ConcurrentHashMap<Class<?>, Field>.
return field.get(r);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new IllegalArgumentException("Couldn't unwrap " + r, e);
}
}
}
這是假設你的Supplier
類是Comparable
,是這樣的:
public interface WithPriority extends Comparable<WithPriority> {
int priority();
@Override
default int compareTo(WithPriority o) {
// Reverse comparison so higher priority comes first.
return Integer.compare(o.priority(), priority());
}
}
public class PrioritySupplier<T> implements Supplier<T>, WithPriority {
private final int priority;
private final Supplier<T> supplier;
public PrioritySupplier(int priority, Supplier<T> supplier) {
this.priority = priority;
this.supplier = supplier;
}
@Override
public T get() {
return supplier.get();
}
@Override
public int priority() {
return priority;
}
}
使用方法如下:
PriorityBlockingQueue<Runnable> q = new PriorityBlockingQueue<>(11 /*default*/,
new CFRunnableComparator());
ThreadPoolExecutor pool = new ThreadPoolExecutor(..., q);
CompletableFuture.supplyAsync(new PrioritySupplier<>(n,() -> {
...
}), pool);
如果你創建類,比如PriorityFunction
和PriorityBiConsumer
,您可以使用相同的技術來調用方法,如thenApplyAsync
和whenCompleteAsync
以及適當的優先級。
我不明白你的問題。你正在調用'asyncSupply()'並傳遞一個配置了'LinkedBlockingQueue'的Executor?你想用'PriorityBlockingQueue'取代它?也就是說,似乎所有這些都在'supplyAsync()'函數和'CompletableFuture'的外部。你在哪裏遇到麻煩?也許你可以顯示你想寫的代碼,中間缺少的位突出顯示。 – erickson
謝謝@erickson回覆,並提出一個很好的問題。我沒有注意到爲什麼我不能只使用PriorityBlockingQueue的重要一點。所以原因是PriorityBlockingQueue會傳遞給比較器:compare方法將兩個AsyncSupply對象轉換爲Runnables。這個AsyncSupply是CompletableFuture的一個私有的內部類,所以我不能將它轉換回AsyncSupply,更不用說訪問它的私有字段,該字段引用我想要分配優先級的原始未解包任務。所以我想這個問題是:我如何或在哪裏分配我的優先事項?我會添加一些代碼。 – jacob