Skip to content

Commit

Permalink
working version
Browse files Browse the repository at this point in the history
  • Loading branch information
i-am-zaidali committed Oct 1, 2023
1 parent bf4fcab commit c4cd358
Showing 1 changed file with 52 additions and 25 deletions.
77 changes: 52 additions & 25 deletions timer/timers.py
@@ -1,6 +1,7 @@
import asyncio
import itertools
import logging
from typing import Dict, List
from typing import Dict, List, Iterable, Callable, Any, Optional, TypeVar

import discord
from discord.ext import tasks
Expand All @@ -14,6 +15,40 @@
guild_defaults = {"timers": [], "timer_settings": {"notify_users": True, "emoji": "\U0001f389"}}
log = logging.getLogger("red.craycogs.Timer.timers")

_T = TypeVar("_T")

Missing = object()


def all_min(
iterable: Iterable[_T],
key: Callable[[_T], Any] = lambda x: x,
*,
sortkey: Optional[Callable[[_T], Any]] = Missing,
):
"""A simple one liner function that returns all the least elements of an iterable instead of just one like the builtin `min()`.
!!!!!! SORT THE DATA PRIOR TO USING THIS FUNCTION !!!!!!
or pass the `sortkey` argument to this function which will be passed to the `sorted()` builtin to sort the iterable
A small explanation of what it does from bard:
- itertools.groupby() groups the elements in the iterable by their key value.
- map() applies the function lambda x: (x[0], list(x[1])) to each group.
This function returns a tuple containing the key of the group and a list of all of the elements in the group.
- min() returns the tuple with the minimum key value.
- [1] gets the second element of the tuple, which is the list of all of the minimum elements in the iterable.
"""
if sortkey is not Missing:
iterable = sorted(iterable, key=sortkey)
try:
return min(
map(lambda x: (x[0], list(x[1])), itertools.groupby(iterable, key=key)),
key=lambda x: x[0],
)[1]

except ValueError:
return []


class Timer(commands.Cog):
"""Start countdowns that help you keep track of the time passed"""
Expand All @@ -29,6 +64,9 @@ def __init__(self, bot: Red):

self.cache: Dict[int, List[TimerObj]] = {}

self.task = self.end_timer.start()
self.to_end: List[TimerObj] = []

async def red_delete_data_for_user(self, *, requester, user_id: int):
for timers in self.cache.values():
for timer in timers:
Expand Down Expand Up @@ -83,42 +121,31 @@ async def _back_to_config(self):
await self.config.guild_from_id(guild_id).timers.set([x.json for x in timers])

async def cog_unload(self):
self.timer_task.cancel()
self.task.cancel()
await self._back_to_config()

@tasks.loop(seconds=3)
@tasks.loop(seconds=1)
async def end_timer(self):
if self.end_timer._current_loop and self.end_timer._current_loop % 100 == 0:
await self.to_config()

if not self.bot.get_cog("Giveaways"):
# safeguard idk
return

c = self.cache.copy()
timers = [
timer
for data in c.values()
for timer in data
if isinstance(timer, TimerObj) and timer.ended
]

if not timers:
return

results = await asyncio.gather(*[timer.end() for timer in timers], return_exceptions=True)

list(
map(
lambda x: self.remove_from_cache(x),
timers,
)
results = await asyncio.gather(
*[timer.end() for timer in self.to_end], return_exceptions=True
)

for result in results:
if isinstance(result, Exception):
log.error(f"A timer ended with an error:", exc_info=result)

self.to_end = all_min(
itertools.chain.from_iterable(self.cache.values()),
key=lambda x: x.remaining_time,
sortkey=lambda x: x.remaining_time,
)

interval = getattr(next(iter(self.to_end), None), "remaining_time", 1)
self.end_timer.change_interval(seconds=interval)

@commands.group(name="timer")
@commands.mod_or_permissions(manage_messages=True)
async def timer(self, ctx: commands.Context):
Expand Down

0 comments on commit c4cd358

Please sign in to comment.