2011-09-30 29 views
2

我正在尋找一種方法來限制對java中的方法的訪問,不超過每X秒一次。這裏是我的情況:在java中的最短時間鎖定

我想在多線程並行運行這段代碼:

private MyService service; 
public void run() { 
    // Send request to remote service 
    InputStream response = service.executeRequest(); 

    // Process response 
    ... some more code 
} 

的executeRequest()方法將HTTP請求發送到遠程服務器(這是不是我的,我沒有訪問其實現)並等待來自服務器的響應。然後它會對數據進行一些處理。 我想有很多線程並行運行它。我的問題是,如果同時發送太多的請求,遠程服務器將崩潰。所以我想要一些確保executeRequest()方法永遠不會每秒調用一次的方法。

你知道我怎麼能在java中做到這一點?謝謝

+1

什麼限制併發執行的數量和阻斷新的呼叫,直到服務實例可用?您是否使用任何特定的技術爲您的遠程服務器? – Thomas

+0

遠程服務器不是我的。 MyService.executeRequest向遠程服務器發送http請求並等待響應。我無法控制它。 – jonasr

回答

2

<1秒你可以使用一個信號量節流線程能夠調用executeRequest()的數量:

http://download.oracle.com/javase/1,5,0/docs/api/java/util/concurrent/Semaphore.html

正在執行的線程可能會在進入execute之前增加信號量,其他線程可能會等待它降至0或數字,這反映了允許並行運行的數量。

一個TimerTask:

http://download.oracle.com/javase/1.4.2/docs/api/java/util/TimerTask.html

可以用了3秒後減小信號...進入節流不超過1新進入者每3秒:

+0

信號量+1。忘了那些。 – FloppyDisk

+0

這對我有效,謝謝 – jonasr

4

Hrm,我不確定限制訪問方法的頻率將導致防止過載。

也許在上面的帖子中沒有足夠的信息,但似乎WorkerThread + JobQueue設置在這裏工作得很好。

深思: Multithreaded job queue manager

編輯:試圖少一點含糊......

  • 已在服務器收集請求到一些數據結構, 也許是一類被稱爲工作。
  • 有工作然後被放置在隊列的底部 。
  • 讓WorkerThread對象將Job對象從隊列頂部的 上彈出並處理它們。
  • 確保只需要實例化多個WorkerThread對象,因爲您需要維護適當的服務器負載。 只有實驗將確定該數量,但作爲一個非常粗糙 規則,開始處理核# - 1(又名開始7名工人 一個8芯機)

EDIT#2在光新的信息:在客戶端

  • 做一個工人,可以追蹤喬布斯已經提交什麼

    • 設置一個隊列,其工作變得響應和作業仍在處理。這將允許限制任何時候提交的作業數量。
    • 製作工人的軌道「lastSubmissionTime」,以防止任何提交存在的從以前
  • +0

    對不起,我不夠精確。遠程服務器不是我的。我正在向遠程服務器發送http請求並等待響應。我不知道它是如何構建的,但是在太短的時間間隔內發送太多的請求會使其崩潰,這就是爲什麼我想限制每秒發送一次的請求的數量 – jonasr

    +0

    啊,在這種情況下,構建在客戶端的Job和JobQueue,並且有一個工作人員管理向服務器提交的作業。您可以設置它,以便工人每秒提交一份工作,或者您可以通過跟蹤提交和回覆來跟蹤工作中有多少未完成的工作......或者同時執行兩項操作:) – claymore1977

    0

    你有沒有想過關於使用sleep在跳轉到遠程呼叫之前讓線程暫停?您可以讓線程在1到5之間隨機選擇一個秒,這將限制任何時候觸發該方法的線程數。

    你也可以在1秒後過期的方法上放一個lock,這樣每個線程都會「抓取」鎖,執行該方法,但其鎖會失效,以便下一個線程可以抓住它並執行。把鎖放在方法的開始處 - 除非保持線程一秒thread.sleep(1000)然後繼續執行。這將限制您一次只能觸發該方法的一個線程。

    編輯:應對OP的評論下面

    class X { 
        private final ReentrantLock lock = new ReentrantLock(); 
        // ... 
    
        public void m() { 
        lock.lock(); // block until condition holds 
        try { 
         thread.sleep(1000) //added to example by floppydisk. 
        } finally { 
         lock.unlock() 
         doYourConnectionHere(); 
        } 
        } 
    } 
    

    改自:ReentrantLock。在try/catch內部,你所做的只是thread.sleep(1000)而不是實際做某件事。然後它釋放下一個線程的鎖並繼續執行方法體的其餘部分 - 在你的情況下連接到遠程服務器。

    +0

    你的第一個建議對我來說並不真正有效,因爲沒有什麼能夠阻止兩個線程同時喚醒並幾乎同時調用服務。你的第二個建議是我正在尋找的東西,但我沒有找到它的一個例子。 – jonasr

    +0

    @jonasr:更新了答案以提供更好的代碼示例。 – FloppyDisk

    +0

    這個問題是每個線程在調用方法之前都會等待1秒,即使他們不需要。這可能不是很多,但如果我需要延長它可能是一個問題 – jonasr

    0

    使用動態代理你可以用你的服務,並在InvocationHandler的處理最高執行:

    MyService proxy = (MyService) Proxy.newProxyInstance(// 
        MyService.class.getClassLoader(), // 
        new Class[] {MyService.class}, // 
        new MaxInvocationHandler()); 
    

    哪裏天真實現的InvocationHandler的可能是這個樣子:

    class MaxInvocationHandler implements InvocationHandler { 
        private static final long MAX_INTERVAL = 1000L; 
        private static final long MAX_INVOCATIONS = 1; 
    
        AtomicLong time = new AtomicLong(); 
        AtomicLong counter = new AtomicLong(); 
    
        @Override 
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 
         long currentTime = System.currentTimeMillis(); 
         if (time.get() < currentTime) { 
         time.set(currentTime + MAX_INTERVAL); 
         counter.set(1); 
         } else if(counter.incrementAndGet() > MAX_INVOCATIONS) { 
         throw new RuntimeException("Max invocation exceeded"); 
         } 
    
         return method.invoke(proxy, args); 
        } 
        } 
    
    1

    限制併發客戶端上方不是一個好的模式—客戶應該如何相互瞭解?

    0

    在您的控制器類中,您調用worker來執行該服務,請使用ExecutorService啓動線程池以便使用該工作器類。

    ExecutorService pool = Executors.newFixedThreadPool(10); 
    pool.submit(new MyServiceImpl(someObject)); 
    

    要添加什麼別人已經提出,有職工班採取任務從執行隊列中,等待的分鐘數,你走了另一任務隊列之前需要。我以這2分鐘爲例。

    例子:

    public class MyServiceImpl implements MyService , Runnable { 
    
        public static final int MAX_SIZE = 10; 
        private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(MAX_SIZE); 
    
        @Override 
        public void run() { 
        try 
        { 
        Object obj; 
         while ((obj==queue.take()) != null) 
         { 
         executeRequest(obj); 
         //wait for 2 min 
         Thread.sleep(1000 * 60 * 2); 
         } 
        } 
        catch (InterruptedException e) 
        {} 
        } 
    
        public void executeRequest(Object obj) 
        { 
        // do routine 
        } 
    
        public MyServiceImpl (Object token) 
        { 
        try 
        { 
         queue.put(token); 
        } 
        catch (InterruptedException e) 
        { 
         throw new AssertionError(e); 
        } 
        } 
    }