Skip to content

Commit

Permalink
Merge pull request #190 from google/uael/pandora-server
Browse files Browse the repository at this point in the history
pandora: import bumble pandora server from avatar
  • Loading branch information
uael committed May 17, 2023
2 parents 4bd8c24 + afcce0d commit f3bfbab
Show file tree
Hide file tree
Showing 10 changed files with 1,844 additions and 2 deletions.
30 changes: 30 additions & 0 deletions apps/pandora_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
import click
import logging

from bumble.pandora import PandoraDevice, serve

BUMBLE_SERVER_GRPC_PORT = 7999
ROOTCANAL_PORT_CUTTLEFISH = 7300


@click.command()
@click.option('--grpc-port', help='gRPC port to serve', default=BUMBLE_SERVER_GRPC_PORT)
@click.option(
'--rootcanal-port', help='Rootcanal TCP port', default=ROOTCANAL_PORT_CUTTLEFISH
)
@click.option(
'--transport',
help='HCI transport',
default=f'tcp-client:127.0.0.1:<rootcanal-port>',
)
def main(grpc_port: int, rootcanal_port: int, transport: str) -> None:
if '<rootcanal-port>' in transport:
transport = transport.replace('<rootcanal-port>', str(rootcanal_port))
device = PandoraDevice({'transport': transport})
logging.basicConfig(level=logging.DEBUG)
asyncio.run(serve(device, port=grpc_port))


if __name__ == '__main__':
main() # pylint: disable=no-value-for-parameter
105 changes: 105 additions & 0 deletions bumble/pandora/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Bumble Pandora server.
This module implement the Pandora Bluetooth test APIs for the Bumble stack.
"""

__version__ = "0.0.1"

import grpc
import grpc.aio

from .config import Config
from .device import PandoraDevice
from .host import HostService
from .security import SecurityService, SecurityStorageService
from pandora.host_grpc_aio import add_HostServicer_to_server
from pandora.security_grpc_aio import (
add_SecurityServicer_to_server,
add_SecurityStorageServicer_to_server,
)
from typing import Callable, List, Optional

# public symbols
__all__ = [
'register_servicer_hook',
'serve',
'Config',
'PandoraDevice',
]


# Add servicers hooks.
_SERVICERS_HOOKS: List[Callable[[PandoraDevice, Config, grpc.aio.Server], None]] = []


def register_servicer_hook(
hook: Callable[[PandoraDevice, Config, grpc.aio.Server], None]
) -> None:
_SERVICERS_HOOKS.append(hook)


async def serve(
bumble: PandoraDevice,
config: Config = Config(),
grpc_server: Optional[grpc.aio.Server] = None,
port: int = 0,
) -> None:
# initialize a gRPC server if not provided.
server = grpc_server if grpc_server is not None else grpc.aio.server()
port = server.add_insecure_port(f'localhost:{port}')

try:
while True:
# load server config from dict.
config.load_from_dict(bumble.config.get('server', {}))

# add Pandora services to the gRPC server.
add_HostServicer_to_server(
HostService(server, bumble.device, config), server
)
add_SecurityServicer_to_server(
SecurityService(bumble.device, config), server
)
add_SecurityStorageServicer_to_server(
SecurityStorageService(bumble.device, config), server
)

# call hooks if any.
for hook in _SERVICERS_HOOKS:
hook(bumble, config, server)

# open device.
await bumble.open()
try:
# Pandora require classic devices to be discoverable & connectable.
if bumble.device.classic_enabled:
await bumble.device.set_discoverable(True)
await bumble.device.set_connectable(True)

# start & serve gRPC server.
await server.start()
await server.wait_for_termination()
finally:
# close device.
await bumble.close()

# re-initialize the gRPC server.
server = grpc.aio.server()
server.add_insecure_port(f'localhost:{port}')
finally:
# stop server.
await server.stop(None)
48 changes: 48 additions & 0 deletions bumble/pandora/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from bumble.pairing import PairingDelegate
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Config:
io_capability: PairingDelegate.IoCapability = PairingDelegate.NO_OUTPUT_NO_INPUT
pairing_sc_enable: bool = True
pairing_mitm_enable: bool = True
pairing_bonding_enable: bool = True
smp_local_initiator_key_distribution: PairingDelegate.KeyDistribution = (
PairingDelegate.DEFAULT_KEY_DISTRIBUTION
)
smp_local_responder_key_distribution: PairingDelegate.KeyDistribution = (
PairingDelegate.DEFAULT_KEY_DISTRIBUTION
)

def load_from_dict(self, config: Dict[str, Any]) -> None:
io_capability_name: str = config.get(
'io_capability', 'no_output_no_input'
).upper()
self.io_capability = getattr(PairingDelegate, io_capability_name)
self.pairing_sc_enable = config.get('pairing_sc_enable', True)
self.pairing_mitm_enable = config.get('pairing_mitm_enable', True)
self.pairing_bonding_enable = config.get('pairing_bonding_enable', True)
self.smp_local_initiator_key_distribution = config.get(
'smp_local_initiator_key_distribution',
PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
)
self.smp_local_responder_key_distribution = config.get(
'smp_local_responder_key_distribution',
PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
)
157 changes: 157 additions & 0 deletions bumble/pandora/device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Generic & dependency free Bumble (reference) device."""

from bumble import transport
from bumble.core import (
BT_GENERIC_AUDIO_SERVICE,
BT_HANDSFREE_SERVICE,
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
)
from bumble.device import Device, DeviceConfiguration
from bumble.host import Host
from bumble.sdp import (
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement,
ServiceAttribute,
)
from typing import Any, Dict, List, Optional


class PandoraDevice:
"""
Small wrapper around a Bumble device and it's HCI transport.
Notes:
- The Bumble device is idle by default.
- Repetitive calls to `open`/`close` will result on new Bumble device instances.
"""

# Bumble device instance & configuration.
device: Device
config: Dict[str, Any]

# HCI transport name & instance.
_hci_name: str
_hci: Optional[transport.Transport] # type: ignore[name-defined]

def __init__(self, config: Dict[str, Any]) -> None:
self.config = config
self.device = _make_device(config)
self._hci_name = config.get('transport', '')
self._hci = None

@property
def idle(self) -> bool:
return self._hci is None

async def open(self) -> None:
if self._hci is not None:
return

# open HCI transport & set device host.
self._hci = await transport.open_transport(self._hci_name)
self.device.host = Host(controller_source=self._hci.source, controller_sink=self._hci.sink) # type: ignore[no-untyped-call]

# power-on.
await self.device.power_on()

async def close(self) -> None:
if self._hci is None:
return

# flush & re-initialize device.
await self.device.host.flush()
self.device.host = None # type: ignore[assignment]
self.device = _make_device(self.config)

# close HCI transport.
await self._hci.close()
self._hci = None

async def reset(self) -> None:
await self.close()
await self.open()

def info(self) -> Optional[Dict[str, str]]:
return {
'public_bd_address': str(self.device.public_address),
'random_address': str(self.device.random_address),
}


def _make_device(config: Dict[str, Any]) -> Device:
"""Initialize an idle Bumble device instance."""

# initialize bumble device.
device_config = DeviceConfiguration()
device_config.load_from_dict(config)
device = Device(config=device_config, host=None)

# Add fake a2dp service to avoid Android disconnect
device.sdp_service_records = _make_sdp_records(1)

return device


# TODO(b/267540823): remove when Pandora A2dp is supported
def _make_sdp_records(rfcomm_channel: int) -> Dict[int, List[ServiceAttribute]]:
return {
0x00010001: [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(0x00010001),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(BT_HANDSFREE_SERVICE),
DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
DataElement.sequence(
[
DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
DataElement.unsigned_integer_8(rfcomm_channel),
]
),
]
),
),
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(BT_HANDSFREE_SERVICE),
DataElement.unsigned_integer_16(0x0105),
]
)
]
),
),
]
}

0 comments on commit f3bfbab

Please sign in to comment.