2014-01-28 27 views
3

我搜索了高和低,但無法找到一個明確的,最新的答案我有關NIO的問題。InputStream到渠道的選擇器

有什麼辦法可以將InputStream轉換成Channel,我可以使用Selector?看起來像Channels.newChannel()是做轉換的唯一方法,但不提供AbstractSelectableChannel的實例,這實際上是我所需要的。

更具體地說,我想從stdoutstderr中讀取子流程的流,而不是每個流創建一個線程,看起來這是在純Java中執行它的唯一方法。由於這些流正在使用管道來回傳遞I/O,所以我很驚訝.newChannel未返回Pipe.SourceChannel,它是AbstractSelectableChannel的一個子類。

我正在使用Java 7(但如果新功能在8中可用,我仍然很樂意回答)。

編輯:我也嘗試過鑄造的.newChannel()結果爲可選擇的頻道無濟於事 - 它不是一個可選擇通道。

回答

3

有沒有辦法做你在問什麼,但你不需要每個流的線程。只需將流與爲此目的提供的API合併,然後在當前線程中讀取輸出即可。

+0

不幸的是,因爲我的第三方子'stderr'穿插隨機分爲'stdout'我無法合併流(它們沒有被換行分隔,因爲人們希望)。 – Dan

+1

然後你被困在額外的線程中。 – EJP

+0

您可以在沒有額外線程的情況下執行此操作,但它需要一個輪詢循環來分別將'stdout'和'stderr'流捕獲到不同的地方(例如'StringBuilder's)。使用'select'會更好,但看起來不可能做到這一點。 :( –

0

我有同樣的問題,但我可以將子進程的輸出重定向到文件並稍後分析輸出。你可以使用類似下面的代碼,但它有一些缺點,應該添加一些代碼。可以使用一些併發集合來代替鎖定。

import java.io.IOException; 
import java.util.Collection; 
import java.util.HashSet; 
import java.util.Iterator; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; 

public class InputPrcocessSelector { 

    final private Collection<Process> registeredProcesses = new HashSet<>(); 
    final private Lock registeredProcessesLock = new ReentrantLock(); 

    final private LinkedBlockingQueue<Process> readyProcesses = new LinkedBlockingQueue<>(); 

    final private ExecutorService executorService = Executors.newSingleThreadExecutor(); 

    final private static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; 
    final private static int DEFAULT_TIMEOUT = 100; 

    final private int timeOut; 
    final private TimeUnit timeUnit; 

    public InputPrcocessSelector() { 
     this(DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT); 
    } 

    public InputPrcocessSelector(int timeOut, TimeUnit timeUnit) { 
     this.timeOut = timeOut; 
     this.timeUnit = timeUnit; 
     this.executorService.submit(new SelectorTask()); 
    } 

    public boolean register(Process process) { 
     try { 
      registeredProcessesLock.lock(); 
      return registeredProcesses.add(process); 
     } finally { 
      registeredProcessesLock.unlock(); 
     } 
    } 

    public boolean unregister(Process process) { 
     try { 
      registeredProcessesLock.lock(); 
      return registeredProcesses.remove(process); 
     } finally { 
      registeredProcessesLock.unlock(); 
     } 
    } 

    public Collection<Process> select() throws InterruptedException { 
     HashSet<Process> selectedInputs = new HashSet<>(); 

     Process firstProcess = readyProcesses.take(); 
     selectedInputs.add(firstProcess); 

     Process nextProcess = null; 
     while ((nextProcess = readyProcesses.poll()) != null) { 
      selectedInputs.add(nextProcess); 
     } 

     return selectedInputs; 
    } 

    private class SelectorTask implements Runnable { 
     public void run() { 
      while (true) { 
       try { 
        registeredProcessesLock.lock(); 

        Iterator<Process> it = registeredProcesses.iterator(); 
        while (it.hasNext()) { 
         Process p = it.next(); 
         try { 
          int available = p.getInputStream().available(); 
          if (available > 0) 
           readyProcesses.add(p); 
          if (p.isAlive() == false) { 
           System.err.println("Not alive"); 
           it.remove(); 
          } 
         } catch (IOException e) { 
          throw new RuntimeException(e); 
         } 
        } 

       } finally { 
        registeredProcessesLock.unlock(); 
       } 

       try { 
        timeUnit.sleep(timeOut); 
       } catch (InterruptedException e) { 
        throw new RuntimeException(e); 
       } 
      } 
     } 
    } 

}