2017-10-18 172 views
1

在Java中的異步I/O果殼Java中的異步編程有`CompletionHandler`和`CompletableFuture`兩種不同的方法嗎?

回調風格是基於CompletionHandler,這 定義了兩種方法,completed()failed(),將被稱爲回來的時候 操作要麼成功或失敗。 如果您希望立即通知異步事件中的事件 I/O(例如,如果有大量I/O操作在運行中,但任何單個操作的故障 未必是致命的),此樣式非常有用。

http://www.deadcoderising.com/java8-writing-asynchronous-code-with-completablefuture/

除了實現Future接口,CompletableFuture也 實現CompletionStage接口。

A CompletionStage是一個承諾。它承諾計算 最終將完成。

關於CompletionStage的好處是,它提供了大量的方法選擇,允許您附加將在完成時執行 的回調。

這樣我們就可以以非阻塞的方式構建系統。

CompletionHandlerCompletableFuture都可用於指定回叫處理程序。

他們的關係和差異是什麼?

CompletionHandlerCompletableFuture兩種不同的用於Java中異步編程的方法嗎?

或者他們一起使用?

謝謝。

+1

你爲什麼試圖比較這些? ['CompletionHandler'](https://docs.oracle.com/javase/8/docs/api/java/nio/channels/CompletionHandler.html)是_A處理程序,用於使用異步I/O操作的結果_,意味着與NIO的異步通道一起使用。 'CompletableFuture'只是一個支持完成的通用未來/承諾。我不明白你想從中提取什麼。你能澄清嗎? –

+0

@SotiriosDelimanolis我想知道什麼時候用哪個。 「CompletionHandler」僅用於IO操作和NIO的異步通道,「completableFuture」可用於IO操作但不與NIO的異步通道一起使用,這是否正確? – Tim

+0

我不知道_only_。 'AsynchronousSocketChannel'是在Java 7中引入的,它的方法([eg。](https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousSocketChannel.html#write-java.nio .ByteBuffer-A-java.nio.channels.CompletionHandler-))期望'CompletionHandler'。 'CompletableFuture'是獨立於Java 8中引入的。 –

回答

1

如果你看看CompletableFuture(自Java 8以來),你會注意到它具有大量的功能,允許比回調更多的功能。與鏈接,組合和其他有趣的功能。

CompletionHandler(自Java 7以來)相比,差別應該是顯而易見的。

沒有什麼能夠阻止你使用兩者,根據你使用的API類型,甚至有必要使用它,但如果你有機會使用CompletableFuture,那麼你確實不需要使用CompletionHandler

+0

謝謝。我什麼時候可以使用? – Tim

+0

你會知道什麼時候到來。 – Kayaman

+0

請賜教。 – Tim

2

CompletionHandler<V, A>是NIO異步通道的完成接口。在Java 8引入lambda表達式並將它們轉換爲函數接口(使用單一抽象方法的接口)之前的幾年,Java 7引入了它,所以它有兩種方法,即completed(V result, A attachment)failed(Throwable ex, A attachment),而不是(現在)更舒適的單一方法。

CompletableFuture<T>CompletionStage<T>接口的實現。

它是在Java 8中引入的,所以它的靜態方法來構建期貨及其實例方法來構建延續使用函數接口,您可以使用它來舒適地使用lambda表達式。

你可以用每NIO的異步調用使用CompletableFuture<T>

import java.lang.*; 
import java.net.*; 
import java.nio.*; 
import java.nio.channels.*; 
import java.util.*; 
import java.util.concurrent.*; 

public class AsynchronousCompletionHandler<T> implements CompletionHandler<T, CompletableFuture<T>> { 
    public void completed(T result, CompletableFuture<T> attachment) { 
     attachment.complete(result); 
    } 

    public void failed(Throwable ex, CompletableFuture<T> attachment) { 
     attachment.completeExceptionally(ex); 
    } 

    private static final ConcurrentHashMap<Class<?>, AsynchronousCompletionHandler<?>> cache = new ConcurrentHashMap<>(); 

    static <T> AsynchronousCompletionHandler<T> getInstance(Class<T> clazz) { 
     @SuppressWarnings("unchecked") 
     AsynchronousCompletionHandler<T> handler = (AsynchronousCompletionHandler<T>)cache.computeIfAbsent(clazz, c -> new AsynchronousCompletionHandler<T>()); 
     return handler; 
    } 

    // 
    // AsynchronousByteChannel 
    public static CompletableFuture<Integer> readAsync(AsynchronousByteChannel channel, ByteBuffer dst) { 
     CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); 
     channel.read(dst, completableFuture, getInstance(Integer.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Integer> writeAsync(AsynchronousByteChannel channel, ByteBuffer src) { 
     CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); 
     channel.write(src, completableFuture, getInstance(Integer.class)); 
     return completableFuture; 
    } 

    // 
    // AsynchronousFileChannel 
    public static CompletableFuture<FileLock> lockAsync(AsynchronousFileChannel channel) { 
     CompletableFuture<FileLock> completableFuture = new CompletableFuture<>(); 
     channel.lock(completableFuture, getInstance(FileLock.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<FileLock> lockAsync(AsynchronousFileChannel channel, long position, long size, boolean shared) { 
     CompletableFuture<FileLock> completableFuture = new CompletableFuture<>(); 
     channel.lock(position, size, shared, completableFuture, getInstance(FileLock.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Integer> readAsync(AsynchronousFileChannel channel, ByteBuffer dst, long position) { 
     CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); 
     channel.read(dst, position, completableFuture, getInstance(Integer.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Integer> writeAsync(AsynchronousFileChannel channel, ByteBuffer src, long position) { 
     CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); 
     channel.write(src, position, completableFuture, getInstance(Integer.class)); 
     return completableFuture; 
    } 

    // 
    // AsynchronousServerSocketChannel 
    public static CompletableFuture<AsynchronousSocketChannel> acceptAsync(AsynchronousServerSocketChannel channel) { 
     CompletableFuture<AsynchronousSocketChannel> completableFuture = new CompletableFuture<>(); 
     channel.accept(completableFuture, getInstance(AsynchronousSocketChannel.class)); 
     return completableFuture; 
    } 

    // 
    // AsynchronousSocketChannel 
    public static CompletableFuture<Void> connectAsync(AsynchronousSocketChannel channel, SocketAddress remote) { 
     CompletableFuture<Void> completableFuture = new CompletableFuture<>(); 
     channel.connect(remote, completableFuture, getInstance(Void.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Long> readAsync(AsynchronousSocketChannel channel, ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit) { 
     CompletableFuture<Long> completableFuture = new CompletableFuture<>(); 
     channel.read(dsts, offset, length, timeout, unit, completableFuture, getInstance(Long.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Integer> readAsync(AsynchronousSocketChannel channel, ByteBuffer dst) { 
     CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); 
     channel.read(dst, completableFuture, getInstance(Integer.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Integer> readAsync(AsynchronousSocketChannel channel, ByteBuffer dst, long timeout, TimeUnit unit) { 
     CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); 
     channel.read(dst, timeout, unit, completableFuture, getInstance(Integer.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Long> writeAsync(AsynchronousSocketChannel channel, ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit) { 
     CompletableFuture<Long> completableFuture = new CompletableFuture<>(); 
     channel.write(srcs, offset, length, timeout, unit, completableFuture, getInstance(Long.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Integer> writeAsync(AsynchronousSocketChannel channel, ByteBuffer src) { 
     CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); 
     channel.write(src, completableFuture, getInstance(Integer.class)); 
     return completableFuture; 
    } 

    public static CompletableFuture<Integer> writeAsync(AsynchronousSocketChannel channel, ByteBuffer src, long timeout, TimeUnit unit) { 
     CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); 
     channel.write(src, timeout, unit, completableFuture, getInstance(Integer.class)); 
     return completableFuture; 
    } 
} 

樣例用法(僅用於闡述;沒有錯誤處理,不處理資源):

import static AsynchronousCompletionHandler; 

AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(); 
serverChannel.bind(new InetSocketAddress(5000)); 
ByteBuffer buffer = ByteBuffer.allocateDirect(1024); 
acceptAsync(serverChannel) 
    .thenCompose(clientChannel -> readAsync(clientChannel, buffer)) 
    .thenAccept(readBytes -> System.out.format("read %d bytes from client%n", readBytes));