2013-12-12 33 views
11

如何編寫限制Q promise併發性的方法?如何限制Q promise的併發性?

例如,我有一個方法spawnProcess。它返回一個Q諾言。
我希望一次不會產生超過5個進程,但對調用代碼透明。

我需要實現與簽名

function limitConcurrency(promiseFactory, limit) 

一個功能,我可以打電話給喜歡

spawnProcess = limitConcurrency(spawnProcess, 5); 

// use spawnProcess as usual 

我已經開始在我的版本的工作,但我不知道是否有人具有簡潔的實現我可以檢查。

+0

你的瀏覽器編寫代碼,或者節點?如果是前者,則不存在併發... –

+0

@Matt:我正在寫節點。我並不是說像線程中的併發,我的意思是併發,就像「同時運行的許諾」一樣。 –

+0

你嘗試了什麼?使用等待隊列和緩衝請求不應該太難。 – schlingel

回答

10

我有,這是否對你https://github.com/ForbesLindesay/throat

您可以通過browserify使用它,或者從brcdn(https://www.brcdn.org/?module=throat&version=latest)下載獨立的構建,並將其添加爲腳本標籤庫。

然後(假設Promise構造函數polyfilled或在您的環境中實現),你可以這樣做:

//remove this line if using standalone build 
var throat = require('throat'); 

function limitConcurrency(promiseFactory, limit) { 
    var fn = throat(promiseFactory, limit); 
    return function() { 
    return Q(fn.apply(this, arguments)); 
    } 
} 

你可以只叫throat(promiseFactory, limit)直接,但會返回一個promise承諾,而不是問答承諾。

我也非常喜歡在array.map中使用它。

// only allow 3 parallel downloads 
var downloadedItems = Q.all(items.map(throat(download, 3))); 
+0

太好了,謝謝!你的圖書館看起來像比我更習慣的JS。另外,測試。 –

+0

很高興你覺得它有用:) – ForbesLindesay

2

這似乎是爲我工作。

我不確定我是否可以簡化它。在scheduleNextJob遞歸是必要的,所以running < limitlimit++始終執行在同一個滴答聲。

Also available as a gist.

'use strict'; 

var Q = require('q'); 

/** 
* Constructs a function that proxies to promiseFactory 
* limiting the count of promises that can run simultaneously. 
* @param promiseFactory function that returns promises. 
* @param limit how many promises are allowed to be running at the same time. 
* @returns function that returns a promise that eventually proxies to promiseFactory. 
*/ 
function limitConcurrency(promiseFactory, limit) { 
    var running = 0, 
     semaphore; 

    function scheduleNextJob() { 
    if (running < limit) { 
     running++; 
     return Q(); 
    } 

    if (!semaphore) { 
     semaphore = Q.defer(); 
    } 

    return semaphore.promise 
     .finally(scheduleNextJob); 
    } 

    function processScheduledJobs() { 
    running--; 

    if (semaphore && running < limit) { 
     semaphore.resolve(); 
     semaphore = null; 
    } 
    } 

    return function() { 
    var args = arguments; 

    function runJob() { 
     return promiseFactory.apply(this, args); 
    } 

    return scheduleNextJob() 
     .then(runJob) 
     .finally(processScheduledJobs); 
    }; 
} 

module.exports = { 
    limitConcurrency: limitConcurrency 
} 
2

Deferred承諾實現了gate功能,工作原理完全這樣:

spawnProcess = deferred.gate(spawnProcess, 5);  
2

我寫了一個小圖書館要做到這一點:https://github.com/suprememoocow/qlimit

這是非常容易使用,並專門設計與Q承諾一起工作:

var qlimit = require('qlimit'); 
var limit = qlimit(2); // 2 being the maximum concurrency 

// Using the same example as above 
return Q.all(items.map(limit(function(item, index, collection) { 
    return performOperationOnItem(item); 
})); 

它也可用於併發限制到特定的資源,像這樣:

var qlimit = require('qlimit'); 
var limit = qlimit(2); // 2 being the maximum concurrency 

var fetchSomethingFromEasilyOverwhelmedBackendServer = limit(function(id) { 
    // Emulating the backend service 
    return Q.delay(1000) 
    .thenResolve({ hello: 'world' }); 
});