# WinkDetection.py
''' Demonstrates how to subscribe to and handle data from gaze and event streams '''
import numpy as np
import cv2
import time
import adhawkapi
import adhawkapi.frontend
# Module-level wink-detection state, shared between the BLE stream callbacks
# (FrontendData handlers, which run on the API's callback thread) and the
# main() display loop.
left_wink_counter = 0   # consecutive samples the left eye-center has read NaN (eye closed)
right_wink_counter = 0  # consecutive samples the right eye-center has read NaN (eye closed)
left_wink = False       # set True by the handler when a left wink completes; consumed/cleared by main()
right_wink = False      # set True by the handler when a right wink completes; consumed/cleared by main()
wink_thresh = 8         # samples an eye must stay closed (at the 30 Hz stream rate) to count as a wink
class FrontendData:
    ''' BLE frontend wrapper.

    Subscribes to the AdHawk gaze and eye-center streams and derives per-eye
    wink events from eye-center dropouts, publishing results through the
    module-level ``gaze``, ``left_wink`` and ``right_wink`` globals.
    '''

    def __init__(self):
        # Instantiate an API object
        # TODO: Update the device name to match your device
        self._api = adhawkapi.frontend.FrontendApi(ble_device_name='ADHAWK MINDLINK-287')
        # Tell the api that we wish to receive eye tracking data stream
        # with self._handle_et_data as the handler
        self._api.register_stream_handler(adhawkapi.PacketType.EYETRACKING_STREAM, self._handle_et_data)
        # Tell the api that we wish to tap into the EVENTS stream
        # with self._handle_events as the handler
        self._api.register_stream_handler(adhawkapi.PacketType.EVENTS, self._handle_events)
        # Start the api and set its connection callback to self._handle_tracker_connect/disconnect.
        # When the api detects a connection to a MindLink, this function will be run.
        self._api.start(tracker_connect_cb=self._handle_tracker_connect,
                        tracker_disconnect_cb=self._handle_tracker_disconnect)

    def shutdown(self):
        '''Shutdown the api and terminate the bluetooth connection'''
        self._api.shutdown()

    @staticmethod
    def _handle_et_data(et_data: adhawkapi.EyeTrackingStreamData):
        ''' Handle the latest eye-tracking sample.

        Publishes the gaze vector via the ``gaze`` global and advances the
        wink state machine: an eye whose center reads NaN is treated as
        closed; a single-eye closure longer than ``wink_thresh`` consecutive
        samples is reported as a wink once both eyes reopen. Both eyes NaN at
        once is treated as a blink / tracking loss, not a wink.
        '''
        if et_data.gaze is not None:
            xvec, yvec, zvec, vergence = et_data.gaze
            global gaze
            gaze = (xvec, yvec, zvec)
        if et_data.eye_center is not None:
            global left_wink_counter, right_wink_counter, left_wink, right_wink
            # Packed as right eye (x, y, z) then left eye (x, y, z).
            rxvec, ryvec, rzvec, lxvec, lyvec, lzvec = et_data.eye_center
            left_nan = np.isnan(lxvec)
            right_nan = np.isnan(rxvec)
            if left_nan and right_nan:
                # Both eyes closed -> blink (or tracking loss); discard
                # any wink progress.
                left_wink_counter = 0
                right_wink_counter = 0
            elif not left_nan and not right_nan:
                # Both eyes open again: report any closure that lasted long
                # enough to qualify as a wink.
                if left_wink_counter > wink_thresh:
                    left_wink = True
                if right_wink_counter > wink_thresh:
                    right_wink = True
                # Reset BOTH counters unconditionally. Previously a closure
                # shorter than the threshold left a stale count behind, so
                # two separate short closures could sum past the threshold
                # and fire a false wink.
                left_wink_counter = 0
                right_wink_counter = 0
            elif left_nan:
                left_wink_counter += 1
            else:  # only the right eye is closed
                right_wink_counter += 1

    @staticmethod
    def _handle_events(event_type, timestamp, *args):
        ''' Event-stream handler — currently a no-op.

        A blink / triple-blink detector was prototyped here; to receive
        BLINK events again, re-enable the set_event_control calls in
        _handle_tracker_connect.
        '''

    def _handle_tracker_connect(self):
        ''' Runs once the API connects to a MindLink; configures streams. '''
        print("Tracker connected")
        self._api.set_et_stream_rate(30, callback=lambda *args: None)  # MUST BE 30 et_stream_rate!!!
        self._api.set_et_stream_control([
            adhawkapi.EyeTrackingStreamTypes.GAZE,
            adhawkapi.EyeTrackingStreamTypes.EYE_CENTER,
        ], True, callback=lambda *args: None)
        # self._api.set_event_control(adhawkapi.EventControlBit.BLINK, 1, callback=lambda *args: None)
        # self._api.set_event_control(adhawkapi.EventControlBit.EYE_CLOSE_OPEN, 1, callback=lambda *args: None)

    def _handle_tracker_disconnect(self):
        ''' Runs when the MindLink connection drops. '''
        print("Tracker disconnected")
def main():
    ''' App entrypoint.

    Shows the webcam feed with the current gaze point projected onto it as a
    red dot, and prints wink events as they occur. Press ``q`` in the preview
    window to quit.
    '''
    # Camera intrinsics and distortion coefficients from a prior calibration
    # of this specific webcam.
    cam_mat = np.array([
        [562.85992723, 0, 328.00336446],
        [0, 562.82866525, 216.53447402],
        [0, 0, 1]
    ])
    cam_distort = np.array([
        0.10711032, -0.30212805, -0.00045349, 0.00456896, 0.24487412
    ])
    global gaze, left_wink, right_wink
    # Initialize before the frontend starts streaming so the loop below never
    # reads an unbound `gaze` (previously a NameError until the first gaze
    # packet arrived).
    gaze = (float('nan'), float('nan'), float('nan'))
    frontend = FrontendData()
    cap = cv2.VideoCapture(1)  # TODO: camera index is hard-coded
    try:
        while True:
            # Consume wink flags raised by the stream handler.
            if left_wink:
                print("left wink")
                left_wink = False
            if right_wink:
                print("right_wink")
                right_wink = False
            ret, frame = cap.read()
            if not ret:
                break
            if not np.isnan(gaze[0]):
                # Flip y/z to map the tracker's gaze frame into the camera
                # frame before projecting — assumes the usual tracker/camera
                # axis convention; TODO confirm against the calibration.
                gaze_coords = np.array([gaze[0], -gaze[1], -gaze[2]])
                img_pts, _ = cv2.projectPoints(
                    gaze_coords, np.eye(3), np.zeros(3), cam_mat, cam_distort)
                frame = cv2.circle(frame, img_pts[0][0].astype(int), 5,
                                   (0, 0, 255), thickness=-1)
            cv2.imshow('frame', frame)
            if cv2.waitKey(1) == ord('q'):
                break
    finally:
        # Release the camera and GUI resources (previously leaked) before
        # tearing down the BLE connection.
        cap.release()
        cv2.destroyAllWindows()
        frontend.shutdown()
# Run the demo only when executed as a script (not on import).
if __name__ == '__main__':
    main()