2013-01-04 71 views
4

假設以下情形:合作流消費者的設計理念?

我們已經從消耗字節流的一些庫的Java類,說一些XML解析器XmlParser1暴露的方法xmlParser1.parse(inputStream);該方法通常會在一次調用中吃掉所有字節,最終會被阻塞。 我們還有另外一類,來自其他一些庫,類似地做了不同的實現:XmlParser2xmlParser2.parse(inputStream)。 現在,我們想要用兩個解析器解析單個流

我的第一個答案是:我們搞砸了。由於我們無法控制每個類如何消耗流,因此我們只能緩衝內存中的所有字節或臨時文件(如果可能,請打開/重新打開它)。這些消費者的API本質上是不合作的。

現在,假設我們對XmlParser1(實現和簽名)有控制權,並且我們希望以更靈活和合作的方式對其進行編碼,以便調用者可以以一種合理和有效的方式實現上述行爲......你會建議嗎?

一些替代方案,我考慮:

1)請XmlParser1實施FilterInputStream,這樣,當某一類(XmlParser1)attemps來讀取它的一些字節,它在內部解析它具有(迭代,也許一些合理的緩衝)並且還返回原始字節。 (這並不完全對應於FilterInputStream的概念,我會說)。通過這種方式,客戶端代碼可能鏈中的解析器簡單:

public class XmlParser1 extends FilterInputStream { 
     public XmlParser1(InputStream rawInputStream) { ... } 
     public int read(byte[] b, int off, int l) throws IOException { 
      // this would invoke the underlying stream read, parse internall the read bytes, 
      // and leave them in the buffer 
     } 
    } 

    XmlParser1 parser1 = new XmlParser1(inputstream); 
    XmlParser2 parser2 = new XmlParser2(parse); 
    parser2.parse(); // parser2 consumes all the input stream, which causes parser1 to read an parse it too 

2)而不是在字節XmlParser1消費者有關的,把它當成一個:我們不會讓它吃本身就是字節,我們將用它來填充。所以,相反的xmlParser1.parse(inputStream)我們可以有 xmlParser1.write(byte[]) ...也就是說,而不是傳遞一個InputStream我們會做一個OutputStream。這將允許客戶端創建一個TeeInputStream,將字節透明地傳遞給XmlParser2類,並且同時調用XmlParser1.write()

請注意,我們在任何情況下都不需要單獨的線程。

我不確定哪一個(如果有的話)在概念上更可取,如果有更好的選擇。這聽起來像是一個應該已經討論過的設計問題,但我沒有發現太多東西 - 不一定只限於Java。歡迎意見和參考。

+0

也檢查PipeInputStream/PipeOutputStream類。 –

+0

您如何看待我的回答?我只是問,因爲我爲您提供了最乾淨的解決方案:使對象脫離流,然後處理對象。但是我也知道另一個解決方案,比方說一個「解決方法」,我認爲這不是最好的實現方式,而只是一種可能的解決方案。不過,我不知道你爲什麼要多次解析同一個流,因爲這會消耗CPU的能力。但是,我也不知道你的實現,可能存在一些罕見的特殊情況,這樣做是適當的。你能告訴我們更多一點嗎?爲什麼你想分析兩次相同的流? – Marcus

+0

@Marcus:假設我更喜歡XmlParser2,但XmlParser1給了我一些附加信息。我的具體情況是:我有一個我想用的PngReader2(因爲它直接給我我想要的圖像格式),而且還有一個解析元數據的PngReader1,我希望這兩件事情都是。 – leonbloy

回答

1

假設兩個分析器在兩個單獨的線程上運行,也可能是這樣的(不是一個工作碼)

public class Test extends FilterInputStream { 
    byte[] buf = new byte[8192]; 
    int len; 
    Thread thread = null; 

    @Override 
    public synchronized int read(byte[] b, int off, int l) throws IOException { 
     while (thread == Thread.currentThread() && len > 0) { 
      thread.wait(); 
     } 
     if (len > 0) { 
      System.arraycopy(buf, 0, b, off, l); 
      len = 0; 
      return l; 
     } 
     len = super.read(b, off, l); 
     System.arraycopy(b, off, buf, 0, len); 
     thread = Thread.currentThread(); 
     notify(); 
     return len; 
    } 

即#1讀取字節,並將它們保存在buf中,由#下一次嘗試1被阻塞,直到#2從緩衝器

0

我試圖拉動otiginal輸入流中的Apache的百科全書TeeInputStream創建一個OutputStream讀取所有。 http://commons.apache.org/io/api-release/org/apache/commons/io/input/TeeInputStream.html

作爲由IeeInputStream寫入的OutputStream,我使用了一個java PipedOutputStream。

我把這個PipedOutputStream連接到一個java PipedInputStream。

這使我可以閱讀TeeInputStream和PipedInputStream。不知道它是否適合你,或者至少是否會提供下一步。

我創建了一個ReaderThread類來檢查,如果我能在平行閱讀:

private static class ReaderThread extends Thread 
    { 
    InputStream inStream; 
    int threadId; 
    public ReaderThread(int threadId, InputStream inStream) 
    { 
     this.inStream = inStream; 
     this.threadId = threadId; 
    } 

    @Override 
    public void run() 
    { 
     try 
     { 
     int c = inStream.read(); 
     while (c != -1) 
     { 
      System.out.println("From ("+threadId+ ") "+c); 
      c = inStream.read(); 
     } 
     } 
     catch (Exception e) 
     { 
     e.printStackTrace(); 
     } 
    } 
    } 

然後將其從下面的代碼驅動:

InputStream inStream = new FileInputStream(fileName); 
PipedInputStream pipedInStream = new PipedInputStream(); 
OutputStream pipedOutStream = new PipedOutputStream(pipedInStream); 
TeeInputStream tin = new TeeInputStream(inStream, 
    pipedOutStream); 

ReaderThread firstThread = new ReaderThread(1,tin); 
ReaderThread secondThread = new ReaderThread(2,pipedInStream); 

firstThread.start(); 
secondThread.start(); 
1

如果你的線程在同一臺服務器上,你的想法分裂InputStreams沒有任何意義。由於您仍然只使用一個InputStream和一個BufferedInputStream來獲取數據,因此請從InputStream中創建對象,然後在兩個不同的正在運行的線程中使用這些對象。結論:從來沒有必要在Java中隨時阻止任何InputStream。我甚至會認爲它是有害的,因爲如果你阻塞,如果你的緩衝區或管道流過了會發生什麼?隊列溢出!

編輯:如果要停止流,你需要告訴發件人沒有任何更多的發送任何數據或者你不喜歡它的YouTube,他們切片的視頻分成部分(即1份爲1分鐘),並且只能立即預加載這些部分,因此停止播放視頻根本不會影響預加載,因爲它只會在時間軸上達到某個位置時預先加載(如45秒,1分45秒,2分45秒,aso)嗯,這實際上只是一種預加載技術,並沒有真正的流式傳輸,這就是爲什麼Youtube不需要混淆包丟失的原因。)

但是,我仍然有幾行僞碼,客戶:

BufferedOutputStream bos = new BufferedOutputStream(/*yourBasicInputStream*/); 
ObjectOutputStream oos = new ObjectOutputStream(bos); //Or use another wrapper 
oos.writeObject(yourObjectToSend);  //Or use another parser: Look into the API: ObjectInputStream 

主線程控制器內的類變量(又名服務器):

Thread thread1; //e.g. a GUI controller 
Thread thread2; //e.g. a DB controller 

服務器(或其他服務器線程由服務器啓動,與兩個線程作爲參數):

BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/); 
ObjectInputStream ois = new ObjectInputStream(bis); //Or use another wrapper 
//now we use an interface MyNetObject implementing the method getTarget(), but 
//also an abstract class would be possible (with a more complex getTarget-method): 
MyNetObject netObject = (MyNetObject) ois.readObject(); //Or use another parser... 
if(netObject.getTarget()=="Thread1ClassANDThread2Class"){ 
    thread1.activateSync(netObject);  //notify... 
    thread2.activateSync(netObject);  //...both threads! 
} 
else if(netObject.getTarget()=="Thread1Class"){ 
    thread1.activate(netObject);  //give thread1 a notification 
} 
else if(netObject.getTarget()=="Thread2Class"){ 
    thread2.activate(netObject);  //give thread2 a notification 
} 
else {//do something else...} 

不要忘記同步「activateSync(netObject)」 - 方法,但前提是隻需要對對象進行任何更改(不需要同步讀數,只需寫入):

public void activateSync(MyNetObject netObject){ 
    synchronize(netObject){ 
     //do whatever you wanna do with the object...the other thread will always get the actual object! 
    } 
} 

這很簡單,快速,一致並且完全面向對象。希望你會明白。 ;)

UPDATE:

明白,流或閱讀器實際上也是 「分析器」 這是非常重要的。有一個重要的區別:流(通常)是網絡驅動的類,用於寫和讀任何類型的數據 - 除了字符。閱讀器被用來讀取任何類型的文本/字符。因此,您的正確實施將是:使用某些流讀取傳入數據包,然後將數據存儲到適當的對象中。那麼你有一個通用的對象,你可以在任何類型的閱讀器中使用。如果你只是一個圖片閱讀,你可以嘗試解析器readUTF()在類ObjectInputStreamhttp://docs.oracle.com/javase/1.4.2/docs/api/java/io/ObjectInputStream.html),它產生一個字符串:

BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/); 
ObjectInputStream ois = new ObjectInputStream(bis); 
String string = ois.readUTF(); //Or another usable parser/method 
XmlParser1.read(string);  //for reads there is... 
XmlParser2.read(string);  //...no synchronisation needed! 

現在,唯一剩下的東西是教語法分析器如何讀取該字符串。而對象字符串本身可以被視爲「下沉」。如果這不起作用,只需找到另一個解析器/方法來創建「sink」對象。

請注意,這裏討論的解決方案 - 使用類ObjectInputStream和適當的解析器 - 在許多情況下也適用於大數據(然後在發送之前,您只需將1GB的文件分割成幾個字符串/對象「數據包」在網上,像洪流一樣)。但它不適用於視頻/音頻流式傳輸,您可能會丟包,無論如何需要完全不同的解決方案(這本身就是一門科學:http://www.google.ch/search?q=video+stream+packet+drop)。

0

這是有點不清楚XmlParse1和XmlParser2內部發生了什麼,但假設他們真的關心最終的XML數據而不是InputStream字節,我會切換到StAX XMLEvent api。您可以使兩個解析器都實現XMLEventConsumer。然後,你只是有一個外環,它解析的實際流和傳遞事件給廣大消費者:

public void parseXml(InputStream stream) { 
    XMLEventReader reader = ...; // convert stream into XMLEventReader 

    XMLConsumer[] consumers = new XMLConsumer[]{new XmlParser1(), new XmlParser2()}; 

    while(reader.hasNext()) { 
    XMLEvent event = reader.nextEvent(); 
    for(XMLConsumer consumer : consumers) { 
     consumer.add(event)); 
    } 
} 
0

你說「這些消費者的API是固有的非合作。」所以不要試圖讓他們,保持隔離,並給他們他們想要的。分開輸入流。

有一個線程讀取實際輸入流並寫入兩個輸出流。

然後讓那些輸出流的輸入流,你可以用管道流

pipedInputStream1 =新的PipedInputStream(pipedOutputStream1)做到這一點;

ByteArrayInputStream進行(((ByteArrayOutputStream)byteOutputStream1).toByteArray());