2

我想通過CompletableFuture.supplyAsync將優先級隊列添加到與LinkedBlockingQueue一起使用ThreadPoolExecutor的現有應用程序中。問題是我無法想出一個設計來分配任務優先級,然後我可以在PriorityBlockingQueue的比較器中訪問它。這是因爲我的任務被CompletableFuture包裝到一個稱爲AsyncSupply的私有內部類的實例中,該私有內部類將原始任務隱藏在私有字段中。比較器隨後被調用鑄成的Runnable這些AsyncSupply對象,如下所示:如何與PriorityBlockingQueue一起使用CompletableFuture.supplyAsync?

public class PriorityComparator<T extends Runnable> implements Comparator<T> { 

    @Override 
    public int compare(T o1, T o2) { 

     // T is an AsyncSupply object. 
     // BUT I WANT SOMETHING I CAN ASSIGN PRIORITIES TOO! 
     return 0; 
    } 
} 

我調查延長CompletableFuture這樣我就可以把它包裝在不同的對象,但這麼多的多CompletableFuture的可能性被封裝和uninheritable 。所以擴展它似乎不是一個選項。也不是用適配器封裝它,因爲它實現了非常寬的接口。

我不知道如何解決這個問題,除了複製整個CompletableFuture,並修改它。有任何想法嗎?

+0

我不明白你的問題。你正在調用'asyncSupply()'並傳遞一個配置了'LinkedBlockingQueue'的Executor?你想用'PriorityBlockingQueue'取代它?也就是說,似乎所有這些都在'supplyAsync()'函數和'CompletableFuture'的外部。你在哪裏遇到麻煩?也許你可以顯示你想寫的代碼,中間缺少的位突出顯示。 – erickson

+0

謝謝@erickson回覆,並提出一個很好的問題。我沒有注意到爲什麼我不能只使用PriorityBlockingQueue的重要一點。所以原因是PriorityBlockingQueue會傳遞給比較器:compare方法將兩個AsyncSupply對象轉換爲Runnables。這個AsyncSupply是CompletableFuture的一個私有的內部類,所以我不能將它轉換回AsyncSupply,更不用說訪問它的私有字段,該字段引用我想要分配優先級的原始未解包任務。所以我想這個問題是:我如何或在哪裏分配我的優先事項?我會添加一些代碼。 – jacob

回答

5

這似乎是API中的一個限制,CompletableFuture沒有提供直接使用PriorityBlockingQueue的方法。幸運的是,我們可以在沒有太多麻煩的情況下繞過它。在Oracle的1.8 JVM,他們碰巧命名所有的內部類字段fn的,所以提取我們的首要任務感知Runnable S可沒有太多的麻煩來完成:

public class CFRunnableComparator implements Comparator<Runnable> { 

    @Override 
    @SuppressWarnings("unchecked") 
    public int compare(Runnable r1, Runnable r2) { 
     // T might be AsyncSupply, UniApply, etc., but we want to 
     // compare our original Runnables. 
     return ((Comparable) unwrap(r1)).compareTo(unwrap(r2)); 
    } 

    private Object unwrap(Runnable r) { 
     try { 
      Field field = r.getClass().getDeclaredField("fn"); 
      field.setAccessible(true); 
      // NB: For performance-intensive contexts, you may want to 
      // cache these in a ConcurrentHashMap<Class<?>, Field>. 
      return field.get(r); 
     } catch (IllegalAccessException | NoSuchFieldException e) { 
      throw new IllegalArgumentException("Couldn't unwrap " + r, e); 
     } 
    } 
} 

這是假設你的Supplier類是Comparable,是這樣的:

public interface WithPriority extends Comparable<WithPriority> { 
    int priority(); 
    @Override 
    default int compareTo(WithPriority o) { 
     // Reverse comparison so higher priority comes first. 
     return Integer.compare(o.priority(), priority()); 
    } 
} 

public class PrioritySupplier<T> implements Supplier<T>, WithPriority { 
    private final int priority; 
    private final Supplier<T> supplier; 
    public PrioritySupplier(int priority, Supplier<T> supplier) { 
     this.priority = priority; 
     this.supplier = supplier; 
    } 
    @Override 
    public T get() { 
     return supplier.get(); 
    } 
    @Override 
    public int priority() { 
     return priority; 
    } 
} 

使用方法如下:

PriorityBlockingQueue<Runnable> q = new PriorityBlockingQueue<>(11 /*default*/, 
     new CFRunnableComparator()); 
ThreadPoolExecutor pool = new ThreadPoolExecutor(..., q); 
CompletableFuture.supplyAsync(new PrioritySupplier<>(n,() -> { 
    ... 
}), pool); 

如果你創建類,比如PriorityFunctionPriorityBiConsumer,您可以使用相同的技術來調用方法,如thenApplyAsyncwhenCompleteAsync以及適當的優先級。

+1

抱歉,延遲迴復,非常感謝。我最終做了你所說的話,並且爲供應商和比較者抽出了我自己的實現。 – jacob

相關問題