forked from iandobbie/StatusLED
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TestProcesses.py
49 lines (38 loc) · 1.09 KB
/
TestProcesses.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from multiprocessing import Process, Queue
from time import sleep
class LED_processor(Process):
def __init__(self, effectQueue, outQueue):
Process.__init__(self)
self.effectQueue = effectQueue
self.outQueue = outQueue
def effect(self, x):
calc = x * x
self.outQueue.put(calc)
def run(self):
while True:
f, x = self.effectQueue.get()
if f != 'kill':
getattr(self, f)(x)
else:
return
class Machine:
def __init__(self):
print('Hi')
self.effectQueue = Queue()
self.outQueue = Queue()
self.LED_processor = LED_processor(self.effectQueue, self.outQueue)
print('LED_processor created')
self.LED_processor.start()
def run_effect(self, x):
self.effectQueue.put(['effect', x])
out = self.outQueue.get()
print(out)
def kill(self):
self.effectQueue.put(['kill',0])
if __name__ == '__main__':
m = Machine()
sleep(2)
m.run_effect(3)
sleep(3)
m.kill()
m.LED_processor.join()