Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make library async #204

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
145 changes: 110 additions & 35 deletions schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
from collections.abc import Hashable
except ImportError:
from collections import Hashable
import asyncio
import datetime
import functools
import inspect
import logging
import random
import re
Expand Down Expand Up @@ -96,6 +98,19 @@ def run_pending(self):
for job in sorted(runnable_jobs):
self._run_job(job)

async def arun_pending(self):
"""
Run all jobs that are scheduled to run.

Please note that it is *intended behavior that run_pending()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you only call run_pending()
in one hour increments then your job won't be run 60 times in
between but only once.
"""
runnable_jobs = (job for job in self.jobs if job.should_run)
await asyncio.gather(*[self._arun_job(job) for job in runnable_jobs])

def run_all(self, delay_seconds=0):
"""
Run all jobs regardless if they are scheduled to run or not.
Expand All @@ -112,6 +127,22 @@ def run_all(self, delay_seconds=0):
self._run_job(job)
time.sleep(delay_seconds)

async def arun_all(self, delay_seconds=0):
"""
Run all jobs regardless if they are scheduled to run or not.

A delay of `delay` seconds is added between each job. This helps
distribute system load generated by the jobs more evenly
over time.

:param delay_seconds: A delay added between every executed job
"""
logger.info('Running *all* %i jobs with %is delay inbetween',
len(self.jobs), delay_seconds)
for job in self.jobs[:]:
await self._arun_job(job)
await asyncio.sleep(delay_seconds)

def clear(self, tag=None):
"""
Deletes scheduled jobs marked with the given tag, or all jobs
Expand Down Expand Up @@ -151,6 +182,11 @@ def _run_job(self, job):
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)

async def _arun_job(self, job):
ret = await job.arun()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)

@property
def next_run(self):
"""
Expand Down Expand Up @@ -194,6 +230,7 @@ def __init__(self, interval, scheduler=None):
self.job_func = None # the job job_func to run
self.unit = None # time units, e.g. 'minutes', 'hours', ...
self.at_time = None # optional time at which this job runs
self.at_time_fn = None # optional time at which this job runs
self.last_run = None # datetime of the last run
self.next_run = None # datetime of the next run
self.period = None # timedelta between runs, only valid for
Expand Down Expand Up @@ -382,7 +419,8 @@ def at(self, time_str):
"""
Specify a particular time that the job should be run at.

:param time_str: A string in one of the following formats: `HH:MM:SS`,
:param time_str: A function returning a datetime.time object, or
a string in one of the following formats: `HH:MM:SS`,
`HH:MM`,`:MM`, `:SS`. The format must make sense given how often
the job is repeating; for example, a job that repeats every minute
should not be given a string in the form `HH:MM:SS`. The difference
Expand All @@ -393,41 +431,46 @@ def at(self, time_str):
if (self.unit not in ('days', 'hours', 'minutes')
and not self.start_day):
raise ScheduleValueError('Invalid unit')
if not isinstance(time_str, str):
raise TypeError('at() should be passed a string')
if self.unit == 'days' or self.start_day:
if not re.match(r'^([0-2]\d:)?[0-5]\d:[0-5]\d$', time_str):
raise ScheduleValueError('Invalid time format')
if self.unit == 'hours':
if not re.match(r'^([0-5]\d)?:[0-5]\d$', time_str):
raise ScheduleValueError(('Invalid time format for'
' an hourly job'))
if self.unit == 'minutes':
if not re.match(r'^:[0-5]\d$', time_str):
raise ScheduleValueError(('Invalid time format for'
' a minutely job'))
time_values = time_str.split(':')
if len(time_values) == 3:
hour, minute, second = time_values
elif len(time_values) == 2 and self.unit == 'minutes':
hour = 0
minute = 0
_, second = time_values
if callable(time_str):
self.at_time_fn = time_str
elif isinstance(time_str, datetime.time):
self.at_time = time_str
else:
hour, minute = time_values
second = 0
if self.unit == 'days' or self.start_day:
hour = int(hour)
if not (0 <= hour <= 23):
raise ScheduleValueError('Invalid number of hours')
elif self.unit == 'hours':
hour = 0
elif self.unit == 'minutes':
hour = 0
minute = 0
minute = int(minute)
second = int(second)
self.at_time = datetime.time(hour, minute, second)
if not isinstance(time_str, str):
raise TypeError('at() should be passed a string')
if self.unit == 'days' or self.start_day:
if not re.match(r'^([0-2]\d:)?[0-5]\d:[0-5]\d$', time_str):
raise ScheduleValueError('Invalid time format')
if self.unit == 'hours':
if not re.match(r'^([0-5]\d)?:[0-5]\d$', time_str):
raise ScheduleValueError(('Invalid time format for'
' an hourly job'))
if self.unit == 'minutes':
if not re.match(r'^:[0-5]\d$', time_str):
raise ScheduleValueError(('Invalid time format for'
' a minutely job'))
time_values = time_str.split(':')
if len(time_values) == 3:
hour, minute, second = time_values
elif len(time_values) == 2 and self.unit == 'minutes':
hour = 0
minute = 0
_, second = time_values
else:
hour, minute = time_values
second = 0
if self.unit == 'days' or self.start_day:
hour = int(hour)
if not (0 <= hour <= 23):
raise ScheduleValueError('Invalid number of hours')
elif self.unit == 'hours':
hour = 0
elif self.unit == 'minutes':
hour = 0
minute = 0
minute = int(minute)
second = int(second)
self.at_time = datetime.time(hour, minute, second)
return self

def to(self, latest):
Expand Down Expand Up @@ -487,6 +530,20 @@ def run(self):
self._schedule_next_run()
return ret

async def arun(self):
"""
Run the job and immediately reschedule it.

:return: The return value returned by the `job_func`
"""
logger.info('Running job %s', self)
ret = self.job_func()
if inspect.isawaitable(ret):
ret = await ret
self.last_run = datetime.datetime.now()
self._schedule_next_run()
return ret

def _schedule_next_run(self):
"""
Compute the instant when this job should run next.
Expand Down Expand Up @@ -522,6 +579,10 @@ def _schedule_next_run(self):
if days_ahead <= 0: # Target day already happened this week
days_ahead += 7
self.next_run += datetime.timedelta(days_ahead) - self.period
if self.at_time_fn is not None:
self.at_time = self.at_time_fn(self.next_run)
if isinstance(self.at_time, datetime.datetime):
self.at_time = self.at_time.time()
if self.at_time is not None:
if (self.unit not in ('days', 'hours', 'minutes')
and self.start_day is None):
Expand Down Expand Up @@ -582,6 +643,13 @@ def run_pending():
default_scheduler.run_pending()


async def arun_pending():
"""Calls :meth:`run_pending <Scheduler.run_pending>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
await default_scheduler.arun_pending()


def run_all(delay_seconds=0):
"""Calls :meth:`run_all <Scheduler.run_all>` on the
:data:`default scheduler instance <default_scheduler>`.
Expand Down Expand Up @@ -615,3 +683,10 @@ def idle_seconds():
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.idle_seconds


async def run(scheduler=default_scheduler, sleeptime=1):
"""Async run scheduler every `sleeptime` seconds."""
while True:
await scheduler.arun_pending()
await asyncio.sleep(sleeptime)