我仍在學習RxJava。在另一個流中使用流的最佳方式是什麼?還是違背了反應式編程的原則?RxJava:輸入一個流(Observable)作爲另一個流的輸入
我試圖寫一個玩具的例子包括一個TCP客戶端和一個發回大寫輸入的服務器。我想從標準輸入中獲取輸入,將其發送到服務器並打印出客戶端和服務器都收到的所有內容。
以下是從程序預期輸出:
(User input) apple
Server received: apple
Client received: APPLE
(User input) peach
Server received: peach
Client received: PEACH
我能夠實現這個使用三個觀測:
stdinStream
表示從標準輸入發射串,serverStream
發出服務器收到的字符串clientStream
發出字符串客戶端收到。
,然後從認購inputStream
創造clientStream
內,像這樣:
private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
return Observable.create(sub -> {
try (Socket socket = new Socket(host, port);
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
PrintWriter outWriter = new PrintWriter(outputStream, true);
) {
inputStream.subscribe(line -> {
outWriter.println(line);
try {
sub.onNext(inFromServer.readLine());
} catch (IOException e) {
sub.onError(e);
}
});
} catch (UnknownHostException e) {
sub.onError(e);
} catch (IOException e) {
sub.onError(e);
}
});
}
注:我不想創建多個客戶,寧願保持單一的客戶端運行,指示它根據輸入發送不同的值給服務器。因此,輸入映射到一個新的clientStream
的做法是不期望:
stdinStream.map(line -> createClientStream(line))
所以我的問題是:
- 這是使用RxJava一個明智的方法是什麼?有更好的選擇嗎?
- 我創建了客戶端套接字作爲創建
clientStream
的一部分。我這樣做是爲了讓我可以使用調度程序異步地運行它,clientStream.scheduleOn(Schedulers.newThread)
。考慮到我的單一客戶要求,也許我應該採取不同的做法?
下面是完整的代碼:https://gist.github.com/lintonye/25af58abdfcc688ad3c3