2009-09-18 19 views
5

動機我可以從併發線程調用XMPPConnection.sendPacket嗎?

我想額外的眼睛,以確認我能夠調用此方法XMPPConnection.sendPacket( 包)兼任。對於我當前的代碼,我以串行方式調用可調參數列表(最多3個)。每個Callable在一個XMPPConnection上發送/接收XMPP數據包。我打算通過分離多個線程來並行化這些Callables,每個Callable將在沒有同步的情況下調用共享XMPPConnection上的sendPacket。

XMPPConnection

class XMPPConnection 
{ 
    private boolean connected = false; 

    public boolean isConnected() 
    { 
     return connected; 
    } 

    PacketWriter packetWriter; 

    public void sendPacket(Packet packet) 
    { 
     if (!isConnected()) 
      throw new IllegalStateException("Not connected to server."); 

     if (packet == null) 
      throw new NullPointerException("Packet is null."); 

     packetWriter.sendPacket(packet); 
    } 
} 

PacketWriter

class PacketWriter 
{ 
    public void sendPacket(Packet packet) 
    { 
     if (!done) { 
      // Invoke interceptors for the new packet 
      // that is about to be sent. Interceptors 
      // may modify the content of the packet. 
      processInterceptors(packet); 

      try { 
       queue.put(packet); 
      } 
      catch (InterruptedException ie) { 
       ie.printStackTrace(); 
       return; 
      } 
      synchronized (queue) { 
       queue.notifyAll(); 
      } 

      // Process packet writer listeners. Note that we're 
      // using the sending thread so it's expected that 
      // listeners are fast. 
      processListeners(packet); 
    } 

    protected PacketWriter(XMPPConnection connection) 
    { 
     this.queue = new ArrayBlockingQueue<Packet>(500, true); 
     this.connection = connection; 
     init(); 
    } 
} 

我能得出什麼結論

由於PacketWriter使用的BlockingQueue,還有把我的意圖來調用沒問題SE ndPacket來自多個線程。我對麼 ?

回答

0

您還沒有在這裏提供足夠的信息。

我們不知道該怎麼以下實現:

  • processInterceptors
  • processListeners

誰在讀/寫 '做' 變量?如果一個線程將其設置爲true,那麼所有其他線程都將默默失敗。

快速一瞥,這看起來不是線程安全的,但無法確定您發佈的內容。

其他問題:

  • 爲什麼PacketWriter XMPPConnectionwhen的類成員它只有一個方法使用?
  • 爲什麼PacketWriter有一個XMPPConnection成員var並且不使用它?
0

如果您可以限制爲Java 5+,則可以考慮使用BlockingQueue。

從Java API文檔,對於小的修改使用ArrayBlockingQueue:

class Producer implements Runnable { 
    private final BlockingQueue queue; 
    Producer(BlockingQueue q) { queue = q; } 
    public void run() { 
    try { 
     while(true) { queue.put(produce()); } 
    } catch (InterruptedException ex) { ... handle ...} 
    } 
    Object produce() { ... } 
} 

class Consumer implements Runnable { 
    private final BlockingQueue queue; 
    Consumer(BlockingQueue q) { queue = q; } 
    public void run() { 
    try { 
     while(true) { consume(queue.take()); } 
    } catch (InterruptedException ex) { ... handle ...} 
    } 
    void consume(Object x) { ... } 
} 

class Setup { 
    void main() { 
    BlockingQueue q = new ArrayBlockingQueue(); 
    Producer p = new Producer(q); 
    Consumer c1 = new Consumer(q); 
    Consumer c2 = new Consumer(q); 
    new Thread(p).start(); 
    new Thread(c1).start(); 
    new Thread(c2).start(); 
    } 
} 

爲您的使用情況你有你的真正的發件人(實際連接的持有人)是消費者,和包編制/發件人是生產者。

另一個有趣的附加思想是,您可以使用PriorityBlockingQueue來允許Flash在其他任何等待數據包之前重寫XMPP數據包。

此外,格倫在設計上的觀點是好點。您可能想看看Smack API(http://www.igniterealtime.org/projects/smack/),而不是創建自己的。

2

是的,你可以發送不同線程的數據包沒有任何問題。

Smack阻塞隊列是因爲你不能做的就是讓不同的線程同時寫入輸出流。 Smack負責通過將數據寫入每個數據包粒度來同步輸出流。

Smack實現的模式只是一個典型的生產者/消費者併發模式。你可能有幾個生產者(你的線程),只有一個消費者(Smack的PacketWriter運行在它自己的線程中)。

問候。

相關問題