2011-03-04 162 views
4

我有一個java進程,從套接字服務器讀取數據。因此,我有一個BufferedReader和一個PrintWriter對象與該套接字相對應。java之間共享數據線程

現在在同一個java進程中,我有一個多線程的java服務器,它接受客戶端連接。我想實現的功能,所有這些客戶端,我接受可以從我上面提到的BufferedReader對象讀取數據。(使他們可以複用數據)

如何使這些單獨的客戶端線程讀取數據從BuffereReader單個對象?對不起,我感到困惑。

+1

我假設你說所有的線程都應該得到所有的數據,而不僅僅是誰先讀取數據應該得到一個特定的數據? – johusman 2011-03-04 07:26:54

+0

雅所有他們應該得到的數據,這是目前來自BufferedReader流.. – ayush 2011-03-04 07:28:02

+1

@ayush:我不認爲你理解johusman的問題 - 你是否試圖*拆分*不同客戶端之間的數據,或者應該*全部*參見*全部*的數據?如果數據是0,1,2,3,4,那麼每個客戶端應該看到0,1,2,3,4還是可能一個客戶端看到0,1,3而另一個看到2,4? – 2011-03-04 07:34:18

回答

6

我強烈建議他們不要直接訪問BufferedReader。假設你知道數據的格式,並且每個客戶都試圖讀取相同的格式(如果情況並非如此,我看不出它是如何工作的),我建議創建一個線程從BufferedReader並將工作項目放入Queue。在Java中使用生產者/消費者隊列的示例有批次,這也可能使測試客戶端代碼更容易。

只有一個線程訪問BufferedReader意味着您不必擔心定義所有其他線程都必須滿足的原子操作 - 您的閱讀線程通過決定將工作項添加到隊列。

編輯:如果所有的客戶端都應該看到數據的所有,這鞏固了我更進一步具有單個讀者的建議 - 除了,而不必數據Queue該項目從刪除,你就會有一個收集哪些客戶端可以讀取全部現有的數據。您需要使用適當的線程安全集合,但Java中有很多。

編輯:剛剛閱讀您的評論,其中說每個客戶端應該只看到從閱讀器讀取的最後一項,這使得它更容易。讓一個線程讀取數據,但只保留一個引用「最後一個讀取項」的變量。您可能想要同步對它的訪問或使用AtomicReference,但這兩者都很容易。

+0

所以如果我想所有的線程讀取所有的數據,那麼我該怎麼辦?有一個文件在哪裏寫這個數據,然後通過所有的客戶端同步訪問這個文件..?或者其他一些方法? – ayush 2011-03-04 08:23:44

+0

@ayush:所有的數據,或只是最後一個項目?你是自相矛盾的。無論如何,我的「編輯」段落涵蓋了兩種可能性 - 一個線程讀取數據,但通過AtomicReference或線程安全集合使其可用。 – 2011-03-04 08:28:37

+0

感謝您的解釋..數據將持續不斷地在'BufferedReader'中,我希望所有線程都能讀取該數據(最後一個)..缺少一些數據是可以接受的..但是所有的線程都應該得到相同的數據。 .. – ayush 2011-03-04 08:37:23

1

最簡單的事情可能是創建一個臨時文件並使用java.nio.channels.FileChannel。原因是這已經提供了你想要的讀寫器語義。另一種方法是用內存方案自己重新實現這些語義。

你會做的是從你的閱讀器讀取並附加到文件的末尾。然後您的讀者將獲得最初指向文件末尾的文件通道。當你只是在等待閱讀器的更多輸入時,你必須小心以確保他們不認爲它們已經到達尾聲。您可以讓下游邏輯意識到這一點,或者爲讀者提供某種包裝。

編輯:這是寫的理解,你希望所有的讀者看到的一切。如果這實際上不是你想要的,這太複雜了。

1

如果你不能在內存中保存所有的數據,你可以使用這樣的事情: (使用一個線程監視源的經典觀察者模式和其他人通知的新數據)

class Broadcaster { 
    private final ConcurrentSkipListSet<Listener> listeners = 
     new ConcurrentSkipListSet<Listener>(); 

    private final BufferedReader source = //inject source 

    // register/de-gegister code 
    public void register(Listener listener); 
    public void deRegister(Listener listener); 
    // usually it's used like broadcaster.register(this); 

    public void broadcast(){ 
     String read = null; 
     while((read = source.readLine())!=null){ 
      for(Listener listener : listeners){ 
       listener.send(read); 
      } 
     } 
    } 
} 

class Listener implements Runnable { 
    private final Queue<String> queue = new LinkedBlockingQueue<String>(); 

    public void send(String read){ 
     queue.add(read); 
    } 

    public void run(){ 
     while(!Thread.currentThread().isInterrupted()){ 
      String got = queue.get(); 
      System.out.println(got); 
     } 
    } 
} 

我沒有測試這個東西什麼的,所以可憐,如果它不能正常工作!

編輯:如果你有記憶的限制,你可能想這樣做,以及:

Queue<String> queue = new LinkedBlockingQueue<String>(2000); 

這將使多少元素可以在隊列中排隊一個上限,而當這限制已達到,想要在其上放置元素的線程將不得不等待(如果使用了add,那就是)。這樣,當聽衆不能夠快速消費數據時,廣播公司會阻止(等待)(在這種簡單的方案中,其他聽衆可能會餓死)。

編輯2:當你這樣做時,你應該最好只在隊列上放置不變的東西。如果你把例如byte[],一個不禮貌的監聽器可能會改變數據,其他線程可能會感到驚訝。如果這是不可能的,你應該在每個隊列上放一個不同的副本,例如byteArray.clone()。儘管這可能會帶來一些性能問題。