2016-01-27 11 views
2

我只是沒有得到這個概念,並尋找啓示。我試圖在數據被提取時觀察,但是混淆了這個過程。這是迄今爲止我所擁有的。 ajax請求使用d3.tsv完成。Rxjs從D3包裝其他庫函數(Observable)

var test = Rx.Observable.just(
    d3.tsv("https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv", 
     function(d) { 
      return { 
      letter: d.letter, 
      frequency: +d.frequency 
      }; 
     }, 
     function(error, rows) { 
      console.log('mytest2',rows); 
     } 
    ) 
); 

var observer = Rx.Observer.create(
    function (x) { console.log('onNext: %s', x); }, 
    function (e) { console.log('onError: %s', e); }, 
    function() { console.log('onCompleted'); }); 

var subscription = test.subscribe(observer); 

雖然這個ajax請求在技術上起作用,但所有Observable函數都在數據到達之前發生。我如何構建這個我的'onNext'日誌給我的數據,而不是隻在d3.tsv函數中獲取它?

+1

「我只是沒有得到這個概念,並尋找啓示」我知道的感覺:D – Nobita

回答

1

有RxJS操作員致力於將回調轉換爲觀測值(.fromCallback,.fromNodeCallback)。然而,他們不會在這裏工作,因爲他們期望ONE回調,並且回調是最後一個參數。在這裏你有兩個回調,一個用於成功,一個用於結果。我不知道這種情況下有任何特殊的操作符,所以我建議您使用自定義幫助函數。

function d3fn (url, success_handler, error_handler) { 
    success_handler ({ 
    letter : 'letter', 
    frequency : 9 
    }); 
} 

var d3 = {tsv : d3fn}; 

function fromD3Callback (d3fn, ctx) { 
return function() { 
    var args = Array.prototype.slice.call(arguments); 
    var subject = new Rx.AsyncSubject(); 

    function success_handler() { 
    subject.onNext.apply(subject, Array.prototype.slice.call(arguments)); 
    subject.onCompleted(); 
    } 

    function error_handler() { 
    subject.onError(Array.prototype.slice.call(arguments)); 
    } 

    args.push(success_handler); 
    args.push(error_handler); 

    d3fn.apply(ctx, args); 
    return subject.asObservable(); 
} 
} 

var test = fromD3Callback(d3.tsv)("https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv") 
    .map(function(d) { 
      return { 
      letter: d.letter, 
      frequency: +d.frequency 
      }; 
     }) 
    .catch(function(error, rows) { 
      console.log('mytest2',rows); 
      return Rx.Observable.throw({error: error, rows: rows}); 
     }); 

var observer = Rx.Observer.create(
    function (x) { console.log('onNext: %o', x); }, 
    function (e) { console.log('onError: %s', e); }, 
    function() { console.log('onCompleted'); }); 

var subscription = test.subscribe(observer); 
+0

謝謝你的反饋意見。可悲的是,它沒有奏效,但它讓我思考如何找到正確的答案。我進一步研究.fromNodeCallback,並得到了工作。問題是我不知道如何將這個工作代碼變成一個帶有過濾器的工作代碼,即.map,.flatmap,.filter等等。如果您有任何建議,我在下面發佈了答案。如果它包含一個新的Rx.AsyncSubject()甚至更好,因爲我認爲這是我需要去的方向。 – jamesRH

+0

已更正。 jsfiddle在這裏:http://jsfiddle.net/n1ugv4g4/1/。這回答了你問的問題。如果你有這個問題或另一個問題的擴展,請發佈一個新問題,除非它是一件小事。 – user3743222

+0

這主要回答我的問題。我已經更新了小提琴有d3庫,這返回數據集的第一項。 http://jsfiddle.net/53dg04ju/2/我的目標是像我的解決方案一樣返回整個數據集,但靈活性與您的答案相同。如果你可以更新,那會很棒。 – jamesRH

1

我已經找到一個答案,我的問題,但會愛一個更好的。我已經想通了,這就是:

var url = "https://gist.githubusercontent.com/mbostock/3885304/raw/37bd91278846c053188a130a01770cddff023590/data.tsv" 

var fetch = Rx.Observable.fromNodeCallback(d3.tsv); 

var source = fetch(url, function(d) { 
    return { 
    letter: d.letter, 
    frequency: +d.frequency 
    }; 
}) 

var observer = Rx.Observer.create(
    function (o) { 
     console.log('Next: success!', o); 
    }, 
    function (err) { 
     console.log('Error: ' + err); 
    }, 
    function() { 
     console.log('Completed'); 
    }); 

var subscription = source.subscribe(observer); 

我唯一的問題是我不知道如何執行過濾掉的這個,所以我會開到一個更好的解決方案。