Skip to content

Commit

Permalink
Added async support
Browse files Browse the repository at this point in the history
Incorporated changes from SijmenHuizenga's pull request (dbader#438) to add async support.
  • Loading branch information
Pwnion committed Jan 16, 2024
1 parent 1173f28 commit 26d3105
Showing 1 changed file with 54 additions and 2 deletions.
56 changes: 54 additions & 2 deletions schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
[2] https://github.com/Rykian/clockwork
[3] https://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/
"""
import asyncio
import inspect
from collections.abc import Hashable
import datetime
import functools
Expand Down Expand Up @@ -87,7 +89,7 @@ def __init__(self) -> None:

def run_pending(self) -> None:
"""
Run all jobs that are scheduled to run.
Run all sync jobs that are scheduled to run.
Please note that it is *intended behavior that run_pending()
does not run missed jobs*. For example, if you've registered a job
Expand All @@ -99,9 +101,22 @@ def run_pending(self) -> None:
for job in sorted(runnable_jobs):
self._run_job(job)

async def run_pending_async(self):
"""
Run all sync and async jobs that are scheduled to run.
Please note that it is *intended behavior that run_pending_async()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you only call run_pending()
in one hour increments then your job won't be run 60 times in
between but only once.
"""
runnable_jobs = (job for job in self.jobs if job.should_run)
await asyncio.gather(*[self._run_job_async(job) for job in runnable_jobs])

def run_all(self, delay_seconds: int = 0) -> None:
"""
Run all jobs regardless if they are scheduled to run or not.
Run all sync jobs regardless if they are scheduled to run or not.
A delay of `delay` seconds is added between each job. This helps
distribute system load generated by the jobs more evenly
Expand All @@ -118,6 +133,25 @@ def run_all(self, delay_seconds: int = 0) -> None:
self._run_job(job)
time.sleep(delay_seconds)

async def run_all_async(self, delay_seconds=0):
"""
Run all sync and async jobs regardless if they are scheduled to run or not.
A delay of `delay` seconds is added between each job. This helps
distribute system load generated by the jobs more evenly
over time.
:param delay_seconds: A delay added between every executed job
"""
logger.debug(
"Running *all* %i jobs with %is delay in between",
len(self.jobs),
delay_seconds,
)
for job in self.jobs[:]:
await self._run_job_async(job)
await asyncio.sleep(delay_seconds)

def get_jobs(self, tag: Optional[Hashable] = None) -> List["Job"]:
"""
Gets scheduled jobs marked with the given tag, or all jobs
Expand Down Expand Up @@ -173,6 +207,17 @@ def _run_job(self, job: "Job") -> None:
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)

async def _run_job_async(self, job: "Job") -> None:
ret = job.run()
if inspect.isawaitable(ret):
ret = await ret

self._process_job_return_value(job, ret)

def _process_job_return_value(self, job: "Job", ret: any):
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)

def get_next_run(
self, tag: Optional[Hashable] = None
) -> Optional[datetime.datetime]:
Expand Down Expand Up @@ -855,6 +900,13 @@ def run_pending() -> None:
default_scheduler.run_pending()


async def run_pending_async() -> None:
"""Calls :meth:`run_pending_async <Scheduler.run_pending_async>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
await default_scheduler.run_pending_async()


def run_all(delay_seconds: int = 0) -> None:
"""Calls :meth:`run_all <Scheduler.run_all>` on the
:data:`default scheduler instance <default_scheduler>`.
Expand Down

0 comments on commit 26d3105

Please sign in to comment.