2013-02-03 20 views
7

目前我正在嘗試使用RESTful API構建一個Web服務來處理一些長時間運行的任務(作業)。Clojure中的Web服務的異步作業隊列

這個想法是,用戶通過執行一個POST來提交一個作業,該POST返回一些用於檢查作業狀態的URL,其中也包含結果的URL。作業完成後(即將一些值寫入數據庫),結果URL將返回相應的信息(而不是沒有結果),並且作業url將指示已完成的狀態。

不幸的是,計算相當密集,因此一次只能運行一個計算,因此需要對作業進行排隊。

在僞像這樣將需要

(def job-queue (atom queue)) ;; some queue 
(def jobs (atom {})) 

(defn schedule-job [params] 
    ;; schedules the job into the queue and 
    ;; adds the job to a jobs map for checking status via GET 
    ;; note that the job should not be evaluated until popped from the queue 
) 

(POST "/analyze" [{params :params}] 
(schedulde-job params)) 

(GET "job/:id" [:d] 
(get @jobs id)) 

;; Some function that pops the next item from the queue 
;; and evaluates it when the previous item is complete 
;; Note: should not terminate when queue is empty! 

我看着Lamina允許異步處理,但它似乎沒有適合我的需要。

我的問題是如何使作業隊列出隊並在上一個作業完成後執行其任務,而不會在隊列爲空時終止,即永久處理傳入作業。

+0

我不是太擅長clojure,所以沒有幫助,只是一個側面說明:爲什麼要返回204?我覺得把一個實際的消息(只是一個200)與一個表示它還沒有完成的實體消息或類似的東西返回可能會更好? – Nanne

+0

你是對的,因爲我錯過了「如果客戶端是用戶代理,它不應該改變它的HTTP請求發送請求的文檔視圖」,否則「沒有內容「我猜想會更合適。 – JoelKuiper

+0

我總是喜歡REST API不嘗試在http上播放。如果我要求一些數據(例如某人的賬號,但是有一個錯誤的ID),我更喜歡在身體內部獲得一個200的提示,而不是這樣的人,而不是例如404。這感覺有點兒一樣。但是這是不重要的,對此很抱歉;) – Nanne

回答

9

一個java.util.concurrent.ExecutorService中可能是你想。這使您可以提交作業供以後執行,並返回一個可以查詢的Future,以發現它是否已完成。

(import '[java.util.concurrent Callable Executors]) 

(def job-executor 
    (Executors/newSingleThreadExecutor)) 

(def jobs (atom {})) 

(defn submit-job [func] 
    (let [job-id (str (java.util.UUID/randomUUID)) 
     callable (reify Callable (call [_] (func))] 
    (swap! jobs assoc job-id (.submit job-executor callable)) 
    job-id)) 

(use 'compojure.core) 

(defroutes app 
    (POST "/jobs" [& params] 
    (let [id (submit-job #(analyze params))] 
     {:status 201 :headers {"Location" (str "/jobs/" id)}})) 
    (GET "/jobs/:id" [id] 
    (let [job-future (@jobs id)] 
     (if (.isDone job-future) 
     (.get job-future) 
     {:status 404})))) 
+2

不能'ExecutorService'可以用內置的'future'替換嗎? – neoascetic

2

這似乎正在做我期望的,但它確實看起來相當不通俗。任何人都有如何改善這個想法?

;; Create a unique identifier 
(defn uuid [] (str (java.util.UUID/randomUUID))) 

;; Create a job-queue and a map for keeping track of the status 
(def job-queue (ref clojure.lang.PersistentQueue/EMPTY)) 
(def jobs (atom {})) 

(defn dequeue! [queue-ref] 
    ;; Pops the first element off the queue-ref 
    (dosync 
    (let [item (peek @queue-ref)] 
     (alter queue-ref pop) 
     item))) 

(defn schedule-job! [task] 
    ;; Schedule a task to be executed, expects a function (task) to be evaluated 
    (let [uuid (uuid) 
     job (delay task)] 
    (dosync 
     (swap! jobs assoc uuid job) 
     (alter job-queue conj job)))) 

(defn run-jobs [] 
    ;; Runs the jobs 
    (while true 
    (Thread/sleep 10) 
    (let [curr (dequeue! job-queue)] 
     (if-not (nil? curr) (@curr))))) 

(.start (Thread. run-jobs)) 
0

您的描述看起來像一個多生產者和單一消費者的情況。 下面是一個例子代碼(你可以用REST的東西掛鉤,可能還有一些 異常處理,使代理人沒有得到死者)

(def worker (agent {}))                                

(defn do-task [name func]                                
    (send worker                                  
     (fn [results]                                 
      (let [r (func)]                                
      (assoc results name r))))) 

;submit tasks                            
(do-task "uuid1" #(print 10))                               
(do-task "uuid2" #(+ 1 1)) 

;get all results 
(print @worker)