2012-08-29 81 views
9

我正在使用Clojure應用程序從Web API訪問數據。我會發出很多請求,並且很多請求會導致更多的請求,所以我希望將請求URL保留在隊列中,以便在後續下載之間保留60秒。Clojure中的工作隊列

this blog post我把這個在一起:

(def queue-delay (* 1000 60)) ; one minute 

(defn offer! 
    [q x] 
    (.offerLast q x) 
    q) 

(defn take! 
    [q] 
    (.takeFirst q)) 

(def my-queue (java.util.concurrent.LinkedBlockingDeque.)) 

(defn- process-queue-item 
    [item] 
    (println ">> " item) ; this would be replaced by downloading `item` 
    (Thread/sleep queue-delay)) 

如果我包括(future (process-queue-item (take! my-queue)))在我的代碼,然後在某個地方的REPL我可以(offer! my-queue "something"),我看到了立即打印「>>東西」。到現在爲止還挺好!但是我需要隊列持續整個時間我的程序處於活動狀態。我剛纔提到的(future ...)電話會在隊列中有一個項目退出隊列時提供,但我希望能夠持續觀看隊列,並在有可用時呼叫process-queue-item。另外,與通常的Clojure對併發的愛相反,我想確保一次只有一個請求被執行,並且我的程序等待60秒來完成每個後續請求。

我認爲this Stack Overflow question是相關的,但我不知道如何適應它做我想做的。如何連續輪詢我的隊列並確保一次只運行一個請求?

+0

爲什麼你想連續輪詢,但只發送每60秒?每60秒輪詢一次會完成同樣的事情嗎? – mamboking

+0

@maboking幾乎,是的。該方法唯一的缺點是將第一個項目添加到隊列中:如果程序需要5秒鐘才能確定第一個請求URL將會是什麼,然後它將在那裏坐下55秒直到隊列被檢查。無論如何,該計劃將會非常長時間運行,但我想這不是太多問題。 – bdesham

+0

您是否在避免任務調度程序?例如,這個,https://github.com/zcaudate/cronj(還有一個其他庫在該回購的自述文件中) – georgek

回答

0

我結束了滾動我自己的小型圖書館,我叫simple-queue。你可以閱讀關於GitHub的完整文檔,但這裏是完整的源代碼。 我不會保持這個答案更新,所以如果你想使用這個庫,請從GitHub獲取源代碼。

(ns com.github.bdesham.simple-queue) 

(defn new-queue 
    "Creates a new queue. Each trigger from the timer will cause the function f 
    to be invoked with the next item from the queue. The queue begins processing 
    immediately, which in practice means that the first item to be added to the 
    queue is processed immediately." 
    [f & opts] 
    (let [options (into {:delaytime 1} 
         (select-keys (apply hash-map opts) [:delaytime])), 
     delaytime (:delaytime options), 
     queue {:queue (java.util.concurrent.LinkedBlockingDeque.)}, 
     task (proxy [java.util.TimerTask] [] 
       (run [] 
       (let [item (.takeFirst (:queue queue)), 
         value (:value item), 
         prom (:promise item)] 
        (if prom 
        (deliver prom (f value)) 
        (f value))))), 
     timer (java.util.Timer.)] 
    (.schedule timer task 0 (int (* 1000 delaytime))) 
    (assoc queue :timer timer))) 

(defn cancel 
    "Permanently stops execution of the queue. If a task is already executing 
    then it proceeds unharmed." 
    [queue] 
    (.cancel (:timer queue))) 

(defn process 
    "Adds an item to the queue, blocking until it has been processed. Returns 
    (f item)." 
    [queue item] 
    (let [prom (promise)] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise prom}) 
    @prom)) 

(defn add 
    "Adds an item to the queue and returns immediately. The value of (f item) is 
    discarded, so presumably f has side effects if you're using this." 
    [queue item] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise nil})) 

使用這個隊列,返回值的示例:

(def url-queue (q/new-queue slurp :delaytime 30)) 
(def github (q/process url-queue "https://github.com")) 
(def google (q/process url-queue "http://www.google.com")) 

q/process到的調用將阻塞,所以會有兩個def語句之間有30秒的延遲。

純粹使用此隊列的副作用的例子:

(defn cache-url 
    [{url :url, filename :filename}] 
    (spit (java.io.File. filename) 
     (slurp url))) 

(def url-queue (q/new-queue cache-url :delaytime 30)) 
(q/add url-queue {:url "https://github.com", 
        :filename "github.html"}) ; returns immediately 
(q/add url-queue {:url "https://google.com", 
        :filename "google.html"}) ; returns immediately 

現在呼籲立即q/add回報。

2

這是來自a project I did for fun的代碼片段。這並不完美,但可以讓你瞭解我如何解決「等待第一件產品55秒」的問題。它基本上是通過承諾循環的,使用期貨來立即處理或直到承諾「變爲」可用。

(defn ^:private process 
    [queues] 
    (loop [[q & qs :as q+qs] queues p (atom true)] 
    (when-not (Thread/interrupted) 
     (if (or 
      (< (count (:promises @work-manager)) (:max-workers @work-manager)) 
      @p) ; blocks until a worker is available 
     (if-let [job (dequeue q)] 
      (let [f (future-call #(process-job job))] 
      (recur queues (request-promise-from-work-manager))) 
      (do 
      (Thread/sleep 5000) 
      (recur (if (nil? qs) queues qs) p))) 
     (recur q+qs (request-promise-from-work-manager)))))) 

也許你可以做類似的事情?代碼不是很好,可能需要重新編寫才能使用lazy-seq,但那只是我尚未得到的練習!

0

這很可能是瘋了,但你總是可以使用這樣的函數來創建一個減慢的懶惰序列:

(defn slow-seq [delay-ms coll] 
    "Creates a lazy sequence with delays between each element" 
    (lazy-seq 
    (if-let [s (seq coll)] 
     (do 
      (Thread/sleep delay-ms) 
      (cons (first s) 
       (slow-seq delay-ms (rest s))))))) 

這基本上將確保每一個函數調用之間的延遲。

你可以像下面這樣使用它,提供毫秒的延遲:

(doseq [i (slow-seq 500 (range 10))] 
    (println (rand-int 10)) 

或者你也可以把你的函數調用序列裏面的東西,如:

(take 10 (slow-seq 500 (repeatedly #(rand-int 10)))) 

顯然,在上述兩種情況下,您都可以用您用來執行/觸發下載的任何代碼替換(rand-int 10)

+0

如果我正在閱讀這個權利,在運行'slow-seq'之前''coll'的所有元素都必須知道,對吧?我想要一些可以讓你動態地添加項目而沒有問題的東西。具體而言,如果一個API調用的結果是我需要再次調用API,該函數是否允許將第二個調用放在隊列中? – bdesham