
Add Worker metrics. #5234

Merged: 1 commit, May 23, 2024
2 changes: 1 addition & 1 deletion .github/workflows/scripts/install.sh
@@ -127,7 +127,7 @@ if [ "$TEST" = "azure" ]; then
command: "azurite-blob --blobHost 0.0.0.0 --cert /etc/pulp/azcert.pem --key /etc/pulp/azkey.pem"' vars/main.yaml
sed -i -e '$a azure_test: true\
pulp_scenario_settings: {"domain_enabled": true}\
pulp_scenario_env: {"otel_bsp_max_export_batch_size": 1, "otel_bsp_max_queue_size": 1, "otel_exporter_otlp_endpoint": "http://localhost:4318", "otel_exporter_otlp_protocol": "http/protobuf", "pulp_otel_enabled": "true"}\
pulp_scenario_env: {"otel_bsp_max_export_batch_size": 1, "otel_bsp_max_queue_size": 1, "otel_exporter_otlp_endpoint": "http://localhost:4318", "otel_exporter_otlp_protocol": "http/protobuf", "otel_metric_export_interval": 800, "pulp_otel_enabled": "true"}\
' vars/main.yaml
fi

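Context for the new `otel_metric_export_interval` value: `OTEL_METRIC_EXPORT_INTERVAL` is the standard OpenTelemetry SDK variable for the delay, in milliseconds, between periodic metric exports, and 800 ms keeps the CI exporter flushing quickly enough for the functional tests below. A minimal sketch of the roughly equivalent explicit wiring in the Python SDK (illustrative only, not pulpcore's actual bootstrap code; the endpoint matches `otel_exporter_otlp_endpoint` above):

```python
# Sketch of what the SDK does with OTEL_METRIC_EXPORT_INTERVAL when the
# OTLP http/protobuf exporter is configured; illustrative wiring only.
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter

reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://localhost:4318/v1/metrics"),
    export_interval_millis=800,  # what otel_metric_export_interval sets via env
)
provider = MeterProvider(metric_readers=[reader])
```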
2 changes: 2 additions & 0 deletions CHANGES/3821.feature
@@ -0,0 +1,2 @@
Added two new metrics related to Tasks: `pulp_tasks_unblocked_waiting_queue` has the number of unblocked waiting tasks that have been waiting longer than five (5) seconds,
while `pulp_tasks_longest_unblocked_waiting_time` records the longest time, in seconds, that a task has been waiting in the queue.
1 change: 1 addition & 0 deletions pulpcore/constants.py
@@ -12,6 +12,7 @@
TASK_DISPATCH_LOCK = 21
TASK_SCHEDULING_LOCK = 42
TASK_UNBLOCKING_LOCK = 84
TASK_METRICS_HEARTBEAT_LOCK = 74
decko marked this conversation as resolved.


#: All valid task states.
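For readers unfamiliar with these integer constants: each one keys a PostgreSQL advisory lock, so only one worker cluster-wide can hold a given lock at a time. A simplified sketch of the pattern, assuming a Django connection (pulpcore's real `PGAdvisoryLock` lives in the tasking code; this version is illustrative, not its exact implementation):

```python
# Simplified sketch of the advisory-lock pattern keyed by constants such as
# TASK_METRICS_HEARTBEAT_LOCK; not pulpcore's exact implementation.
import contextlib
from django.db import connection


class AdvisoryLockError(Exception):
    pass


@contextlib.contextmanager
def pg_advisory_lock(key):
    with connection.cursor() as cursor:
        # pg_try_advisory_lock returns immediately: True if we got the lock.
        cursor.execute("SELECT pg_try_advisory_lock(%s)", [key])
        acquired = cursor.fetchone()[0]
    if not acquired:
        raise AdvisoryLockError(f"lock {key} is held by another worker")
    try:
        yield
    finally:
        with connection.cursor() as cursor:
            cursor.execute("SELECT pg_advisory_unlock(%s)", [key])
```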
80 changes: 74 additions & 6 deletions pulpcore/tasking/worker.py
@@ -7,20 +7,23 @@
import signal
import socket
import contextlib
from datetime import timedelta
from datetime import datetime, timedelta
decko marked this conversation as resolved.
from multiprocessing import Process
from tempfile import TemporaryDirectory
from packaging.version import parse as parse_version
from opentelemetry.metrics import get_meter

from django.conf import settings
from django.db import connection
from django.db.models import Case, Count, F, Max, Value, When
from django.utils import timezone

from pulpcore.constants import (
TASK_STATES,
TASK_INCOMPLETE_STATES,
TASK_SCHEDULING_LOCK,
TASK_UNBLOCKING_LOCK,
TASK_METRICS_HEARTBEAT_LOCK,
)
from pulpcore.exceptions import AdvisoryLockError
from pulpcore.app.apps import pulp_plugin_configs
@@ -38,12 +41,18 @@
_logger = logging.getLogger(__name__)
random.seed()

# The following four constants are current "best guesses".
# Unless/until we can provide reasonable ways to decide to change their values,
# they will live as constants instead of "proper" settings.

# Number of heartbeats for a task to finish on graceful worker shutdown (approx)
TASK_GRACE_INTERVAL = 3
# Number of heartbeats between attempts to kill the subprocess (approx)
TASK_KILL_INTERVAL = 1
# Number of heartbeats between cleaning up worker processes (approx)
WORKER_CLEANUP_INTERVAL = 100
# Threshold time in seconds of an unblocked task before we consider a queue stalled
THRESHOLD_UNBLOCKED_WAITING_TIME = 5


class PulpcoreWorker:
@@ -55,7 +64,8 @@ def __init__(self):

self.task = None
self.name = f"{os.getpid()}@{socket.getfqdn()}"
self.heartbeat_period = settings.WORKER_TTL / 3
self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3)
self.last_metric_heartbeat = timezone.now()
decko marked this conversation as resolved.
self.versions = {app.label: app.version for app in pulp_plugin_configs()}
self.cursor = connection.cursor()
self.worker = self.handle_worker_heartbeat()
@@ -64,6 +74,19 @@
WORKER_CLEANUP_INTERVAL / 10, WORKER_CLEANUP_INTERVAL
)

meter = get_meter(__name__)
self.tasks_unblocked_queue_meter = meter.create_gauge(
name="tasks_unblocked_queue",
description="Number of unblocked tasks waiting in the queue.",
unit="tasks",
)

self.tasks_longest_unblocked_time_meter = meter.create_gauge(
name="tasks_longest_unblocked_time",
description="The age of the longest waiting task.",
unit="seconds",
)

# Add a file descriptor to trigger select on signals
self.sentinel, sentinel_w = os.pipe()
os.set_blocking(self.sentinel, False)
@@ -90,6 +113,8 @@ def _signal_handler(self, thesignal, frame):
def _pg_notify_handler(self, notification):
if notification.channel == "pulp_worker_wakeup":
self.wakeup = True
elif notification.channel == "pulp_worker_metrics_heartbeat":
self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
elif self.task and notification.channel == "pulp_worker_cancel":
if notification.payload == str(self.task.pk):
self.cancel_task = True
@@ -140,7 +165,7 @@ def worker_cleanup(self):
qs.delete()

def beat(self):
if self.worker.last_heartbeat < timezone.now() - timedelta(seconds=self.heartbeat_period):
if self.worker.last_heartbeat < timezone.now() - self.heartbeat_period:
self.worker = self.handle_worker_heartbeat()
if self.task_grace_timeout > 0:
self.task_grace_timeout -= 1
@@ -150,6 +175,7 @@ def beat(self):
self.worker_cleanup()
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK):
dispatch_scheduled_tasks()
self.record_unblocked_waiting_tasks_metric()

def notify_workers(self):
self.cursor.execute("NOTIFY pulp_worker_wakeup")
@@ -223,7 +249,7 @@ def identify_unblocked_tasks(self):
_logger.debug("Marking canceling task %s unblocked.", task.pk)
task.unblock()
changed = True
# Don't consider this tasks reosurces as held.
# Don't consider this task's resources as held.
continue

elif (
@@ -244,6 +270,7 @@
# Record the resources of the pending task
taken_exclusive_resources.update(exclusive_resources)
taken_shared_resources.update(shared_resources)

return changed

def iter_tasks(self):
@@ -293,7 +320,7 @@ def sleep(self):
_logger.debug(_("Worker %s entering sleep state."), self.name)
while not self.shutdown_requested and not self.wakeup:
r, w, x = select.select(
[self.sentinel, connection.connection], [], [], self.heartbeat_period
[self.sentinel, connection.connection], [], [], self.heartbeat_period.seconds
Member: Ahh, that explains why we didn't have it as a timedelta before.
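An aside on the `.seconds` access here, not from the review thread: `timedelta.seconds` is only the whole-seconds component within a day, not the full duration, so it is safe here only because the heartbeat period is well under a day. `total_seconds()` is the unambiguous form:

```python
# Standard-library behavior, nothing pulpcore-specific.
from datetime import timedelta

period = timedelta(seconds=90.5)
period.seconds                                 # 90 (whole seconds only)
period.total_seconds()                         # 90.5
timedelta(days=1, seconds=5).seconds           # 5, not 86405
timedelta(days=1, seconds=5).total_seconds()   # 86405.0
```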

)
self.beat()
if connection.connection in r:
@@ -329,7 +356,7 @@ def supervise_task(self, task):
[self.sentinel, connection.connection, task_process.sentinel],
[],
[],
self.heartbeat_period,
self.heartbeat_period.seconds,
)
self.beat()
if connection.connection in r:
@@ -392,6 +419,45 @@ def handle_available_tasks(self):
keep_looping = True
self.supervise_task(task)

def record_unblocked_waiting_tasks_metric(self):
Member: Is there any safeguard that prohibits the execution of this method if the telemetry is disabled?

Member (author): Nope. It simply will not be emitted if the agent is not used.

Member: The question is valid. In the current state, we still run the query even if we send nothing.

Member: Don't we have an otel_enabled setting we can look at?

Member (@lubosmj, May 21, 2024): Not yet; we have just an environment variable that can be used for the lookup (``PULP_OTEL_ENABLED`` set to ``True``). I can imagine doing something similar to DomainMetricsEmitter.build() here.

Member (author): I did it in the tests. Gonna work on the same thing here.
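A hedged sketch of the guard pattern discussed above, modeled loosely on what `DomainMetricsEmitter.build()` is said to do; the helper name is hypothetical, and the diff below ends up inlining essentially this check:

```python
import os

def _otel_enabled():
    # PULP_OTEL_ENABLED is the documented environment variable; default to
    # "" so an unset variable reads as disabled instead of raising.
    return os.getenv("PULP_OTEL_ENABLED", "").lower() == "true"
```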

if os.getenv("PULP_OTEL_ENABLED", "").lower() != "true":
return

now = timezone.now()
if now > self.last_metric_heartbeat + self.heartbeat_period:
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(
TASK_METRICS_HEARTBEAT_LOCK
):
# For performance reasons we aggregate these statistics on a single database call.
unblocked_tasks_stats = (
Task.objects.filter(unblocked_at__isnull=False, started_at__isnull=True)
.annotate(unblocked_for=Value(timezone.now()) - F("unblocked_at"))
.aggregate(
longest_unblocked_waiting_time=Max(
"unblocked_for", default=timezone.timedelta(0)
),
unblocked_tasks_count_gte_threshold=Count(
Case(
When(
unblocked_for__gte=Value(
timezone.timedelta(seconds=THRESHOLD_UNBLOCKED_WAITING_TIME)
),
then=1,
)
)
),
)
)

self.tasks_unblocked_queue_meter.set(
unblocked_tasks_stats["unblocked_tasks_count_gte_threshold"]
)
self.tasks_longest_unblocked_time_meter.set(
unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds
)

self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")
Member: I would think we do not need to re-notify the worker, do we?

Member: We notify all workers that the work was done, so they hold off doing it for another cooldown period.

Member: But since you asked, a comment to that end may help.

Member (author): Thanks, man. In the end I removed it, since it doesn't make sense to be used anymore.

Member (author): Sorry @lubosmj. @mdellweg reminded me why we're using this. #5234 (review)
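Distilled from the diff, the cooldown handshake the reviewers are discussing: whichever worker wins the advisory lock emits the metrics and NOTIFYs the timestamp, every worker (including the emitter) stores it as `last_metric_heartbeat`, and all of them skip emission until one `heartbeat_period` has elapsed. A self-contained sketch, assuming the payload is the `str()` of `timezone.now()`:

```python
from datetime import datetime, timedelta

heartbeat_period = timedelta(seconds=10)
# What _pg_notify_handler stores when the NOTIFY payload arrives:
last_metric_heartbeat = datetime.fromisoformat("2024-05-23 12:00:00+00:00")

def should_emit(now):
    # Every worker applies this test; only the one that also wins
    # TASK_METRICS_HEARTBEAT_LOCK actually emits and re-notifies.
    return now > last_metric_heartbeat + heartbeat_period

print(should_emit(datetime.fromisoformat("2024-05-23 12:00:05+00:00")))  # False
print(should_emit(datetime.fromisoformat("2024-05-23 12:00:11+00:00")))  # True
```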


def run(self, burst=False):
with WorkerDirectory(self.name):
signal.signal(signal.SIGINT, self._signal_handler)
@@ -400,6 +466,7 @@ def run(self, burst=False):
# Subscribe to pgsql channels
connection.connection.add_notify_handler(self._pg_notify_handler)
self.cursor.execute("LISTEN pulp_worker_cancel")
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
if burst:
self.handle_available_tasks()
else:
@@ -412,5 +479,6 @@
break
self.sleep()
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
self.cursor.execute("UNLISTEN pulp_worker_cancel")
self.shutdown()
31 changes: 31 additions & 0 deletions pulpcore/tests/functional/__init__.py
@@ -568,6 +568,37 @@ async def _send_request():
return _received_otel_span


@pytest.fixture(scope="session")
def received_otel_metrics():
"""A fixture for checking the presence of specific metrics on the otel collector server.

Ensure the collector server is up and running before executing tests with this fixture. To do
so, please, run the server as follows: python3 pulpcore/tests/functional/assets/otel_server.py
"""

def _received_otel_metric(data, retries=3):
decko marked this conversation as resolved.
if os.environ.get("PULP_OTEL_ENABLED") != "true":
# pretend everything is working as expected if tests are run from
# a non-configured runner
return True

async def _send_request():
async with aiohttp.ClientSession(raise_for_status=False) as session:
otel_server_url = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
async with session.post(f"{otel_server_url}/metrics_test", json=data) as response:
return response.status

while retries:
status = asyncio.run(_send_request())
if status == 200:
return True
sleep(2)
retries -= 1
return False

return _received_otel_metric


@pytest.fixture
def test_path():
return os.getenv("PYTEST_CURRENT_TEST").split()[0]
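For orientation, a hedged sketch of the collector-side contract this fixture assumes (the real server is `pulpcore/tests/functional/assets/otel_server.py`; the code below is illustrative, not its actual implementation): the test POSTs an expected `{name, description, unit}` document to `/metrics_test`, and the server answers 200 once a matching metric has arrived from the SDK.

```python
# Illustrative aiohttp handler; `received` would be filled by whatever
# handler decodes the OTLP metric export requests from the SDK.
from aiohttp import web

received = []

async def metrics_test(request):
    expected = await request.json()
    # Match on the (name, description, unit) triple the tests send.
    for metric in received:
        if all(metric.get(k) == v for k, v in expected.items()):
            return web.Response(status=200)
    return web.Response(status=404)

app = web.Application()
app.add_routes([web.post("/metrics_test", metrics_test)])
```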
46 changes: 46 additions & 0 deletions pulpcore/tests/functional/api/test_tasking.py
@@ -1,9 +1,11 @@
"""Tests related to the tasking system."""

import os
import json
import pytest
import subprocess
import time

from aiohttp import BasicAuth
from urllib.parse import urljoin
from uuid import uuid4
@@ -349,3 +351,47 @@ def test_task_version_prevent_pickup(dispatch_task, pulpcore_bindings):
task = pulpcore_bindings.TasksApi.read(task_href)
assert task.state == "waiting"
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})


def test_emitting_unblocked_task_telemetry(
dispatch_task, pulpcore_bindings, pulp_settings, received_otel_metrics
):
if os.getenv("PULP_OTEL_ENABLED", "").lower() != "true":
pytest.skip("Need PULP_OTEL_ENABLED to run this test.")

# Checking online workers ready to get a task
workers_online = pulpcore_bindings.WorkersApi.list(online="true").count
Member: 👍


# We need to generate long-running tasks to block the workers from executing other tasks
resident_task_hrefs = [
dispatch_task("pulpcore.app.tasks.test.sleep", args=(30,))
for worker in range(workers_online)
]

# Then we dispatch a quick task that becomes unblocked but has to wait in the queue
task_href = dispatch_task("pulpcore.app.tasks.test.sleep", args=(0,))

task = pulpcore_bindings.TasksApi.read(task_href)
assert task.state == "waiting"

# And trigger the metrics
assert received_otel_metrics(
{
"name": "tasks_unblocked_queue",
"description": "Number of unblocked tasks waiting in the queue.",
"unit": "tasks",
}
)
Member: What about the value of the metric?

Member (author): As we talked about before, this involves some changes to the test machinery, and even then we probably won't be able to catch the right value for the metric during the tests. Talking with @lubosmj and @dkliban, we understood that switching to the OpenTelemetry Collector and testing the metrics as they are exported for consumption by Prometheus could generate better results. Yet, it is a considerable effort that is out of the scope of this task.


assert received_otel_metrics(
{
"name": "tasks_longest_unblocked_time",
"description": "The age of the longest waiting task.",
"unit": "seconds",
}
)
decko marked this conversation as resolved.

# Finally, cancel the long-running resident tasks to clean up.
for task_href in resident_task_hrefs:
    pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
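As a postscript to the metric-value thread above, a rough sketch of the out-of-scope follow-up the author describes, i.e. asserting metric values against a Prometheus-format scrape of an OpenTelemetry Collector; the endpoint, port, and metric names here are assumptions, not part of this PR:

```python
import requests  # hypothetical follow-up code, not from this PR

def read_gauge(name, endpoint="http://localhost:8889/metrics"):
    # Scan a Prometheus text-format scrape for a gauge and return its value,
    # or None if the metric has not been exported yet.
    for line in requests.get(endpoint).text.splitlines():
        if line.startswith(name):
            return float(line.rsplit(" ", 1)[-1])
    return None

assert read_gauge("pulp_tasks_unblocked_waiting_queue") >= 1.0
```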