
[dy] Record memory and cpu usage for pipeline runs #4761

Open · wants to merge 2 commits into master
Conversation

@dy46 dy46 commented Mar 14, 2024

Description

This PR adds memory and CPU tracking for pipeline runs in the backend. A follow-up PR will surface the metrics in the frontend.

Calculating memory is relatively straightforward: we can get the memory usage of all processes belonging to a pipeline run, using psutil.Process.memory_percent() for each process. CPU can be measured in a similar way, but getting the CPU usage percentage per process is more complicated. The comments in the code explain the implementation in more depth, but in short we need to sample CPU usage at various points in time and compare the numbers to compute a usage percentage.

Right now, we measure memory and CPU usage every time a heartbeat runs for the pipeline run, so roughly every 10 seconds. The usage is stored in the PipelineRun.metrics field. The pipeline run's CPU and memory usage are also included in the tags for the heartbeat log.
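The sampling approach described above can be sketched as follows. This is a minimal illustration of the arithmetic only: the function name is hypothetical, and it samples the current process's own CPU clock, whereas the PR samples every process of the run via psutil.

```python
import time

def cpu_percent_between_samples(
    cpu_start: float, cpu_end: float, wall_start: float, wall_end: float
) -> float:
    """CPU usage percent over an interval, from two (cpu_time, wall_time) samples.

    100.0 means one full core was busy for the entire interval.
    """
    elapsed = wall_end - wall_start
    if elapsed <= 0:
        return 0.0
    return 100.0 * (cpu_end - cpu_start) / elapsed

# Illustration: sample the current process at two points in time.
wall0, cpu0 = time.monotonic(), time.process_time()
sum(i * i for i in range(200_000))  # busy work between the two samples
wall1, cpu1 = time.monotonic(), time.process_time()
usage = cpu_percent_between_samples(cpu0, cpu1, wall0, wall1)
```

In the PR's scheme, a sample would be taken at each heartbeat (roughly every 10 seconds) and compared against the previous one.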

Example:
[Screenshots from 2024-03-14 showing the recorded usage metrics]

How Has This Been Tested?

  • Tested locally with various pipelines

Checklist

  • The PR is tagged with proper labels (bug, enhancement, feature, documentation)
  • I have performed a self-review of my own code
  • I have added unit tests that prove my fix is effective or that my feature works
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • If new documentation has been added, relative paths have been added to the appropriate section of docs/mint.json

cc: @wangxiaoyou1993

self.logger.info(
    f'Pipeline {self.pipeline.uuid} for run {self.pipeline_run.id} '
    f'in schedule {self.pipeline_run.pipeline_schedule_id} is alive.',
    **tags,
)

if memory_usage and memory_usage >= MEMORY_USAGE_MAXIMUM:
    self.memory_usage_failure(tags=tags)

def __measure_and_record_usage(self) -> Tuple[float, float]:
Review comment: This will work with the local_python executor but not with other executors.

        self.pipeline.type in [PipelineType.INTEGRATION, PipelineType.STREAMING]
        or self.pipeline.run_pipeline_in_one_process
    ):
        if job_manager.has_pipeline_run_job(self.pipeline_run.id):
Review comment: This will not work when you have multiple schedulers.
