Skip to content

Commit

Permalink
Merge pull request #49 from VadimChernyshev/metrics_memory_usage
Browse files Browse the repository at this point in the history
memory usage metrics
  • Loading branch information
bugrimov committed Feb 27, 2024
2 parents 7d30f76 + dfa82b6 commit c31b8d1
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 1.11.3
- Add memory usage metrics

# 1.11.2
- restore signals for successful termination of processes during flow.stop() if they were overridden

Expand Down
30 changes: 30 additions & 0 deletions aqueduct/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import multiprocessing as mp
import operator
import os
import psutil
import queue
import signal
import sys
Expand Down Expand Up @@ -35,6 +36,7 @@
from .queues import FlowStepQueue, select_next_queue
from .task import BaseTask, DEFAULT_PRIORITY, StopTask
from .worker import Worker
from .metrics.base import MetricsItems

# just for using common ResourceTracker in main and child processes and avoiding
# unnecessary shared memory resource_tracker "No such file or directory" warnings
Expand Down Expand Up @@ -234,6 +236,33 @@ def _calc_queue_size(self, step: FlowStep):

# the queue should be able to store at least 20 tasks; that seems reasonable
return max(step.batch_size*3, 20)

async def _check_memory_usage(self, sleep_sec: float = 1.):
    """Periodically collect per-step RSS memory usage metrics.

    Builds a step-name -> list-of-psutil.Process map once from the worker
    pids of each flow step context, then loops until the flow reaches
    FlowState.STOPPED, sampling resident set size (RSS) for every worker
    process and publishing the samples through the metrics collector.

    Args:
        sleep_sec: pause between consecutive sampling rounds, in seconds.
    """
    handler_processes_dict = {}
    for step_number, handler in enumerate(self._contexts):
        flow_step_name = handler.get_step_name(step_number)
        pids = self._contexts[handler].pids()
        processes = []
        for pid in pids:
            try:
                processes.append(psutil.Process(pid))
            except psutil.NoSuchProcess:
                # The worker may have exited between pids() and here;
                # skip it rather than crash the monitoring task.
                continue
        handler_processes_dict[flow_step_name] = processes

    while self.state != FlowState.STOPPED:
        metrics = MetricsItems()
        all_memory_usage = 0
        for flow_step_name, processes in handler_processes_dict.items():
            nprocs_memory_sum = 0
            for process in processes:
                try:
                    memory = process.memory_info().rss
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # The process died (e.g. a worker restart) after we
                    # built the handle; report the processes we still can.
                    continue
                nprocs_memory_sum += memory
                metrics.add(flow_step_name, memory)
            all_memory_usage += nprocs_memory_sum
            # Only emit the per-step sum when the step actually has
            # multiple worker processes; for one process it duplicates
            # the per-process metric.
            if len(processes) != 1:
                metrics.add(f'{flow_step_name}_nprocs_sum', nprocs_memory_sum)
        metrics.add('all_memory_usage', all_memory_usage)
        self._metrics_manager.collector.add_memory_usage(metrics)
        await asyncio.sleep(sleep_sec)

def _run_steps(self, timeout: Optional[int]):
if len(self._steps) == 0:
Expand Down Expand Up @@ -302,6 +331,7 @@ def _run_tasks(self):
self._tasks.append(asyncio.ensure_future(self._check_is_alive()))

self._metrics_manager.start(queues_info=self._get_queues_info())
self._tasks.append(asyncio.ensure_future(self._check_memory_usage()))

def _get_queues_info(self) -> Dict[mp.Queue, str]:
"""Returns queues between Step handlers and its names.
Expand Down
7 changes: 7 additions & 0 deletions aqueduct/metrics/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ def __init__(self):
super().__init__()
self.queue_sizes = MetricsItems()
self.tasks_stats = TasksStats()
self.memory_usage = MetricsItems()

def extend(self, storage: TasksMetricsStorage):
    """Merge metrics from *storage* into this storage.

    Always extends the base task metrics; when *storage* is also an
    AqueductMetricsStorage, additionally merges its queue sizes and
    task stats so no aqueduct-specific metrics are dropped.
    """
    super().extend(storage)
    if isinstance(storage, AqueductMetricsStorage):
        self.queue_sizes.extend(storage.queue_sizes)
        self.tasks_stats.extend(storage.tasks_stats)

def extend_memory_usage(self, metrics: MetricsItems):
    """Append the given memory usage samples to the accumulated ones."""
    self.memory_usage.extend(metrics)


class Collector:
Expand Down Expand Up @@ -63,6 +67,9 @@ def add_task_metrics(self, metrics: TasksMetricsStorage):
def add_tasks_stats(self, stats: TasksStats):
    """Merge task statistics into the collected metrics storage."""
    self._metrics.tasks_stats.extend(stats)

def add_memory_usage(self, metrics: MetricsItems):
    """Record memory usage samples into the collector's metrics storage."""
    self._metrics.extend_memory_usage(metrics)

def extract_metrics(self) -> AqueductMetricsStorage:
metrics = self._metrics
self._metrics = AqueductMetricsStorage()
Expand Down
4 changes: 4 additions & 0 deletions aqueduct/metrics/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
BATCH_SIZE_PREFIX = 'batch_size'
QSIZE_PREFIX = 'qsize'
TASKS_PREFIX = 'tasks'
MEMORY_USAGE_PREFIX = 'memory_usage'


class StatsDBuffer(Protocol):
Expand Down Expand Up @@ -71,6 +72,9 @@ def export(self, metrics: AqueductMetricsStorage):
if cnt > 0:
self.target.count(f'{self.prefix}.{TASKS_PREFIX}.{name}', cnt)

for name, memory_usage in metrics.memory_usage.items:
self.target.timing(f'{self.prefix}.{MEMORY_USAGE_PREFIX}.{name}', memory_usage)


class DummyExporter(Exporter):
"""Exports collected aqueduct metrics as is."""
Expand Down
1 change: 1 addition & 0 deletions docs/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Here is all metrics collected by aqueduct:
* ``batch_size`` - Batch size. Useful when using dynamic batching (timing metric)
* ``qsize`` - Queue size (timing metric)
* ``tasks`` - Total task count (count metric)
* ``memory_usage`` - Memory usage at each flow step (timing metric)


Aqueduct supports StatsD metrics format via ``ToStatsDMetricsExporter`` class
Expand Down
3 changes: 2 additions & 1 deletion requirements/dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
numpy
aiohttp
cffi==1.15.0
cffi==1.15.0
psutil==5.9.4
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

required = [
'cffi==1.15.0',
'psutil==5.9.4',
]

extras = {
Expand All @@ -21,7 +22,7 @@
setup(
name='aqueduct',
packages=find_packages(),
version='1.11.2',
version='1.11.3',
license='MIT',
license_files='LICENSE.txt',
author='Data Science SWAT',
Expand Down

0 comments on commit c31b8d1

Please sign in to comment.