Tutorial: RxBleCustomOperation

Dariusz Seweryn edited this page Mar 1, 2019 · 1 revision

Intention

Every additional layer of code added to an application comes with a cost. Apart from the extra fields, methods, and classes that count towards the 64k DEX limit, that cost is potentially reduced performance. This library is no exception. With BLE this is rarely a problem, because the interface is typically used to transmit a limited amount of data only occasionally.

The intention of RxAndroidBle is to let developers model their connection algorithms in a way that is as stateless as possible, which reduces the likelihood of bugs. Sometimes, however, bulk data has to be transmitted as quickly as possible. BLE was not designed with this use case in mind, so doing it requires many roundtrips from the application code to the BLE stack and back, and every additional layer of code makes these roundtrips longer. This is where one can implement their own RxBleCustomOperation, which has access to Android's raw BluetoothGatt/BluetoothGattCallback objects and cuts out the middleman.

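There are two potential benefits of using a custom operation: direct access to the native BluetoothGatt/BluetoothGattCallback API, and shorter roundtrips when transmitting bulk data.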

Important

  • Using RxBleCustomOperation bypasses all layers of RxAndroidBle code that help keep the Android BLE stack in a stable state, which may degrade the user experience
  • RxBleCustomOperation partially relies on the library's internal API, which may change in minor version updates

Example usages

Getting access to native API outside of the library classes scope

Example by @RobLewis

package net.grlewis.cufftest;

import android.bluetooth.BluetoothGatt;

import com.polidea.rxandroidble2.RxBleCustomOperation;
import com.polidea.rxandroidble2.internal.connection.RxBleGattCallback;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;

public class GetGattOperation implements RxBleCustomOperation<BluetoothGatt> {
    
    private BluetoothGatt gatt;
    
    // How this works:
    
    // You call rxBleConnection.queue( <instance of this class> )
    // It returns an Observable<T>--call it Observable A
    
    // The queue manager calls the .asObservable() method below,
    // which returns another Observable<T>--call it Observable B
    // It is placed in the queue for execution
    
    // When it's time to run this operation, ConnectionOperationQueue will
    // subscribe to Observable B (here, Observable.just( bluetoothGatt ))
    
    // Emissions from this Observable B (here, the bluetoothGatt) are forwarded to the Observable A returned by the .queue() method
    
    // Instances can be queued and received via a subscription to Observable A:
    // rxBleConnection.queue( new GetGattOperation() ).subscribe( gatt -> {} );
    
    
    @Override // returns "Observable B" (see above)
    @NonNull
    public Observable<BluetoothGatt> asObservable(BluetoothGatt bluetoothGatt,
                                                  RxBleGattCallback rxBleGattCallback,
                                                  Scheduler scheduler) throws Throwable {
        
        gatt = bluetoothGatt;
        return Observable.just(bluetoothGatt);  // return Observable B (emits Gatt then completes)
    }
    
    public BluetoothGatt getGatt() {
        return gatt;
    } 
}
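
The flow described in the comments above (a handle returned immediately by queue(), and the operation's own stream created only when its turn comes) can be modelled without Android or RxJava. The following is a minimal sketch under loud assumptions: FakeGatt and OperationQueueSketch are hypothetical stand-ins invented for illustration, CompletableFuture replaces Observable, and a single-threaded executor replaces the library's ConnectionOperationQueue. It does not reproduce how RxAndroidBle is implemented; it only shows the ordering and forwarding idea.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical stand-in for android.bluetooth.BluetoothGatt (illustration only).
final class FakeGatt {
    final String address;

    FakeGatt(String address) {
        this.address = address;
    }
}

// Hypothetical, simplified model of a serialized operation queue.
final class OperationQueueSketch implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final FakeGatt gatt;

    OperationQueueSketch(FakeGatt gatt) {
        this.gatt = gatt;
    }

    // Returns "A" right away. "B" is only created when the worker reaches this operation.
    <T> CompletableFuture<T> queue(Function<FakeGatt, CompletableFuture<T>> operation) {
        CompletableFuture<T> a = new CompletableFuture<>();
        worker.execute(() -> {
            try {
                CompletableFuture<T> b = operation.apply(gatt); // the asObservable() analogue
                a.complete(b.join()); // wait for B, then forward its result to A
            } catch (Throwable t) {
                a.completeExceptionally(t); // failures are forwarded to A as well
            }
        });
        return a;
    }

    @Override
    public void close() {
        worker.shutdown();
    }
}

final class QueueSketchDemo {
    public static void main(String[] args) {
        FakeGatt gatt = new FakeGatt("AA:BB:CC:DD:EE:FF");
        try (OperationQueueSketch queue = new OperationQueueSketch(gatt)) {
            // Analogue of rxBleConnection.queue(new GetGattOperation()).subscribe(...)
            CompletableFuture<FakeGatt> a = queue.queue(g -> CompletableFuture.completedFuture(g));
            System.out.println(a.join().address);
        }
    }
}
```

Because the worker waits for each inner future before taking the next task, operations run strictly one after another, which is the property the real queue is described as providing.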

Call vanilla API refresh method by reflection

BluetoothGatt.refresh() is a hidden method (it is not part of the public Android SDK, hence the reflection) that refreshes the internals of a particular BluetoothGatt, e.g. its service/characteristic caches.

// Requires: java.lang.reflect.Method, java.lang.reflect.InvocationTargetException,
// java.util.concurrent.TimeUnit, io.reactivex.Observable
RxBleCustomOperation<Void> bluetoothGattRefreshCustomOp = (bluetoothGatt, rxBleGattCallback, scheduler) -> {
    try {
        // refresh() is not exposed by the public SDK, so it is looked up by name at runtime
        Method bluetoothGattRefreshFunction = bluetoothGatt.getClass().getMethod("refresh");
        boolean success = (Boolean) bluetoothGattRefreshFunction.invoke(bluetoothGatt);
        if (!success) return Observable.error(new RuntimeException("BluetoothGatt.refresh() returned false"));
        // Emit nothing and complete after a 500 ms delay
        return Observable.<Void>empty().delay(500, TimeUnit.MILLISECONDS);
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
        // Any reflection failure is reported through the Observable's error channel
        return Observable.error(e);
    }
};
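
The reflection step can be tried in isolation on a plain JVM. The sketch below is an illustration under stated assumptions: RefreshableStub is a hypothetical stand-in for BluetoothGatt, and ReflectiveRefresh.tryRefresh is a helper invented here, not part of RxAndroidBle or Android. It looks up a no-argument refresh method by name and reports the outcome as a boolean instead of an Observable.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical stand-in for BluetoothGatt (illustration only).
final class RefreshableStub {
    private final boolean result;

    RefreshableStub(boolean result) {
        this.result = result;
    }

    public boolean refresh() {
        return result;
    }
}

final class ReflectiveRefresh {
    // Returns the value reported by target.refresh(), or false if the call is not possible.
    static boolean tryRefresh(Object target) {
        try {
            Method refresh = target.getClass().getMethod("refresh");
            Object outcome = refresh.invoke(target);
            return Boolean.TRUE.equals(outcome);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            // The method is missing, inaccessible, or threw: treat all three as "not refreshed"
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRefresh(new RefreshableStub(true)));
        System.out.println(tryRefresh("a String has no refresh() method"));
    }
}
```

Collapsing every failure into false keeps the sketch short; the custom operation above keeps the original exception instead, which is more useful when diagnosing why a device does not support the call.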

Discover services bypassing library cache

RxBleCustomOperation<List<BluetoothGattService>> discoverServicesCustomOp = (bluetoothGatt, rxBleGattCallback, scheduler) -> {
    boolean success = bluetoothGatt.discoverServices();
    if (!success) return Observable.error(new RuntimeException("BluetoothGatt.discoverServices() returned false"));
    return rxBleGattCallback.getOnServicesDiscovered()
            .take(1) // so this RxBleCustomOperation will complete after the first result from BluetoothGattCallback.onServicesDiscovered()
            .map(RxBleDeviceServices::getBluetoothGattServices);
};

Optimise batch write

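Note: unlike the examples above, this snippet uses the RxJava 1 API (Observable.create with Emitter.BackpressureMode, and onNext(null), which RxJava 2 does not allow). testConnection is defined outside the snippet; it holds the target characteristic and the number of writes per batch (batchCount).
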
RxBleCustomOperation<Void> optimisedWriteOp = (bluetoothGatt, rxBleGattCallback, scheduler) -> Observable.create(
        emitter -> {
            Log.i("START", String.valueOf(testConnection.batchCount));
            final byte[] data = new byte[20];
            testConnection.characteristic.setValue(data);
            final AtomicBoolean writeCompleted = new AtomicBoolean(false);
            final AtomicBoolean ackCompleted = new AtomicBoolean(false);
            final AtomicInteger batchesSent = new AtomicInteger(0);
            final Runnable writeNextBatch = () -> {
                data[0]++;
                if (!bluetoothGatt.writeCharacteristic(testConnection.characteristic)) {
                    emitter.onError(new BleGattCannotStartException(bluetoothGatt, BleGattOperationType.CHARACTERISTIC_WRITE));
                } else {
                    Log.i("SEND", String.valueOf(data[0]));
                    batchesSent.incrementAndGet();
                }
            };
            final BluetoothGattCallback bluetoothGattCallback = new BluetoothGattCallback() {
                @Override
                public void onCharacteristicWrite(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic, int status) {
                    if (status != BluetoothGatt.GATT_SUCCESS) {
                        emitter.onError(new BleGattException(gatt, status, BleGattOperationType.CHARACTERISTIC_WRITE));
                    } else if (batchesSent.get() == testConnection.batchCount) {
                        if (ackCompleted.get()) {
                            batchesSent.set(0);
                            ackCompleted.set(false);
                            emitter.onNext(null);
                            writeNextBatch.run();
                        } else {
                            writeCompleted.set(true);
                        }
                    } else {
                        characteristic.setValue(data);
                        writeNextBatch.run();
                    }
                }

                @Override
                public void onCharacteristicChanged(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic) {
                    final byte[] bytes = characteristic.getValue();
                    Log.i("ACK", Arrays.toString(bytes) + "/" + bytes.length + "/" + System.identityHashCode(bytes));
                    characteristic.setValue(data);
                    if (writeCompleted.get()) {
                        batchesSent.set(0);
                        writeCompleted.set(false);
                        emitter.onNext(null);
                        writeNextBatch.run();
                    } else {
                        ackCompleted.set(true);
                    }
                }
            };

            rxBleGattCallback.setNativeCallback(bluetoothGattCallback);

            Log.i("SEND", String.valueOf(data[0]));
            if (!bluetoothGatt.writeCharacteristic(testConnection.characteristic)) {
                emitter.onError(new BleGattCannotStartException(bluetoothGatt, BleGattOperationType.CHARACTERISTIC_WRITE));
            } else {
                batchesSent.incrementAndGet();
            }
        },
        Emitter.BackpressureMode.NONE
);
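
The core of the callback logic above is a two-condition gate: the next batch starts only after every write in the current batch has been confirmed and the notification that the snippet logs as ACK has arrived, in whichever order those happen. A minimal plain-Java sketch of that gate follows. BatchAckGate and its method names are hypothetical, and it uses synchronized methods rather than the atomic flags in the snippet above, so it models the idea rather than reproducing the code.

```java
// Hypothetical helper (illustration only): opens once per batch, when all writes
// are confirmed AND the acknowledgement has arrived, regardless of their order.
final class BatchAckGate {
    private final int packetsPerBatch;
    private int packetsConfirmed = 0;
    private boolean ackReceived = false;
    private int batchesCompleted = 0;

    BatchAckGate(int packetsPerBatch) {
        if (packetsPerBatch <= 0) {
            throw new IllegalArgumentException("packetsPerBatch must be positive");
        }
        this.packetsPerBatch = packetsPerBatch;
    }

    // Analogue of onCharacteristicWrite(): returns true if the next batch may start.
    synchronized boolean onWriteConfirmed() {
        packetsConfirmed++;
        return tryCompleteBatch();
    }

    // Analogue of onCharacteristicChanged(): returns true if the next batch may start.
    synchronized boolean onAckReceived() {
        ackReceived = true;
        return tryCompleteBatch();
    }

    synchronized int batchesCompleted() {
        return batchesCompleted;
    }

    // Only called from the synchronized methods above, so the lock is already held.
    private boolean tryCompleteBatch() {
        if (packetsConfirmed < packetsPerBatch || !ackReceived) {
            return false;
        }
        // Both conditions are met: reset the state for the next batch.
        packetsConfirmed = 0;
        ackReceived = false;
        batchesCompleted++;
        return true;
    }
}

final class BatchAckGateDemo {
    public static void main(String[] args) {
        BatchAckGate gate = new BatchAckGate(2);
        gate.onWriteConfirmed();
        gate.onAckReceived();                   // the ACK arrives before the last write
        boolean open = gate.onWriteConfirmed(); // the last write completes the batch
        System.out.println(open + " " + gate.batchesCompleted());
    }
}
```

Keeping the check and the reset under one lock avoids the window between reading one flag and setting the other, at the cost of a little contention that is unlikely to matter at BLE packet rates.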