2016-11-17 18 views
1

我有一個BLE設備列表,並且正在使用RxJava與它們進行交互。我需要從列表中發出一個項目,重複寫入一個特徵直到X發生,然後繼續到列表中的下一個項目。一次發送一個項目,與它交互,直到滿足條件,然後繼續下一個項目

當前代碼:

Observable.from(mDevices) 
       .flatMap(new Func1<Device, Observable<?>>() { 
        @Override 
        public Observable<?> call(Device device) { 
         Log.d(TAG, "connecting for policing"); 
         return device.connectForPolicing(); 
        } 
       }) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Action1<Object>() { 
        @Override 
        public void call(Object o) { 
         Log.d(TAG, "subscribing... "); 
        } 
       }); 

其中.connectForPolicing()樣子:

public Observable<byte[]> connectForPolice() { 

     .... 

     return device.establishConnection(mContext, false) 
       .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() { 
        @Override 
        public Observable<byte[]> call(RxBleConnection rxBleConnection) { 
         byte[] value = new byte[1]; 
         value[0] = (byte) (3 & 0xFF); 
         //Buzz the device 
         return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value); 
        } 
       }) 
       .repeat(3)//ignore 
       .takeUntil(device.observeConnectionStateChanges().filter(new Func1<RxBleConnection.RxBleConnectionState, Boolean>() { 
        @Override 
        public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) { 

         return rxBleConnectionState == RxBleConnection.RxBleConnectionState.DISCONNECTING; 
        } 
       })); 
    } 

此代碼似乎立即發出列表中的所有項目,因此將連接並在同一嗡嗡的所有項目時間。我怎樣才能一次發出一件物品,以便我可以與他們互動?

僞代碼會是這樣的:

for(Device device : devices) { 
    device.connect(); 
    while(device.isConnected()) { 
     device.beep(); 
    } 
} 

回答

1

更換flatMapconcatMap

.concatMap(device -> device.connectForPolicing()) 

flatMap使用merge運營商。它會立即發射所有物品。而concatMap使用concat,它順序發出物品。 Good article about it.

+0

我把文章讀。根據我的理解'.concatMap()'只是保留了項目的順序,但將'.flatMap()'切換到'.concatMap()'不會導致查找的阻塞行爲。如果兩個項目在列表中,他們都會讓我一個接一個排出。 – Orbit

+0

它會,但每個項目都會等到device.connectForPolicing()完成前一個項目。 –

+0

我試過了你的建議,雖然還沒有能夠獲得原始問題中描述的功能。至多,我可以讓一個設備發出嘟嘟聲,但不會重複。看着我的日誌,似乎只有一件事正在經歷。 – Orbit

0

您可以使用.flatMap(Observable, int)運算符。

Observable.from(mDevices) 
      .flatMap(
       new Func1<Device, Observable<?>>() { 
        @Override 
        public Observable<?> call(Device device) { 
         Log.d(TAG, "connecting for policing"); 
         return device.connectForPolicing(); 
        } 
       }, 
       1 
      ) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Action1<Object>() { 
       @Override 
       public void call(Object o) { 
        Log.d(TAG, "subscribing... "); 
       } 
      }); 

int參數限制了最大併發操作。在這種情況下,它將被順序處理。

如果你想反覆嗡嗡聲設備,直到它會斷開,然後還需要在connectForPolice()功能的變化:

public Observable<byte[]> connectForPolice(RxBleDevice device) { 

    .... 

    return device.establishConnection(mContext, false) 
      .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() { // once the connection is established ... 
       @Override 
       public Observable<byte[]> call(RxBleConnection rxBleConnection) { 
        byte[] value = new byte[1]; 
        value[0] = (byte) (3 & 0xFF); 
        //Buzz the device 
        return Observable // ... we return an observable ... 
          .defer(new Func0<Observable<byte[]>>() { 
           @Override 
           public Observable<byte[]> call() { 
            return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value); // ... (that on each subscription will emit a fresh write characteristic observable) ... 
           } 
          }) 
          .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { // ... which we will subscribe (the Observable.defer()) again ... 
           @Override 
           public Observable<?> call(Observable<? extends Void> observable) { 
            return observable.delay(10, TimeUnit.SECONDS); // ... after 10 seconds from the previous complete 
           } 
          }); 
       } 
      }) 
      .onErrorResumeNext(new Func1<Throwable, Observable<? extends byte[]>>() { // if the device will trigger disconnect then a BleDisconnectedException will be thrown ... 
       @Override 
       public Observable<? extends byte[]> call(Throwable throwable) { 
        return Observable.empty(); // ... in which situation we will just finish the Observable 
       } 
      }); 
} 
+0

我認爲主要的問題是在'.connectForPolicing()'方法中,'.establishConnection()'和'.writeCharacteristic()'方法是異步的。我需要重複寫出特徵直到X發生,此時下一個項目可以被允許沿着流向下。 – Orbit

+0

你有沒有試過我的建議? –

+0

是的,儘管我只看到一個物品放在物流的下方,即使該物品包含> 1個物品,也只有1個物品被放置。 – Orbit

相關問題