2015-03-24 84 views
0

我在讀取UDP源,然後解碼並寫入MSMQ(消息隊列)。鎖是否正確完成

我創建了一個調用UDPReader的新線程。 UDPReader又創建一個線程池並調用類ipaddrConnection。在ipaddrConnection內部運行包含一個while循環,它不斷地從組播套接字中讀取數據包並將其推送到類parseUDP。從parseUDP解碼並最終推送到寫入MSMQ的類。我相信我在ipaddrConnection的while循環中沒有正確地鎖定線程,因爲線程正在嘗試寫入MSMQ中的相同內存位置。我認爲通過將我的鎖放在while循環中,池中的每個線程都會在「關鍵部分」中擁有自己的時間。1.接收數據包,然後2.解碼並寫入MSMQ。我仍然在學習併發性並尋求一些幫助。我提供了一個崩潰轉儲文件,我不明白如何正確讀取我的UDPReader和ipaddrConnection類。 parseUDP調用一個類來解碼數據包,並且該類調用一個MSMQ類來寫入內存。所有這些都在我的關鍵部分。

class UDPReader implements Runnable 
{ 
    private final String ip, socket, queue, threadName; 
    private final JTextArea screen; 

    UDPReader(String ip, String socket, String queue, String threadName, JTextArea screen) 
    { 
     this.ip = ip; 
     this.socket = socket; 
     this.queue = queue; 
     this.threadName = threadName; 
     this.screen = screen; 
    } 

    public void run() 
    { 
     screen.append("Thread " + threadName + " running\n\n"); 
     ExecutorService executor = Executors.newFixedThreadPool(5); 
     Runnable reader = new ipaddrConnection(ip, socket, queue); 
     executor.execute(reader); 
    } 

} 

public final class ipaddrConnection implements Runnable 
{ 
    private final ReentrantLock lock = new ReentrantLock(); 
    byte[] bytes = new byte[(int)100000]; 
    InetAddress group; 
    MulticastSocket s; 
    DatagramPacket packet = new DatagramPacket(bytes, bytes.length); 
    private String queue; 

    public ipaddrConnection(String ip, String socket, String queue) { 
     try { 
      this.s = new MulticastSocket(Integer.parseInt(socket)); 
      this.group = InetAddress.getByName(ip); 
      this.queue = queue; 
     } catch (IOException ex) { 
      Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

     @Override 
     public void run() { 
      try { 
       parseUDP p = new parseUDP(queue); 
       s.joinGroup(group); 
       s.setSoTimeout(95000); 

       try{ 
        while(true){ 
         lock.lock(); 
         s.receive(packet); 
         p.parseUDP(packet.getData()); 
        } 
       }finally { 
        lock.unlock(); 
       } 


      } catch (SocketException ex) { 
       Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex); 
      } catch (IOException ex) { 
       Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 

} 

崩潰報告 https://drive.google.com/file/d/0B4GWNCU6_CBlM2tJNGJqNzRVazg/view?usp=sharing

+0

很少人會刻意去點擊通過我們的玩具崩潰報告文件。這不是一個調試服務。在這裏發佈崩潰報告的相關細節。如果你不確定什麼是相關的,那麼你需要做更多的研究。 – 2015-03-24 22:22:25

+1

現在,每個線程都有自己的鎖。線程只使用自己的鎖,所以永遠不會有兩個線程試圖鎖定同一個鎖,所以就好像你沒有使用鎖。 – immibis 2015-03-24 22:24:57

+0

@immibis你是對的。您應該將其作爲答案而不是評論發佈。我正要去。 – Persixty 2015-03-24 22:25:55

回答

0

在你的代碼,你的鎖沒有做任何有用的。

每個線程都有自己的鎖,因此一次可以有多個線程使用隊列(因爲線程1鎖定了鎖1,線程2鎖定了鎖2,並且沒有什麼能夠阻止他們使用隊列同時)。

如果您在代碼中設置lock字段static,則線程將全部使用相同的鎖。

你可能還是有問題的,因爲線程從未解除鎖定(除非他們遇到例外),所以只有一個線程將被允許做的工作:

try{ 
    while(true){ 
     lock.lock(); 
     s.receive(packet); 
     p.parseUDP(packet.getData()); 
    } 
}finally { 
    lock.unlock(); 
} 

注意如何的唯一途徑線程可以解鎖如果有異常,鎖定是否存在?

威力都想要更多的東西是這樣的:

while(true) { 
    s.receive(packet); 
    try { 
     lock.lock(); 
     s.parseUDP(packet.getData()); 
    } finally { 
     lock.unlock(); 
    } 
} 

- 這種結構,而他們正在分析數據包的線程將只持有鎖,而不是在他們收到的數據包。 (我不知道這是否是你真正想要什麼)

+0

如果我在UDPReader類中創建靜態鎖,然後將其作爲靜態變量傳遞給ipaddrConnection類,還是應該在ipaddrConnection類中初始化它,請問是否有關係? – MadMardigans 2015-03-25 14:13:29

0
ExecutorService executor = Executors.newFixedThreadPool(5); 
Runnable reader = new ipaddrConnection(ip, socket, queue); 
executor.execute(reader); 

此代碼,實際上,單線程因爲雖然池有五個線程,您使用了唯一的一個。

+0

是的,但即使如此,我想確保我正確應用鎖定。我總是可以調用更多的實例來創建更多的線程 – MadMardigans 2015-03-25 14:03:38

0
  1. 有UDPReader實施Runnable,它的run()實現至少是不地道。
  2. 正如immibis所述,您的鎖定對象不會在線程之間共享,並且它們不提供您正在尋找的保護。
  3. 只有當您退出while (true) { ... }這就是說永遠不會解鎖。考慮到這一點,你可能要考慮是這樣的:

    public class UDPReader { 
         ... 
    
         UDPReader(String ip, String socket, String queue, String threadName, JTextArea screen, numberOfThreads) { 
          ... 
          this.numberOfThreads = numberOfThreads; 
          this.lock = new ReentrantLock(); 
         } 
    
         public void run() { 
          ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); 
          for (int i = 0; i < numberOfThreads; i++){ 
           executor.execute(new ipaddrConnection(ip, socket, queue, lock)); 
          } 
         } 
        } 
    
    
        public final class ipaddrConnection implements Runnable { 
         private lock ; 
         ... 
    
         public ipaddrConnection(String ip, String socket, String queue, ReentrantLock lock) { 
          ... 
          this.lock = lock; 
         } 
    
         @Override 
         public void run() { 
           ... 
           while (true) { 
            try { 
             lock.lock(); 
             s.receive(packet); 
             p.parseUDP(packet.getData()); 
            } finally { 
             lock.unlock(); 
            } 
           } 
          .... 
         } 
        } 
    } 
    
相關問題