Skip to content

Commit

Permalink
Add Spinbox widget for mirror port
Browse files Browse the repository at this point in the history
Threads for writing log and mirror OSC messages
  • Loading branch information
pauloesteban committed Jul 30, 2023
1 parent 3f3c309 commit b0c8eae
Showing 1 changed file with 97 additions and 30 deletions.
127 changes: 97 additions & 30 deletions metabow_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
# Modified by Paulo Chiliguano (@pauloesteban) and KA HO Wong
# Directed by Dr Roberto Alonso Trillo
# Department of Music - Hong Kong Baptist University
# 2022
# 2023

import asyncio
import csv
import threading
import tkinter as tk
from datetime import datetime
from functools import partial
Expand All @@ -23,15 +24,29 @@
from utils import bytearray_to_fusion_data, log_file_path


class FusionThread(threading.Thread):
def __init__(self, target, args=()):
super().__init__(target=target, args=args)
self._fusion_data = None

def run(self):
self._fusion_data = self._target(self._args)

def get_fusion_data(self):
return self._fusion_data


class Window(tk.Tk):
def __init__(self, loop): # Delete type hint when building on Windows
def __init__(self, loop):
self.root = tk.Tk()
self.root.title("Metabow OSC bridge")
self.root.resizable(False, False)
self.root.protocol("WM_DELETE_WINDOW", self.on_exit)
self.loop = loop
self.port0 = tk.StringVar()
self.port0 = tk.IntVar()
self.port0.set(8888)
self.port1 = tk.IntVar()
self.port1.set(8889)
self.ports_frame = self.create_ports_frame(self.root)
self.ports_frame.grid(column=0, row=0, padx=10, pady=10, sticky=tk.NW)
self.devices_frame = self.create_scanner_frame(self.root)
Expand Down Expand Up @@ -93,7 +108,10 @@ def _instantiate_scanner(self):

def _instantiate_udp_client(self):
localhost = "127.0.0.1"
self.udp_client = udp_client.SimpleUDPClient(localhost, int(self.port0.get()))
self.udp_client = udp_client.SimpleUDPClient(localhost,
self.port0.get())
self.udp_client_mirror = udp_client.SimpleUDPClient(localhost,
self.port1.get())


def on_exit(self):
Expand All @@ -106,11 +124,20 @@ def create_ports_frame(self, container):
label_frame.grid(row=0, column=0, sticky=tk.W)
title = ttk.Label(label_frame, text=f"Device Port:")
title.grid(row=0, column=0, sticky=tk.W)
self.port0_spinbox = ttk.Spinbox(label_frame, from_=1024, to=49151, textvariable=self.port0, width=10)
self.port0_spinbox = ttk.Spinbox(label_frame,
from_=1024,
to=49151,
textvariable=self.port0,
width=10)
self.port0_spinbox.grid(row=0, column=1, sticky=tk.W)

# for widget in frame.winfo_children():
# widget.grid(padx=0, pady=5)
title1 = ttk.Label(label_frame, text=f"Mirror Port:")
title1.grid(row=1, column=0, sticky=tk.W)
self.port1_spinbox = ttk.Spinbox(label_frame,
from_=1024,
to=49151,
textvariable=self.port1,
width=10)
self.port1_spinbox.grid(row=1, column=1, sticky=tk.W)

return label_frame

Expand Down Expand Up @@ -217,6 +244,7 @@ async def connect(self):
return
self.is_notify_loop = True
self.port0_spinbox.state(['disabled'])
self.port1_spinbox.state(['disabled'])
self.address_checkbox.state(['disabled'])
self._instantiate_udp_client()
self.start_scan_button.state(['disabled'])
Expand All @@ -242,6 +270,7 @@ async def notify(self, i, device):
async def disconnect(self):
self.is_notify_loop = False
self.port0_spinbox.state(['!disabled'])
self.port1_spinbox.state(['!disabled'])
self.disconnect_button.state(['disabled'])
self.start_scan_button.state(['!disabled'])
self.address_checkbox.state(['!disabled'])
Expand All @@ -252,37 +281,75 @@ async def disconnect(self):
def notification_handler(self, device_number: int | str, sender: int, data: bytearray):
"""Simple notification handler
"""
fusion_thread = FusionThread(target=bytearray_to_fusion_data,
args=data)
fusion_thread.start()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S.%f")
address_raw = f"/{device_number}/raw"
fusion_thread.join()
fusion_data = fusion_thread.get_fusion_data()
model_thread = threading.Thread(target=self.model.tick,
args=(fusion_data[1:4],
fusion_data[4:7],
fusion_data[7:10]))
model_thread.start()
self.udp_client.send_message(address_raw, fusion_data)
self.udp_client_mirror.send_message(address_raw, fusion_data)
model_thread.join()
write_thread = threading.Thread(target=self._write_log,
args=(timestamp, device_number, fusion_data, self.model))
write_thread.start()

if self.port0 != self.port1:
send_mirror_thread = threading.Thread(target=self._send_messages,
args=(device_number, self.udp_client_mirror, self.model))
send_mirror_thread.start()
self._send_messages(device_number, self.udp_client, self.model)
send_mirror_thread.join()
else:
self._send_messages(device_number, self.udp_client, self.model)
write_thread.join()


def _write_log(self, timestamp, device_number, fusion_data, model):
data = [device_number,
timestamp,
*fusion_data[1:],
*model.quaternion.elements.tolist(),
*model.movement_acceleration.tolist(),
*model.acceleration_derivative.tolist(),
*model.movement_velocity.tolist(),
model.skewness,
model.tilt,
model.roll]

with open(self.log_name, 'a', encoding='UTF8') as f:
writer = csv.writer(f)
writer.writerow(data)


def _send_messages(self, device_number, udp_client, model):
address_quaternion = f"/{device_number}/quaternion"
quaternion = model.quaternion.elements.tolist()
udp_client.send_message(address_quaternion, quaternion)
address_sensor_frame = f"/{device_number}/motion_acceleration/sensor_frame"
join_array = bytearray_to_fusion_data(data)
self.model.tick(join_array[1:4], join_array[4:7], join_array[7:10])
quaternion = self.model.quaternion.elements.tolist()
movement_accl = self.model.movement_acceleration.tolist()
self.udp_client.send_message(address_raw, join_array)
self.udp_client.send_message(address_quaternion, quaternion)
self.udp_client.send_message(address_sensor_frame, movement_accl)
movement_accl = model.movement_acceleration.tolist()
udp_client.send_message(address_sensor_frame, movement_accl)
address_sensor_derivative = f"/{device_number}/motion_acceleration/sensor_derivative"
accl_derivative = self.model.acceleration_derivative.tolist()
self.udp_client.send_message(address_sensor_derivative, accl_derivative)
accl_derivative = model.acceleration_derivative.tolist()
udp_client.send_message(address_sensor_derivative, accl_derivative)
address_sensor_velocity = f"/{device_number}/motion_acceleration/sensor_velocity"
movement_vel = self.model.movement_velocity.tolist()
self.udp_client.send_message(address_sensor_velocity, movement_vel)
movement_vel = model.movement_velocity.tolist()
udp_client.send_message(address_sensor_velocity, movement_vel)
address_sensor_skewness = f"/{device_number}/motion_acceleration/skewness"
skewness = self.model.skewness
self.udp_client.send_message(address_sensor_skewness, skewness)
skewness = model.skewness
udp_client.send_message(address_sensor_skewness, skewness)
address_sensor_tilt = f"/{device_number}/motion_acceleration/tilt"
tilt = self.model.tilt
self.udp_client.send_message(address_sensor_tilt, tilt)
tilt = model.tilt
udp_client.send_message(address_sensor_tilt, tilt)
address_sensor_roll = f"/{device_number}/motion_acceleration/roll"
roll = self.model.roll
self.udp_client.send_message(address_sensor_roll, roll)
row = [device_number, timestamp, *join_array[1:], *quaternion, *movement_accl, *accl_derivative, *movement_vel, skewness, tilt, roll]

with open(self.log_name, 'a', encoding='UTF8') as f:
writer = csv.writer(f)
writer.writerow(row)
roll = model.roll
udp_client.send_message(address_sensor_roll, roll)


def populate_devices(self):
Expand Down

0 comments on commit b0c8eae

Please sign in to comment.