2016-12-13 42 views
1

我已經繼承了一些輪詢更新的代碼,將這些更新附加到延遲序列並處理。從clojure 1.7.0-alpha5升級到任何更高版本後,由於惰性序列的chunking代碼顯示中斷。我寫了一個例子來說明這個問題:處理無限(懶惰)序列

(defn infinite-updates 
    [] 
    (letfn [(step [n] 
      (lazy-seq 
      ;; Poll for an update 
      (Thread/sleep 3000) 
      (cons n (step (rand-int 5)))))] 
    (lazy-seq 
     (step (rand-int 5))))) 

;; Run with: 
(doseq [t (sequence (map inc) (infinite-updates))] (println t)) 

該項目上的Clojure 1.7.0-素α5β1和作品本身:每3秒t打印。

一旦我升級超過該修訂版,它會將結果分塊,因此在大約1.5分鐘後打印出32 t

我已經使用了以下嘗試:

(defn unchunk [s] 
    (when (seq s) 
    (lazy-seq 
    (cons (first s) 
     (unchunk (next s)))))) 

unchunk,沒有運氣的數據。

我該如何處理這些更新,因爲它們是可用的還是有更通俗的方式來編寫infinite-updates,這樣我可以在不依賴懶惰seq的情況下處理所有更新?

回答

3

您的問題是爲clojure.core.async量身定做的。此代碼顯示的輪廓:

(ns tst.clj.core 
    (:use clj.core 
     clojure.test) 
    (:require 
    [clojure.core.async :as async] 
)) 

; create a buffer of arbitrary size 99 (any size, even zero, would work) 
(let [buffer (async/chan 99) ] 
    (async/go  ; start sending loop in another thread 
    (while true 
     (Thread/sleep 3000) 
     (let [val (rand-int 5) ] 
     (println "putting:" val) 
     (async/>!! buffer val)))) 
    (while true  ; start receiving loop in this thread 
    (println "received:" 
     (async/<!! buffer)))) 

與輸出:

*clojure-version* => {:major 1, :minor 8, :incremental 0, :qualifier nil} 
java.version => 1.8.0_111 
putting: 4 
received: 4 
putting: 1 
received: 1 
putting: 0 
received: 0 
putting: 0 
received: 0 
putting: 0 
received: 0 
putting: 1 
received: 1 
putting: 1 
received: 1 
putting: 0 
received: 0 
putting: 0 
received: 0 
putting: 0 
received: 0 
putting: 3 
received: 3 
putting: 0 
received: 0 
putting: 1 
received: 1 
putting: 4 
received: 4 
putting: 3 
received: 3 
putting: 2 
received: 2 
putting: 4 
received: 4 

每3秒。另請參閱:

http://www.braveclojure.com/core-async/

http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html

http://clojure.github.io/core.async/

https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj

https://www.infoq.com/presentations/clojure-core-async

https://youtu.be/enwIIGzhahw

2

而不是使用傳感器,使用「常規」地圖(您手動構建的列表不是分塊):

(doseq [t (map inc (infinite-updates))] (println t))