Skip to content
Snippets Groups Projects
Commit 8ee3ec46 authored by Neil Gershenfeld's avatar Neil Gershenfeld
Browse files

wip

parent db450764
No related branches found
No related tags found
No related merge requests found
#
# blueart.py
# BLE NUS UART routines
#
# Neil Gershenfeld 11/13/23
#
# based on:
# https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_uart_peripheral.py
# https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_advertising.py
#
from micropython import const
import struct
import bluetooth
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)
_ADV_MAX_PAYLOAD = const(32)
def advertising_payload(limited_disc=False,br_edr=False,name=None, services=None,appearance=0):
payload = bytearray()
def _append(adv_type, value):
nonlocal payload
payload += struct.pack("BB",len(value)+1,adv_type)+value
_append(_ADV_TYPE_FLAGS,struct.pack("B",(0x01 if limited_disc else 0x02)+(0x18 if br_edr else 0x04)),)
if name:
_append(_ADV_TYPE_NAME,name)
if services:
for uuid in services:
b = bytes(uuid)
if len(b) == 2:
_append(_ADV_TYPE_UUID16_COMPLETE,b)
elif len(b) == 4:
_append(_ADV_TYPE_UUID32_COMPLETE,b)
elif len(b) == 16:
_append(_ADV_TYPE_UUID128_COMPLETE,b)
if appearance:
_append(_ADV_TYPE_APPEARANCE,struct.pack("<h",appearance))
if len(payload) > _ADV_MAX_PAYLOAD:
raise ValueError("advertising payload too large")
return payload
def decode_field(payload,adv_type):
i = 0
result = []
while (i+1 < len(payload)):
if payload[i+1] == adv_type:
result.append(payload[i+2:i+payload[i]+1])
i += 1+payload[i]
return result
def decode_name(payload):
n = decode_field(payload,_ADV_TYPE_NAME)
return str(n[0],"utf-8") if n else ""
def decode_services(payload):
services = []
for u in decode_field(payload,_ADV_TYPE_UUID16_COMPLETE):
services.append(bluetooth.UUID(struct.unpack("<h",u)[0]))
for u in decode_field(payload,_ADV_TYPE_UUID32_COMPLETE):
services.append(bluetooth.UUID(struct.unpack("<d",u)[0]))
for u in decode_field(payload,_ADV_TYPE_UUID128_COMPLETE):
services.append(bluetooth.UUID(u))
return services
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),_FLAG_NOTIFY,)
_UART_RX = (bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),_FLAG_WRITE,)
_UART_SERVICE = (_UART_UUID,(_UART_TX,_UART_RX),)
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)
class uart:
def __init__(self, name="BLE UART",rxbuf=100):
self._ble = bluetooth.BLE()
self._ble.active(True)
self._ble.irq(self._irq)
((self._tx_handle,self._rx_handle),) = self._ble.gatts_register_services((_UART_SERVICE,))
self._ble.gatts_set_buffer(self._rx_handle,rxbuf, True)
self._connections = set()
self._rx_buffer = bytearray()
self._handler = None
self._payload = advertising_payload(name=name,appearance=_ADV_APPEARANCE_GENERIC_COMPUTER)
self._advertise()
def irq(self,handler):
self._handler = handler
def _irq(self,event,data):
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _ = data
self._connections.add(conn_handle)
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _ = data
if conn_handle in self._connections:
self._connections.remove(conn_handle)
self._advertise()
elif event == _IRQ_GATTS_WRITE:
conn_handle, value_handle = data
if conn_handle in self._connections and value_handle == self._rx_handle:
self._rx_buffer += self._ble.gatts_read(self._rx_handle)
if self._handler:
self._handler()
def any(self):
return len(self._rx_buffer)
def read(self, sz=None):
if not sz:
sz = len(self._rx_buffer)
result = self._rx_buffer[0:sz]
self._rx_buffer = self._rx_buffer[sz:]
return result
def write(self, data):
for conn_handle in self._connections:
self._ble.gatts_notify(conn_handle,self._tx_handle,data)
def close(self):
for conn_handle in self._connections:
self._ble.gap_disconnect(conn_handle)
self._connections.clear()
def _advertise(self, interval_us=500000):
self._ble.gap_advertise(interval_us,adv_data=self._payload)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<script>
//
// hello.BLE-blink.C3.html
// Seeed XIAO ESP32C3 BLE blink hello-world
// requires Chrome/Chromium/Edge
// with Experimental Web Platform features enabled
//
// Neil Gershenfeld 11/13/23
//
// based on https://github.com/makerdiary/web-device-cli
//
const ServiceUUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
......@@ -14,16 +22,19 @@ var bleDevice
var nusService
var rxCharacteristic
//
function message(msg) {
document.getElementById("message").innerHTML = msg
}
function connect() {
if (!navigator.bluetooth) {
console.log('Bluetooth not available, enable browser flag')
message('Bluetooth not available, enable Experimental Web Platform features browser flag')
return
}
navigator.bluetooth.requestDevice({
optionalServices:[ServiceUUID],acceptAllDevices:true})
.then(device => {
bleDevice = device
console.log('connect to '+device.name)
message('connecting to '+device.name)
bleDevice.addEventListener('gattserverdisconnected', ondisconnect)
return device.gatt.connect()
})
......@@ -39,9 +50,9 @@ function connect() {
let txCharacteristic = characteristic
txCharacteristic.startNotifications()
txCharacteristic.addEventListener('characteristicvaluechanged',receive)
console.log(bleDevice.name+' connected')})
message(bleDevice.name+' connected')})
.catch(error => {
console.log(''+error);
message(''+error);
if(bleDevice && bleDevice.gatt.connected) {
bleDevice.gatt.disconnect()
}
......@@ -49,27 +60,27 @@ function connect() {
}
function disconnect() {
if (!bleDevice) {
console.log('nothing connected')
message('nothing connected')
return
}
if (bleDevice.gatt.connected)
bleDevice.gatt.disconnect()
else
console.log('already disconnected')
message('already disconnected')
}
function ondisconnect() {
console.log(bleDevice.name+' disconnected')
message(bleDevice.name+' disconnected')
}
function receive(event) {
let value = event.target.value
let str = '';
for (let i = 0; i < value.byteLength; i++)
str += String.fromCharCode(value.getUint8(i))
console.log('receive: '+str)
message('received: '+str)
}
function transmit(s) {
if(bleDevice && bleDevice.gatt.connected) {
console.log("send: "+s)
message("sent: "+s)
let arr = new Uint8Array(s.length)
for (let i = 0; i < s.length; i++) {
let val = s[i].charCodeAt(0)
......@@ -78,7 +89,7 @@ function transmit(s) {
send(arr)
}
else
console.log('not connected')
message('not connected')
}
function send(a) {
let chunk = a.slice(0,MTU)
......@@ -98,11 +109,12 @@ function LEDoff() {
</script>
</head>
<body>
<button onclick='connect()'>Connect</button>&nbsp;
<button onclick='disconnect()'>Disconnect</button>&nbsp;
<button onclick='LEDon()'>turn LED on</button>&nbsp;
<button onclick='LEDoff()'>turn LED off</button>&nbsp;
<div id='message'></div>
<br>
<button onclick="connect()" style="margin-left:2%;font-size:150%">Connect</button>&nbsp;
<button onclick="disconnect()" style="font-size:150%">Disconnect</button>&nbsp;
<button onclick="LEDon()" style="font-size:150%">turn LED on</button>&nbsp;
<button onclick="LEDoff()" style="font-size:150%">turn LED off</button>&nbsp;
<br><br>
<div id='message' style="margin-left:2%"></div>
</body>
</html>
#
# hello.BLE-blink.C3.py
# Seeed XIAO ESP32C3 BLE blink hello-world
#
# Neil Gershenfeld 11/13/23
#
# This work may be reproduced, modified, distributed,
# performed, and displayed for any purpose, but must
# acknowledge this project. Copyright is retained and
# must be preserved. The work is provided as is; no
# warranty is provided, and users accept all liability.
#
# load MicroPython:
# https://micropython.org/download/ESP32_GENERIC_C3/
# esptool.py --chip esp32c3 --port /dev/ttyACM0 erase_flash
# esptool.py --chip esp32c3 --port /dev/ttyACM0 --baud 460800 write_flash -z 0x0 ESP32_GENERIC_C3-20231005-v1.21.0.bin
#
import blueart
from machine import Pin
#
led_pin = 20
button_pin = 21
#
led = Pin(led_pin,Pin.OUT)
led.value(0)
button = Pin(button_pin,Pin.IN,Pin.PULL_UP)
button_up = True
#
def rx():
message = uart.read().decode().strip()
print("received: "+message)
if (message == "1"):
led.value(1)
elif (message == "0"):
led.value(0)
#
uart = blueart.uart()
uart.irq(handler=rx)
#
try:
while True:
if ((button.value() == 0) and button_up):
led.value(1)
print("send: button down")
uart.write('button down')
button_up = False
elif ((button.value() == 1) and (not button_up)):
led.value(0)
print("send: button up")
uart.write('button up')
button_up = True
except:
uart.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment