Skip to content

Commit

Permalink
Merge pull request #53 from mikha1lov/main
Browse files Browse the repository at this point in the history
Dead processes metrics
  • Loading branch information
bugrimov committed Apr 12, 2024
2 parents 852b5ca + d48c460 commit 877152b
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 1.11.6
- Add metrics with count of running and dead processes

# 1.11.5
- Changed default export period to 0.5 to fix metrics gaps

Expand Down
10 changes: 9 additions & 1 deletion aqueduct/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .metrics.collect import Collector, TasksStats
from .metrics.export import Exporter
from .metrics.manager import get_metrics_manager
from .metrics.processes import ProcessesStats
from .metrics.queue import TaskMetricsQueue
from .metrics.timer import timeit
from .multiprocessing import (
Expand Down Expand Up @@ -411,13 +412,20 @@ async def _check_is_alive(self, sleep_sec: float = 1.):
If at least one process is not alive, it stops Flow.
"""
while self.state != FlowState.STOPPED:
processes_stats = ProcessesStats()
for handler, context in self._contexts.items():
for proc in context.processes:
if not proc.is_alive():
if self.is_running:
handler_name = handler.__class__.__name__
log.error('The process %s for %s handler is dead',
proc.pid, handler.__class__.__name__)
proc.pid, handler_name)
processes_stats.add_dead_process()
self._metrics_manager.collector.add_processes_stats(processes_stats)
await self.stop(graceful=False)
else:
processes_stats.add_running_process()
self._metrics_manager.collector.add_processes_stats(processes_stats)
await asyncio.sleep(sleep_sec)

@staticmethod
Expand Down
8 changes: 7 additions & 1 deletion aqueduct/metrics/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from . import IMetricsItems
from .base import MetricsItems, MetricsTypes
from .processes import ProcessesStats
from .task import TasksMetricsStorage


Expand All @@ -30,13 +31,15 @@ def __init__(self):
self.queue_sizes = MetricsItems()
self.tasks_stats = TasksStats()
self.memory_usage = MetricsItems()
self.processes_stats = ProcessesStats()

def extend(self, storage: TasksMetricsStorage):
super().extend(storage)
if isinstance(storage, AqueductMetricsStorage):
self.queue_sizes.extend(storage.queue_sizes)
self.tasks_stats.extend(storage.tasks_stats)

self.processes_stats.extend(storage.processes_stats)

def extend_memory_usage(self, metrics: MetricsItems):
self.memory_usage.extend(metrics)

Expand Down Expand Up @@ -70,6 +73,9 @@ def add_tasks_stats(self, stats: TasksStats):
def add_memory_usage(self, metrics: MetricsItems):
self._metrics.extend_memory_usage(metrics)

def add_processes_stats(self, stats: ProcessesStats):
    """Merge ``stats`` into the processes stats held by the current metrics storage."""
    collected = self._metrics.processes_stats
    collected.extend(stats)

def extract_metrics(self) -> AqueductMetricsStorage:
metrics = self._metrics
self._metrics = AqueductMetricsStorage()
Expand Down
6 changes: 6 additions & 0 deletions aqueduct/metrics/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from . import AQUEDUCT
from .collect import AqueductMetricsStorage


TRANSFER_TIME_PREFIX = 'transfer_time'
TASK_SIZE_PREFIX = 'task_size'
HANDLE_TIME_PREFIX = 'handle_time'
Expand All @@ -17,6 +18,7 @@
QSIZE_PREFIX = 'qsize'
TASKS_PREFIX = 'tasks'
MEMORY_USAGE_PREFIX = 'memory_usage'
PROCESSES_PREFIX = 'processes'


class StatsDBuffer(Protocol):
Expand Down Expand Up @@ -75,6 +77,10 @@ def export(self, metrics: AqueductMetricsStorage):
for name, memory_usage in metrics.memory_usage.items:
self.target.timing(f'{self.prefix}.{MEMORY_USAGE_PREFIX}.{name}', memory_usage)

for name, cnt in metrics.processes_stats.items:
if cnt > 0:
self.target.count(f'{self.prefix}.{PROCESSES_PREFIX}.{name}', cnt)


class DummyExporter(Exporter):
"""Exports collected aqueduct metrics as is."""
Expand Down
2 changes: 2 additions & 0 deletions aqueduct/metrics/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def stop(self):
# Остановка фоновых корутин
for task in self._tasks:
task.cancel()
# Send last metrics
self.exporter.export(self.collector.extract_metrics())

async def _export_metrics(self):
while True:
Expand Down
24 changes: 24 additions & 0 deletions aqueduct/metrics/processes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from dataclasses import asdict, dataclass
from typing import Iterable, Tuple

from . import IMetricsItems


@dataclass
class ProcessesStats(IMetricsItems):
    """Counters of dead and running processes, exposed as metrics items."""

    # Number of processes recorded as dead.
    dead: int = 0
    # Number of processes recorded as running.
    running: int = 0

    @property
    def items(self) -> Iterable[Tuple[str, int]]:
        """Return ``(field name, count)`` pairs for every counter field."""
        counters = asdict(self)
        return counters.items()

    def extend(self, stats: 'ProcessesStats'):
        """Add the counters of ``stats`` to this instance in place."""
        self.running = self.running + stats.running
        self.dead = self.dead + stats.dead

    def add_dead_process(self):
        """Increase the dead-process counter by one."""
        self.dead = self.dead + 1

    def add_running_process(self):
        """Increase the running-process counter by one."""
        self.running = self.running + 1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
setup(
name='aqueduct',
packages=find_packages(),
version='1.11.5',
version='1.11.6',
license='MIT',
license_files='LICENSE.txt',
author='Data Science SWAT',
Expand Down

0 comments on commit 877152b

Please sign in to comment.