2017-02-21 19 views
0

我正在啓動幾百個併發的http-kit.client/get請求,並提供回調以將結果寫入單個文件。在併發http-kit/get實例中使​​用I/O回調的最簡單方法

什麼是處理線程安全的好辦法?從core.asyc使用chan<!!

下面的代碼,我會考慮:

(defn launch-async [channel url]                                 
    (http/get url {:timeout 5000                                 
       :user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}            
      (fn [{:keys [status headers body error]}]                            
      (if error                                   
       (put! channel (json/generate-string {:url url :headers headers :status status}))                 
       (put! channel (json/generate-string body))))))                          

(defn process-async [channel func]                                
    (when-let [response (<!! channel)]                                
    (func response)))                                   

(defn http-gets-async [func urls]                                
    (let [channel (chan)]                                   
    (doall (map #(launch-async channel %) urls))                             
    (process-async channel func)))  

感謝您的見解。

回答

3

既然您已經在您的示例中使用了core.async,我想我會指出幾個問題以及如何解決它們。另一個答案提到使用更基本的方法,並且我完全同意簡單的方法就好。但是,對於頻道,您有一種簡單的方式來使用不涉及映射到矢量的數據,如果您有很多響應,矢量也會隨着時間的推移而變大。考慮以下問題以及我們如何解決這些問題:

(1)如果您的url列表中有超過1024個元素,則當前版本將崩潰。有看跌期權內部緩衝區,並執行是異步(即put!take!不會阻止,但總是立即返回),並且限制是1024,是在地方,以防止通道的無界異步使用。要親自看看,請致電(http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com"))

你想要做的只是在有空間的時候纔會將某些東西放在頻道上。這被稱爲反壓。從go block best practices優秀的維基以一個頁面,一個聰明的方法來從HTTP-KIT回調做,這是使用put!回調選項來啓動你的下一個HTTP GET;這個時候put!立即成功纔會發生,所以你永遠不會有一個情況下,你可以超越通道的緩衝區:

(defn launch-async 
    [channel [url & urls]] 
    (when url 
    (http/get url {:timeout 5000 
        :user-agent "Mozilla"} 
       (fn [{:keys [status headers body error]}] 
       (let [put-on-chan (if error 
            (json/generate-string {:url url :headers headers :status status}) 
            (json/generate-string body))] 
        (put! channel put-on-chan (fn [_] (launch-async channel urls)))))))) 

(2)其次,你似乎只處理一個響應。相反,使用一去環:

(defn process-async 
    [channel func] 
    (go-loop [] 
    (when-let [response (<! channel)] 
     (func response) 
     (recur)))) 

(3)這是你的http-gets-async功能。我看不出有什麼害處在這裏加入緩衝液,因爲它可以幫助你在一開始火了請求的一陣不錯:

(defn http-gets-async 
    [func urls] 
    (let [channel (chan 1000)] 
    (launch-async channel urls) 
    (process-async channel func))) 

現在,你必須處理的URL的無限數量的能力,與背壓力。爲了測試這個,定義一個計數器,然後讓你的處理函數增加這個計數器來查看你的進度。使用本地主機地址,很容易喋喋不休(不推薦發射了數以十萬計的請求,比方說,谷歌等):

(def responses (atom 0)) 
(http-gets-async (fn [_] (swap! responses inc)) 
       (repeat 1000000 "http://localhost:8000")) 

因爲這是所有異步,你的函數會立即返回你可以看看@responses成長。

您可以做的另一件有趣的事情是,您可以在process-async中運行處理功能,而不必在通道本身上應用它作爲傳感器。

(defn process-async 
    [channel] 
    (go-loop [] 
    (when-let [_ (<! channel)] 
     (recur)))) 

(defn http-gets-async 
    [func urls] 
    (let [channel (chan 10000 (map func))] ;; <-- transducer on channel 
    (launch-async channel urls) 
    (process-async channel))) 

有很多方法可以做到這一點,包括構建它,這樣的通道關閉(請注意,上面,它保持打開狀態)。如果你願意,你可以在這方面有基本的幫助,而且它們很容易使用。可能性非常多。

+0

我希望'launch-asyc'溢出時提供一個大的url序列。爲什麼不是這樣? (我認爲這是因爲它是作爲官方建議提供的)。感謝 – user3639782

+0

另一件事。只要我將代碼發送到repl(boot repl),它就會寫入文件,但是當我將相同的代碼包裝在'-main'函數中並將其作爲腳本運行時,沒有任何反應。它應該這樣表現嗎?感謝 – user3639782

+1

@ user3639782從「repeat」函數產生的序列中取出的每個元素是「按需」產生的,即序列是懶惰的,實際上可以是無限的。所以,網址列表幾乎沒有記憶。關於您的其他問題,我不確定您寫入文件的意思。 – Josh

1

這是很簡單的,我不會用core.async它。你可以用一個原子存儲來做到這一點,使用一個響應的矢量,然後有一個單獨的線程讀取原子的內容,直到看到所有的響應。然後,在您的http-kit回調中,您可以直接將該響應輸入到原子中。

如果你確實想使用core.async,我推薦一個緩衝通道來阻止你的http-kit線程池。

相關問題