Skip to content

Commit

Permalink
Merge pull request #52 from sgjurano/lower-cpu-usage-in-main-process
Browse files Browse the repository at this point in the history
Lower cpu usage in main process
  • Loading branch information
bugrimov committed Mar 19, 2024
2 parents b1802be + 414d810 commit 6a5e3d5
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 37 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 1.11.4
- Lower cpu usage in main process

# 1.11.3
- Add memory usage metrics

Expand Down
70 changes: 35 additions & 35 deletions aqueduct/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from time import monotonic
from typing import Dict, List, Literal, Optional, Union

from concurrent.futures import ThreadPoolExecutor, Future
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.resource_tracker import _resource_tracker # noqa

from .exceptions import FlowError, MPStartMethodValueError, NotRunningError
Expand Down Expand Up @@ -144,7 +144,6 @@ async def process(self, task: BaseTask, timeout_sec: float = 5.) -> bool:
raise NotRunningError

with timeit() as timer:

future = asyncio.Future()
self._task_futures[task.task_id] = future

Expand Down Expand Up @@ -231,7 +230,7 @@ def _calc_queue_size(self, step: FlowStep):
""" If queue size not specified manually, get queue size based on batch size for handler.
We need at least batch_size places in queue and then some additional space
"""
if self._queue_size:
if self._queue_size is not None:
return self._queue_size

# queue should be able to store at least 20 tasks, that seems reasonable
Expand Down Expand Up @@ -364,46 +363,47 @@ def _fetch_from_queue(out_queue: mp.Queue) -> Union[BaseTask, None]:
except queue.Empty:
return None

def _read_from_queue(self, loop: asyncio.AbstractEventLoop, q: mp.Queue) -> None:
    """Consume finished tasks from *q* until the flow stops.

    Runs in a worker thread: each fetched task resolves its pending
    asyncio future on *loop* via ``call_soon_threadsafe``, so the event
    loop is never touched directly from this thread.
    """
    while self.state != FlowState.STOPPED:
        task = self._fetch_from_queue(q)
        if task is None:
            # Nothing available right now; poll again until stopped.
            continue

        future = self._task_futures.get(task.task_id)
        if not future or future.cancelled() or future.done():
            # Caller gave up on this task (timeout/cancel) — drop it.
            continue

        task.metrics.stop_transfer_timer(MAIN_PROCESS, task.priority)
        # Queues may optionally expose the size of the last transferred task.
        size = getattr(q, 'task_size', None)
        if size:
            task.metrics.save_task_size(size, MAIN_PROCESS, task.priority)

        loop.call_soon_threadsafe(future.set_result, task)

async def _fetch_processed(self):
    """Fetching messages from output queue.

    To handle messages from another process and not block asyncio loop,
    we run queue.get() in a separate thread: one long-lived consumer
    thread per priority level, instead of re-submitting a short-lived
    executor job per fetched task (which kept the main process busy).
    """
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=self._queue_priorities) as queue_fetch_executor:
        # The last queue of each same-priority chain is the results queue.
        results_queues = [
            queues_with_same_priority[-1].queue for queues_with_same_priority in self._queues
        ]
        # One blocking consumer thread per results queue; each resolves
        # task futures itself (see _read_from_queue).
        tasks = [
            loop.run_in_executor(queue_fetch_executor, self._read_from_queue, loop, q)
            for q in results_queues
        ]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        for t in done:
            try:
                t.result()
            except Exception:
                # A consumer thread died unexpectedly — surface it in logs
                # rather than failing silently.
                log.exception('error in queue-consumer')

        for t in pending:
            t.cancel()

async def _check_is_alive(self, sleep_sec: float = 1.):
"""Checks that all child processes are alive.
Expand Down
2 changes: 1 addition & 1 deletion aqueduct/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Worker:

def __init__(
self,
queues: List[FlowStepQueue],
queues: List[List[FlowStepQueue]],
task_handler: BaseTaskHandler,
batch_size: int,
batch_timeout: float,
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
setup(
name='aqueduct',
packages=find_packages(),
version='1.11.3',
version='1.11.4',
license='MIT',
license_files='LICENSE.txt',
author='Data Science SWAT',
Expand Down

0 comments on commit 6a5e3d5

Please sign in to comment.