Draft: YAN-913 Async Drop Example #169

Open · wants to merge 7 commits from YAN-913-async-exec into master
Conversation

@calgray (Collaborator) commented Jun 2, 2022

No description provided.

@coveralls commented Jun 2, 2022

Coverage Status

Coverage decreased (-0.06%) to 81.154% when pulling 78bd896 on YAN-913-async-exec into 7867702 on master.

Comment on lines +819 to +837
import asyncio
import ctypes
import multiprocessing

class LastCharWriterApp(AppDROP):
    # Note: cannot share a string member with the test thread, so the
    # last byte is held in a process-shared ctypes value
    _lastByte = multiprocessing.Value(ctypes.c_char, b" ")
    _stream = QueueAsyncStream()

    def run(self):
        asyncio.run(self.arun())

    async def arun(self):
        outputDrop = self.outputs[0]
        # drain the stream until end() is signalled, forwarding the
        # last byte of each chunk to the output drop
        async for data in self._stream:
            self._lastByte.value = data[-1:]
            outputDrop.write(self._lastByte.value)

    def dataWritten(self, uid, data):
        self._stream.append(data)

    def dropCompleted(self, uid, drop_state):
        self._stream.end()
@calgray (Collaborator, Author) commented Jun 7, 2022
Here's another approach to streaming objects using the existing streaming interface. It works with Python threading and multiprocessing, but has a few catches regarding memory sharing with test code.

The reason for using a stream object is that the dataWritten callback runs on a different app's subprocess and should return quickly so as not to block it. The run method here is invoked in its own subprocess and is better suited to performing any heavy data processing.

The advantage of using an async stream in particular is that it can scale to two or more streams in a single subprocess using something like asyncio.gather, which switches between streams depending on which one has data ready, as sketched below.
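
A minimal sketch of that scaling idea, assuming per-stream queues (e.g. created in __init__ rather than at class level); stream_a, stream_b, and handle are hypothetical names, not part of this PR:

import asyncio

async def drain(stream, handle):
    # consume a single stream to completion
    async for data in stream:
        handle(data)

async def arun(stream_a, stream_b, handle):
    # gather interleaves both consumers: whenever one stream's queue is
    # empty, its __anext__ awaits asyncio.sleep(0) and yields control,
    # so execution switches to whichever stream has data ready
    await asyncio.gather(drain(stream_a, handle), drain(stream_b, handle))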

Comment on lines +796 to +817
import asyncio
import multiprocessing
import queue
from collections.abc import AsyncIterator

class QueueAsyncStream(AsyncIterator):
    # class-level so the queue and end flag are shared with the test thread
    _q = multiprocessing.Queue()
    _end = multiprocessing.Event()

    async def __anext__(self):
        while True:
            # note: Queue.qsize() is approximate and unsupported
            # on some platforms
            if self._q.qsize() > 0:
                return self._q.get()
            elif self._end.is_set():
                raise StopAsyncIteration
            else:
                # nothing ready yet: yield to the event loop so other
                # streams (or tasks) can run
                await asyncio.sleep(0)

    def append(self, data):
        # block until the item is enqueued, retrying after each
        # one-second timeout
        while True:
            try:
                self._q.put(data, block=True, timeout=1)
                break
            except queue.Full:
                pass

    def end(self):
        self._end.set()
@calgray (Collaborator, Author) commented Jun 7, 2022
Previously I used a deque here, but multiprocess-safety issues led me to trial a queue. I've also changed the stream base to AsyncIterator instead of AsyncIterable; the primary difference is that an iterator can only be iterated once. This greatly simplifies the lifecycle around when to pop data and embraces shared ownership of stream data.

This implementation assumes that putting data into the stream in one process and getting it out in another has minimal overhead.
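
A minimal usage sketch of the one-shot lifecycle, assuming the QueueAsyncStream definition above; the producer thread and the byte chunks are illustrative only:

import asyncio
import threading

def produce(stream):
    for chunk in (b"a", b"b", b"c"):
        stream.append(chunk)
    stream.end()  # after this, the consumer's loop ends via StopAsyncIteration

async def consume(stream):
    async for data in stream:  # the iterator can only be drained once
        print(data)

stream = QueueAsyncStream()
threading.Thread(target=produce, args=(stream,)).start()
asyncio.run(consume(stream))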
