2013-11-22 84 views
0

我正在爲Java應用程序編寫一個插件。外部軟件與此應用程序建立TCP連接,並將消息作爲UTF-8編碼的JSON對象發送給我的插件。每條消息由分隔符分隔。我目前使用"\u00A1"¡)作爲分隔符。Byte []輸入到Java掃描器

{ "message": "value" }¡{ "message": "value" }¡{ "message": "value" }... 

由於TCP並沒有提供多少數據將在同一時間到達任何保證,該插件必須接收數據的這種流並拉出每一個人{ "message": "value" }令牌。聽起來很好用java.util.Scanner ..

問題是,應用程序不提供我的插件直接訪問TCP套接字。該插件通過重複調用其方法receiveData(byte[] bytes)來接收數據。我需要某種輸入流或通道,可以從Scanner讀取,但我也可以將字節存入(來自receiveData)。這樣的事情存在嗎?如果沒有,有什麼建議可以實施呢?或者我離開,有沒有更好的方法來處理這個問題?

注:我最初試圖手動實現這個邏輯。我會將每個接收到的字節塊解碼爲一個字符串,搜索分隔符並追加到StringBuilder。然後我意識到這種方法是無效的,因爲傳入的byte[]可能不會在偶數UTF-8字符邊界上結束並且不能正確解碼。我覺得Scanner正是我想要的,我只是無法弄清楚如何提供它的輸入。

編輯:隨着應用程序的運行,數據會持續流式傳輸以更新顯示。不可能等到沒有更多數據開始解析。

+0

基於這個問題,gson不會有幫助。 – DwB

+0

那麼,使用Observer/Notifier模式實現'InputStream'。如果你願意,我可以稍後詳述。 –

回答

0

我最終創建了一個條件變量的循環緩衝區,實現了ReadableByteChannel。似乎工作得不錯。這裏有一個完整的例子:

import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.nio.channels.ReadableByteChannel; 
import java.util.Scanner; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; 


public class Buffer implements ReadableByteChannel { 

    private final Lock lock = new ReentrantLock(); 
    private final Condition notEmpty = lock.newCondition(); 

    private int readIndex = 0; 
    private int writeIndex = 0; 
    private int size = 0; 

    private final int capacity; 
    private final byte[] buff; 

    public Buffer(int capacity) { 
     this.capacity = capacity; 
     this.buff = new byte[capacity]; 
    } 

    /** 
    * Deposit bytes to the buffer. Will only write until 
    * buffer is full. 
    * @param bytes the bytes to add 
    * @return the number of bytes actually added 
    */ 
    public int addBytes(byte[] bytes) { 
     lock.lock(); 
     int writeCount = 0; 
     try { 
      int available = capacity - size; 
      writeCount = available <= bytes.length ? available : bytes.length; 
      for (int i = 0; i < writeCount; ++i) { 
       buff[writeIndex] = bytes[i]; 
       incrementWrite(); 
       ++size; 
      } 
      // notify callers waiting on read() 
      notEmpty.signal(); 
     } finally { 
      lock.unlock(); 
     } 
     return writeCount; 
    } 

    public int addBytes(byte[] bytes, int offset, int length) { 
     lock.lock(); 
     int writeCount = 0; 
     try { 
      int available = capacity - size; 
      writeCount = available <= length ? available : length; 
      for (int i = 0; i <writeCount; ++i) { 
       buff[writeIndex] = bytes[offset + i]; 
       incrementWrite(); 
       ++size; 
      } 
      notEmpty.signal(); 
     } finally { 
      lock.unlock(); 
     } 
     return writeCount; 
    } 

    @Override 
    public int read(ByteBuffer byteBuffer) throws IOException { 
     lock.lock(); 
     try { 
      // if the current size is 0, wait until data is added 
      while (size == 0) { 
       notEmpty.wait(); 
      } 
      int attempt = byteBuffer.remaining(); 
      int readCount = attempt <= size ? attempt : size; 
      for (int i = 0; i < readCount; ++i) { 
       byteBuffer.put(buff[readIndex]); 
       incrementRead(); 
       --size; 
      } 
      return readCount; 
     } catch (InterruptedException ie) { 
      Thread.currentThread().interrupt(); 
      return 0; 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public boolean isOpen() { 
     return true; 
    } 

    @Override 
    public void close() throws IOException { 
     // do nothing 
    } 

    private final void incrementRead() { 
     // increment and wrap around if necessary 
     if (++readIndex == capacity) { 
      readIndex = 0; 
     } 
    } 

    private final void incrementWrite() { 
     // increment and wrap around if necessary 
     if (++writeIndex == capacity) { 
      writeIndex = 0; 
     } 
    } 

    public static void main(String[] args) { 
     final Buffer buff = new Buffer(1024); 
     final Scanner scanner = new Scanner(buff).useDelimiter("!"); 

     Thread readThread = new Thread(new Runnable() { 
      @Override 
      public void run() { 
       while (scanner.hasNext()) { 
        String message = scanner.next(); 
        System.out.println(message); 
        if (message.equals("goodbye")) { 
         return; 
        } 
       } 
      } 
     }); 
     readThread.start(); 

     buff.addBytes("hello,".getBytes()); 
     buff.addBytes(" world!".getBytes()); 
     buff.addBytes("good".getBytes()); 
     buff.addBytes("bye!".getBytes()); 
    } 
}