backpressure

    0熱度

    1回答

    我在我的android應用程序中使用RxJava。我正在使用interval()函數使用計時器,但即使添加了onBackPressureDrop(),我仍然得到Missing Backpressure異常。我還爲我的訂戶添加了onError(),並將異常記錄到Crashlytics,但它仍然崩潰。請幫忙。我花了一個星期的時間來解決問題,但無濟於事。代碼偶爾崩潰,我甚至一次都無法重現它。 Trace

    0熱度

    1回答

    我一直在掙扎一段時間,我相信這是一個非常基本的問題。 我有一個Flowable從網絡中檢索一捆物品併發出它們。 Flowable .create(new FlowableOnSubscribe<Item>() { @Override public void subscribe(FlowableEmitter<Item> emitter) throws Except

    2熱度

    1回答

    我用阿卡流「ActorPublisher演員作爲流每個連接的數據Source發送到傳入的WebSocket或HTTP連接。 ActorPublisher的contract是定期通過提供需求請求數據 - 下游可接受的元素數量。如果需求爲0,我不應該發送更多元素。我觀察到,如果我緩衝元素,當消費者速度緩慢時,緩衝區大小在1到60之間波動,但大多數在40-50之間。 要流我使用阿卡-HTTP「s到的We

    3熱度

    1回答

    我知道,從Akka 2.4.16開始,沒有「遠程」實現Reactive Streams。規範側重於在單個JVM上運行的流。 但是,考慮用例涉及另一個JVM進行某些處理,同時保持背壓。這個想法是有一個主要的應用程序,提供一個運行流的用戶界面。例如,這個流有一個階段執行一些重要的計算,應該運行在不同的機器上。我感興趣的方式來運行在分佈式的方式流 - 我一些相關的文章來指出一些想法: 通過TCP連接流使

    4熱度

    1回答

    最近我意識到我不明白RxJava2背壓是如何工作的。 我做了小測試,我希望它應該會失敗,MissingBackpressureException例外: @Test public void testBackpressureWillFail() { Observable.<Integer>create(e -> { for (int i = 0; i < 10000; i++)

    3熱度

    1回答

    我想測試一些Akka流功能,如conflate。爲此,我需要在簡單的單元測試中構建一個不受背壓影響的源。天真的嘗試,如 Source.tick(1.milli, 1.milli, "tick").map(_ => Random.nextDouble()) 由於背壓不起作用。 OTOH通過HTTP可能是矯枉過正。 如何創建一個簡單的Source對於不受背壓影響的單元測試?

    10熱度

    2回答

    我正在使用datastax java驅動程序3.1.0連接到cassandra羣集,我的cassandra羣集版本是2.0.10。我正在使用QUORUM一致性進行異步寫入。 private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void save(String p

    4熱度

    1回答

    我正在分析Spark結構化流式處理中的背壓功能。有誰知道細節?是否有可能通過代碼調整處理傳入記錄? 謝謝

    0熱度

    1回答

    在RxJava 1/RxScala中,如何在下列情況下節流/背壓可觀測源? def fast: Observable[Foo] // Supports backpressure def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo => Observable.from(expensiveOp(foo)) // Signa

    0熱度

    1回答

    我已經編寫了一個Akka應用程序,該應用程序從Kafka獲取輸入,然後使用分片演員處理數據並輸出到Kafka。 但在某些場合分片區域不能處理負載,我也得到: 你或許應該實行流量控制,以避免水浸 遠程連接。 如何在此鏈/流中實施背壓? 卡夫卡消費 - >共享演員 - >卡夫卡生產者 從代碼一些片斷: ReactiveKafka kafka = new ReactiveKafka(); Subsc