Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

构建dongle版的microbit插件 #17

Open
wwj718 opened this issue May 29, 2019 · 18 comments
Open

构建dongle版的microbit插件 #17

wwj718 opened this issue May 29, 2019 · 18 comments

Comments

@wwj718
Copy link
Contributor

wwj718 commented May 29, 2019

@wangshub 完成了dongle class,我这两天会发布一个包含dongle class新版本。

大家可以先用于本地测试

import time
import pygatt
import tenacity
import binascii


# pygatt api doc : https://github.com/peplin/pygatt/blob/master/pygatt/device.py
class Dongle(object):
    def __init__(self):
        self.adapter = pygatt.BGAPIBackend()
        self.device = None
        self.is_running = True

    @tenacity.retry(stop=tenacity.stop_after_attempt(3))
    def bled_start(self):
        self.adapter.start()

    def bled_stop(self):
        self.adapter.stop()
        self.is_running = False

    def bled_scan(self):
        devices = self.adapter.scan()
        return devices

    def bled_rssi(self):
        return self.device.get_rssi()

    @tenacity.retry(stop=tenacity.stop_after_attempt(3))
    def bled_connect(self, mac_addr):
        self.device = self.adapter.connect(mac_addr, address_type=pygatt.BLEAddressType.random)
        print('connect to {}'.format(mac_addr))
        print("rssi   : " + str(self.device.get_rssi()))
        print("chars  : " + str(self.device.discover_characteristics()))
        time.sleep(1)

    def bled_subscribe(self, uuid, callback=None, indication=False):
        self.device.subscribe(uuid, callback=callback, indication=indication)

    def bled_write(self, uuid, data):
        self.device.char_write(uuid, bytearray(data))

    def bled_read(self, uuid):
        data = self.device.char_read(uuid)
        return data

    @staticmethod
    def _ascii_to_hex(rawhex):
        return binascii.unhexlify(rawhex)


if __name__ == '__main__':

    def read_cb(handle, value):
        bytes_str = binascii.hexlify(value)
        print(bytes_str)

    dongle = Dongle()
    try:
        dongle.bled_start()

        devices = dongle.bled_scan()
        for dev in devices:
            print(dev.get('address'), dev.get('rssi'), dev.get('name'))

        dongle.bled_connect('C1:68:4E:87:AD:70')

        dongle.bled_subscribe('0000ffe4-0000-1000-8000-00805f9a34fb', callback=read_cb)

        data = dongle.bled_read('0000ffe4-0000-1000-8000-00805f9a34fb')
        print('data = ', data)

        while True:
            time.sleep(1)
    finally:
        dongle.bled_stop()

我之前已经完成了对microbit官方固件的分析:https://blog.just4fun.site/scratch3-microbit-analysis.html ,可以直接接入scratch microbit官方固件。read部分我测试已经没问题了。

@wwj718
Copy link
Contributor Author

wwj718 commented May 29, 2019

@bilikyar 提到希望

读取模拟口的值

我的建议是利用makecode构建新的ble 固件,非常简单。

image

提醒

makecode默认的配置不会公开蓝牙,需要设置

image

ble服务细节: https://lancaster-university.github.io/microbit-docs/ble/profile/

@bilikyar
Copy link
Contributor

这个库具体怎么用? 必须要配BLED 112这款USB Dongle吗? 我在华硕笔记本上运行这个代码报了下面的错误:

RetryError[<Future at 0x2ee1130 state=finished raised BGAPIError>]

@wwj718
Copy link
Contributor Author

wwj718 commented May 30, 2019

@bilikyar 我目前是配合 BLED 112,对USB Dongle的要求,可以参考pygatt文档

@wwj718
Copy link
Contributor Author

wwj718 commented May 30, 2019

最新的codelab-adapter内置了Dongle

测试插件: https://github.com/Scratch3Lab/codelab_adapter_extensions/blob/master/extension_dongle_scan_test.py

image

@wwj718
Copy link
Contributor Author

wwj718 commented May 30, 2019

对microbit scratch官方固件的读取已经完成:

https://github.com/Scratch3Lab/codelab_adapter_extensions/blob/master/extension_dongle_read_test.py

image

对数据的解析 参考我之前的分析: https://blog.just4fun.site/scratch3-microbit-analysis.html

@bilikyar
Copy link
Contributor

bilikyar commented Jun 1, 2019

@wangshub dongle插件应该是支持Makeblock写的固件是吗?我用此文章的方法做实验的时候遇到了问题,Microbit固件代码如下:

microbit-屏幕截图 (1)

1.运行如下代码时,无法检测UART通道上写入的值

dongle.bled_connect('E1:4C:E0:01:43:18') #这是我Microbit的Mac地址
dongle.bled_subscribe('6e400002-b5a3-f393-e0a9-e50e24dcca9e', callback=read_cb)   #订阅rx

2.运行如下代码时,Microbit没有任何变化

dongle.bled_write('6e400003-b5a3-f393-e0a9-e50e24dcca9e',0x0a) # 往tx写换行符

3.此固件我已经通过其他各类BLE软件测试过了,运行没问题

@wangshub
Copy link
Member

wangshub commented Jun 1, 2019

@bilikyar

Hi,我的手上没有设备,暂时没办法测试,我观察到这个位置代码有些不一样,传参改成 List 试一下?

dongle.bled_write('6e400003-b5a3-f393-e0a9-e50e24dcca9e', [0x0a]) # 往tx写换行符

@wangshub
Copy link
Member

wangshub commented Jun 1, 2019

image

@bilikyar
Copy link
Contributor

bilikyar commented Jun 1, 2019

@wangshub 恩,你帮到我了,最后确定了不是插件的问题,是我固件的问题:
1.写UART运行成功
2.订阅特征部分还是遇到困难,不能订阅直接写入UART的信息

@bilikyar
Copy link
Contributor

bilikyar commented Jun 1, 2019

虽然这个插件能完美解决Scratch link的问题,可是scratch link的固件阉割掉了很多BLE的功能:

  1. IO 服务
  2. 磁力计服务
  3. UART 服务部分直接重写了

所以,我想研究codelab的固件,使得BLE 版 Microbit 更加强大

@bilikyar
Copy link
Contributor

https://github.com/audetto/asi-link

I've written a super simple implementation in nodejs of a link for serial ports (USB or BT Classic), which should work on all platforms.
Not tested with any real device as I do not have a device to test it with. I am still writing the scratch-vm extension for an mBot, then I will do the full test.

I might be completely off, but it looks super simple, with a few caveats

  • the whole device-manager.scratch.mit.edu + wss:// seems an over complication to me.

    • I am running with ws://localhost and it works perfectly
    • the certificates and the key are available in cleartext from the mac package, so it is hardly providing any security
    • what is the rationale behind this solution? it makes it hard to recompile and reimplement scratch-link
  • about the protocol: using the gui I can load multiple BT extensions, but so far I have not found a way to detect which device each message belongs to. Have I missed something in the protocol?

社区里开始有人写link的nodejs版本,应该对我们后面的BLE的研究有启发

@bilikyar
Copy link
Contributor

我已经成功通过MackCode开启Microbit的任意的BLE服务(总过有七个服务,可是最多同时开3到4个,不然内存会不够)并通过dongle来对其读写订阅等,算是有了比较好的进展。现在就需要一个adapter插件和Scratch插件了。

@bilikyar
Copy link
Contributor

经过一段时间的整理,我在dongle的基础上完成了一下内容,代码如下:

  1. 收集A和B按钮的数据
  2. 收集加速度,温度,磁力计传感器的数据
  3. 对于led发送文字和形状信息
  4. 对于引脚的输入数据的收集,其中包含模拟和数字信息
  5. 对于引脚的数字控制以及PWM控制
  6. 对于UART端的控制
import time
import pygatt
import tenacity
import binascii


# pygatt api doc : https://github.com/peplin/pygatt/blob/master/pygatt/device.py
class Dongle(object):
    def __init__(self):
        self.adapter = pygatt.BGAPIBackend()
        self.device = None
        self.is_running = True

    @tenacity.retry(stop=tenacity.stop_after_attempt(3))
    def bled_start(self):
        self.adapter.start()

    def bled_stop(self):
        self.adapter.stop()
        self.is_running = False

    def bled_scan(self):
        devices = self.adapter.scan()
        return devices

    def bled_rssi(self):
        return self.device.get_rssi()

    @tenacity.retry(stop=tenacity.stop_after_attempt(3))
    def bled_connect(self, mac_addr):
        self.device = self.adapter.connect(
            mac_addr, address_type=pygatt.BLEAddressType.random)
        print('connect to {}'.format(mac_addr))
        print("rssi   : " + str(self.device.get_rssi()))

        # print("chars  : " + str(self.device.discover_characteristics()))
        # for i in self.device.discover_characteristics():
        #     print(i)

        time.sleep(1)

    def bled_subscribe(self, uuid, callback=None, indication=False):
        try:
            self.device.subscribe(uuid, callback=callback, indication=indication)
        except Exception as e:
            print(e)

    def bled_write(self, uuid, data,length = 1):
        if isinstance(data,int):
            data = data.to_bytes(length, byteorder='little')
        print("写入:", bytearray(data))
        self.device.char_write(uuid, bytearray(data))

    def bled_read(self, uuid):
        data = self.device.char_read(uuid)
        return data

    @staticmethod
    def _ascii_to_hex(rawhex):
        return binascii.unhexlify(rawhex)


if __name__ == '__main__':

    def read_cb(handle, value):

        a = value
        b = [bytes([a[i]])+ bytes([a[i+1]]) for i in range(0,len(a),2)]
        c = [int.from_bytes(i, byteorder='little', signed=True) for i in b]
        print(c)

        # bytes_str = binascii.hexlify(value)
        # print(bytes_str)

    dongle = Dongle()
    try:
        dongle.bled_start()

        devices = dongle.bled_scan()
        for dev in devices:
            print(dev.get('address'), dev.get('rssi'), dev.get('name'))

        target_mac_address = 'DA:19:8A:61:AD:95'

        microbit_uuid = {
        'button_A' : 'e95dda90-251d-470a-a062-fa1922dfa9a8', #按钮A数据
        'button_B' : 'e95dda91-251d-470a-a062-fa1922dfa9a8', #按钮B数据
        'accelerometer' : 'e95dca4b-251d-470a-a062-fa1922dfa9a8', #加速度数据
        'Temperature' : 'e95d9250-251d-470a-a062-fa1922dfa9a8', #温度计数据
        'magnetometer' : 'e95dfb11-251d-470a-a062-fa1922dfa9a8', #磁力计数据
        'led_char' : 'e95d93ee-251d-470a-a062-fa1922dfa9a8', #led文字
        'led_matrix' : 'e95d7b77-251d-470a-a062-fa1922dfa9a8', #led矩阵
        'pin_date' : 'e95d8d00-251d-470a-a062-fa1922dfa9a8',  # pin读写服务
        'pinMode_config' : 'e95d5899-251d-470a-a062-fa1922dfa9a8',  # pin引脚配置
        'ADC_config' : 'e95db9fe-251d-470a-a062-fa1922dfa9a8',  # pinAD配置
        'PWM_config' : 'e95dd822-251d-470a-a062-fa1922dfa9a8',  # PWM设置
        'tx': '6e400003-b5a3-f393-e0a9-e50e24dcca9e',  # tx设置
        'rx': '6e400002-b5a3-f393-e0a9-e50e24dcca9e',  # rx设置
        }

        dongle.bled_connect(target_mac_address)  # 这是我Microbit的Mac地址

        dongle.bled_write(microbit_uuid['led_matrix'], [17, 27, 27, 27, 27]) #矩阵图标的显示方式
        
        dongle.bled_subscribe(microbit_uuid['accelerometer'], callback=read_cb,indication=False)  # 订阅传感器数据

        while True:
            time.sleep(1)
    finally:
        dongle.bled_stop()

@bilikyar
Copy link
Contributor

目前待解决的问题如下:

  1. scratch连接特定的microbit(这个可以单独开一个issue讨论)
  2. adapter 收集数据逻辑(后端)
  3. scratch 具体的积木实现(前端)

@wwj718
Copy link
Contributor Author

wwj718 commented Jun 25, 2019

@bilikyar 问题1 可以在这个issue里讨论: #27

这个issue试图跟进的问题是,如何将adapter作为像link一样的server,所有的交互在前端实现(link的策略是基于rpc消息)

@bilikyar
Copy link
Contributor

@wwj718 根据在 Scratch micro:bit extension与Scratch Link通信的细节 里分析的消息体的结构,我用dongle库编写了一下 adapter 的 Microbit ble 插件,由于库的限制无法呈现出 link 全部的功能,是一种单线程的实现方式。 还有个问题是,scratch 端主动断开之后,重新连接 Microbit,只能是通过重启插件的方式来连接。

'''
EIM: Everything Is Message
'''
import time
import threading

from codelab_adapter import settings
from codelab_adapter.core_extension import Extension
from codelab_adapter.utils import threaded
from codelab_adapter.dongle import Dongle
import time
import pygatt
import tenacity
import binascii
import base64


class BleMicrobit(Extension):
    def __init__(self):
        name = type(self).__name__  # class name
        super().__init__(name)
        self.TOPIC = "eim"

    def read_cb(self, handle, value):
        message = base64.b64encode(value)

        self.publish({
            "payload": {
                "encoding": "base64",
                "message": str(message)[2:-1]  #处理字符
            },
            "method": "characteristicDidChange",
            "topic": self.TOPIC,
        })

    @threaded
    def message_monitor(self):
        while self._running:
            read_message = self.read()
            topic = read_message.get("topic")
            if topic == self.TOPIC:

                self.logger.info("消息:{}".format(read_message))

                method = read_message.get("method")
                data = read_message.get("payload")

                if method == 'discover':
                    try:
                        self.services = data.get('filters')[
                            0].get('services')[0]
                    except Exception as e:
                        self.publish({
                            "messageID": read_message.get("messageID"),
                            "error": str(e),
                            "topic": self.TOPIC,
                        })
                    else:
                        self.publish({
                            "messageID": read_message.get("messageID"),
                            "result": "",
                            "topic": self.TOPIC,
                        })

                    try:
                        self.logger.info("开始扫描BLE")
                        devices = self.dongle.bled_scan()
                    except Exception as e:
                        self.logger.info("BLE扫描错误:{}".format(e))
                    else:
                        for dev in devices:
                            try:
                                complete_list_16_bit_service_class_uuids = dev.get('packet_data').get(
                                    'connectable_advertisement_packet').get('complete_list_16-bit_service_class_uuids')
                            except Exception as e:
                                pass
                            else:
                                if isinstance(complete_list_16_bit_service_class_uuids, bytearray):
                                    if int.from_bytes(complete_list_16_bit_service_class_uuids, byteorder='little') == self.services:
                                        self.publish({
                                            "payload": {
                                                "name": dev.get('name'),
                                                "rssi": dev.get('rssi'),
                                                "peripheralId": dev.get('address')
                                            },
                                            "method": "didDiscoverPeripheral",
                                            "topic": self.TOPIC,
                                        })

                elif method == 'connect':
                    self.peripheralId = data.get("peripheralId")

                    try:
                        self.logger.info("开始连接")
                        self.dongle.bled_connect(self.peripheralId)
                    except Exception as e:
                        self.logger.info("连接错误:{}".format(str(e)))

                        self.publish({
                            "messageID": read_message.get("messageID"),
                            "error": str(e),
                            "topic": self.TOPIC
                        })

                        self.ble_connected = False
                    else:
                        self.publish({
                            "messageID": read_message.get("messageID"),
                            "result": "",
                            "topic": self.TOPIC
                        })

                elif method == 'read':
                    characteristicId = data.get("characteristicId")

                    self.dongle.bled_subscribe(
                        characteristicId, callback=self.read_cb, indication=True)  # 订阅传感器数据

                    self.publish(
                        {
                            "messageID": read_message.get("messageID"),
                            "result": {
                                "encoding": "base64",
                                "message": ""
                            },
                            "topic": self.TOPIC
                        }
                    )

                elif method == 'write':
                    characteristicId = data.get("characteristicId")
                    text = data.get("message").encode('ascii')
                    self.logger.info("收到的消息为:{}".format(text))
                    try:
                        self.dongle.bled_write(
                            characteristicId, text)
                    except Exception as e:
                        self.logger.info("写消息错误:{}".format(e))
                    else:
                        self.publish(
                            {
                                "messageID": read_message.get("messageID"),
                                "topic": self.TOPIC,
                                "result": 6
                            }
                        )

    def run(self):

        self.dongle = Dongle()

        try:
            self.logger.info("启动BLE")

            self.dongle.bled_start()
            self.message_monitor()

            while True:
                time.sleep(1)

        except Exception as e:
            self.logger.info("BLE启动出错:{}".format(e))

        finally:
            self.dongle.bled_stop()


export = BleMicrobit

@wwj718
Copy link
Contributor Author

wwj718 commented Jul 29, 2019

@bilikyar 这是个不错的工具 https://github.com/ukBaz/python-bluezero

@bilikyar
Copy link
Contributor

bilikyar commented Jul 30, 2019

@wwj718 是个很不错的工具, 尤其是用树莓派来做python教学的时候,通过BLE设备开展教学工作的好方法,最重要的一点不需要再添加额外的蓝牙适配器。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants