2015-10-24 103 views
1

我仍在學習RxJava。在另一個流中使用流的最佳方式是什麼?還是違背了反應式編程的原則?RxJava:輸入一個流(Observable)作爲另一個流的輸入

我試圖寫一個玩具的例子包括一個TCP客戶端和一個發回大寫輸入的服務器。我想從標準輸入中獲取輸入,將其發送到服務器並打印出客戶端和服務器都收到的所有內容。

以下是從程序預期輸出:

(User input) apple 
Server received: apple 
Client received: APPLE 
(User input) peach 
Server received: peach 
Client received: PEACH 

我能夠實現這個使用三個觀測:

  • stdinStream表示從標準輸入發射串,
  • serverStream發出服務器收到的字符串
  • clientStream發出字符串客戶端收到。

,然後從認購inputStream創造clientStream內,像這樣:

private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) { 
    return Observable.create(sub -> { 
     try (Socket socket = new Socket(host, port); 
      BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
      DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream()); 
      PrintWriter outWriter = new PrintWriter(outputStream, true); 
     ) { 
      inputStream.subscribe(line -> { 
       outWriter.println(line); 
       try { 
        sub.onNext(inFromServer.readLine()); 
       } catch (IOException e) { 
        sub.onError(e); 
       } 
      }); 
     } catch (UnknownHostException e) { 
      sub.onError(e); 
     } catch (IOException e) { 
      sub.onError(e); 
     } 
    }); 
} 

注:我不想創建多個客戶,寧願保持單一的客戶端運行,指示它根據輸入發送不同的值給服務器。因此,輸入映射到一個新的clientStream的做法是不期望:

stdinStream.map(line -> createClientStream(line))

所以我的問題是:

  1. 這是使用RxJava一個明智的方法是什麼?有更好的選擇嗎?
  2. 我創建了客戶端套接字作爲創建clientStream的一部分。我這樣做是爲了讓我可以使用調度程序異步地運行它,clientStream.scheduleOn(Schedulers.newThread)。考慮到我的單一客戶要求,也許我應該採取不同的做法?

下面是完整的代碼:https://gist.github.com/lintonye/25af58abdfcc688ad3c3

回答

2

你需要的是using。將所有與套接字相關的對象放入Connection類中,並給定輸入序列,將其映射到一對println/readLine,同時保持單個連接。這是gist for a runnable example

static class Connection { 
    Socket socket; 
    BufferedReader inFromServer; 
    DataOutputStream outputStream; 
    PrintWriter outWriter; 

    public Connection(String host, int port) { 
     try { 
      socket = new Socket(host, port); 
      inFromServer = new BufferedReader(
       new InputStreamReader(socket.getInputStream())); 
      outputStream = new DataOutputStream(socket.getOutputStream()); 
      outWriter = new PrintWriter(outputStream, true); 
     } catch (IOException ex) { 
      Exceptions.propagate(ex); 
     } 
    } 

    public void close() { 
     try { 
      outWriter.close(); 
      outputStream.close(); 
      inFromServer.close(); 
      socket.close(); 
     } catch (IOException ex) { 
      Exceptions.propagate(ex); 
     } 
    } 
} 

public static void main(String[] args) { 
    runServer(); 

    Observable<String> source = Observable.just("a", "b", "c"); 

    String host = "localhost"; 
    int port = 8080; 

    Observable.<String, Connection>using(() -> new Connection(host, port), 
    conn -> 
     source 
     .map(v -> { 
      conn.outWriter.println(v); 
      try { 
       return conn.inFromServer.readLine(); 
      } catch (IOException ex) { 
       throw Exceptions.propagate(ex); 
      } 
     }) 
    , Connection::close) 
    .subscribe(System.out::println); 
} 
相關問題