2012-04-14 80 views
3

我認爲我做錯了。我正在創建線程,以便從共享隊列中剔除一些數據。我的問題是程序速度慢,內存不足,我懷疑隊列可能不像我希望的那樣共享。我懷疑這是因爲在我的代碼中,我添加了一行顯示隊列的大小,如果我啓動2個線程,然後我得到兩個完全不同數字的輸出,似乎自己增加(我認爲它可能是相同的數字,但也許它從100跳到2,等等,但看完它後顯示105和5,並以不同的速率運行,如果我有4個線程,那麼我看到4個不同的數字)。如何實現一個可以被多個線程處理的隊列?

這是相關部分的片段。我在節目

static class queue_class { 
     int number; 
     int[] data; 
     Context(int number, int[] data) { 
      this.number = number; 
      this.data = data; 
     } 
    } 

然後我將一些職位的調用後創建隊列的頂部創建隊列我想要的數據靜態類..

static class process_threaded implements Callable<Void> { 
    // queue with contexts to process 
    private Queue<queue_class> queue; 

    process_threaded(queue_class request) { 
     queue = new ArrayDeque<queue_class>(); 
     queue.add(request); 
    } 

    public Void call() { 
     while(!queue.isEmpty()) { 
      System.out.println("in contexts queue with a size of " + queue.size()); 
      Context current = contexts.poll(); 
      //get work and process it, if it work great then the solution goes elsewhere 
      //otherwise, depending on the data, its either discarded or parts of it is added back to queue 
      queue.add(new queue_class(k, data_list)); 

,你可以看到,數據有3個選項,如果數據是好的,就會被髮送出去,如果數據完全可怕或丟回隊列就丟棄。我認爲隊列正在發送,但我懷疑是因爲每個線程都在自己的隊列中工作,而不是共享隊列。

這是猜測正確的,我在做這錯了嗎?

+0

此代碼不會編譯。你是否嘗試編譯和執行某些東西,然後問一個具體的問題,而不是編寫一個不被java編譯器接受的僞代碼? – 2012-04-14 04:24:19

+0

我的實際代碼編譯..我沒有想到任何人都想編譯它,所以我拿了代碼,清理了與問題無關的信息,只是爲了展示我在做什麼。我不需要代碼只是邏輯,並認爲上面的代碼應該讓我知道我在做什麼。 – 2012-04-14 04:33:28

+0

您應該閱讀[實踐中的Java併發](http://www.amazon.com/Java-Concurrency-Practice-Brian-Goetz/dp/0321349601)以編寫線程安全代碼的專業知識。 – 2012-04-14 04:54:35

回答

2

你在你的評估,即每個線程(可能)擁有自己的隊列工作,因爲你正在創建你的Callable的構造隊列正確。 (它實際上很奇怪有一個Callable<Void> - 是不是隻是一個Runnable?)

還有其他問題,例如,您正在處理一個非線程安全的隊列,或者你的代碼在編寫時不會編譯的事實。

但是,重要的問題是你真的需要顯式創建一個隊列嗎?爲什麼沒有ExecutorService你提交Callable(或Runnables,如果你決定進行切換):將執行者的引用傳遞到你的Callable,他們可以將新的Callable添加到執行者的任務隊列中以運行。沒有必要重新發明輪子。

例如:

static class process_threaded implements Runnable { 
    // Reference to an executor 
    private final ExecutorService exec; 
    // Reference to the job counter 
    private final AtomicInteger jobCounter; 
    // Request to process 
    private queue_class request; 

    process_threaded(ExecutorService exec, AtomicInteger counter, queue_class request) { 
     this.exec = exec; 
     this.jobCounter = counter; 
     this.jobCounter.incrementAndGet(); // Assuming that you will always 
              // submit the process_threaded to 
              // the executor if you create it. 
     this.request = request; 
    } 

    public run() { 
     //get work and process **request**, if it work great then the solution goes elsewhere 
     //otherwise, depending on the data, its either discarded or parts of are added back to the executor 
     exec.submit(new process_threaded(exec, new queue_class(k, data_list))); 

     // Can do some more work 

     // Always run before returning: counter update and notify the launcher 
     synchronized(jobCounter){ 
      jobCounter.decrementAndGet(); 
      jobCounter.notifyAll(); 
     } 
    } 
} 

編輯:

爲了解決您的時候關閉執行的問題,我覺得最簡單的辦法是有一個工作計數器,關機時達到0.對於螺紋安全,AtomicInteger可能是最佳選擇。我在上面添加了一些代碼來包含更改。然後你的啓動代碼看起來像這樣:

void theLauncher() { 

    AtomicInteger jobCounter = new AtomicInteger(0); 

    ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcesses()); 

    exec.submit(new process_threaded(exec, jobCounter, someProcessRequest)); 
    // Can submit some other things here of course... 

    // Wait for jobs to complete: 
    for(;;jobCounter.get() > 0){ 
     synchronized(jobCounter){ // (I'm not sure if you have to have the synchronized block, but I think this is safer. 
      if(jobCounter.get() > 0) 
       jobCounter.wait(); 
     } 
    } 

    // Now you can shutdown: 
    exec.shutdown(); 
} 
+0

有趣,沒有想到(只是將它提交給ExecutorService)。一些基本的問題。如果我從線程本身做到這一點,或者這不重要,會是一個問題嗎? ExecutorService也是安全的多線程進入它(當然,我知道它可以分配任務,但它可以同時採取多個?)?該代碼是給我,對不起,所以我想提高我得到了什麼,沒有實現明顯簡單的解決方案.. – 2012-04-14 04:38:14

+0

@learningJava是的,它是線程安全的,從不同的線程任務提交到同一個執行者。 (實際上,Java 7中的新ForkJoinTask東西是如何工作的)。順便說一句,你可能想看看'ForkJoinTask',它可能很適合這個任務。 – trutheality 2012-04-14 04:45:32

+0

我現在正在使用java6..jave7似乎解決了我所有的問題(編程問題,我的意思是)。我會測試它並報告.. – 2012-04-14 04:52:21

2

不要重新發明輪子!如何使用ConcurrentLinkedQueue?從javadocs:

基於鏈接節點的無界線程安全隊列。該隊列命令元素FIFO(先進先出)。隊列頭是最長時間在隊列中的元素。隊列的尾部是已經在隊列上的最短時間的元素。新元素插入到隊列尾部,隊列檢索操作獲取隊列頭部的元素。當許多線程將共享對共同集合的訪問時,ConcurrentLinkedQueue是一個合適的選擇。

+0

我給它一個shot..thanks我雖然它僅是爲java7,但猜測not..thanks。 – 2012-04-14 04:50:38

+0

確保你創建一個單曲在他們共同分享的任務之外領取。如果每個任務創建自己的隊列 - 線程安全或不 - 你會遇到問題。 – 2012-04-14 04:53:33

+0

'ConcurrentLinkedQueue'自Java 1.5起可用 – 2012-04-14 04:54:59

相關問題