Skip to content

Commit

Permalink
Merge branch 'staging' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
shahthepro committed Sep 7, 2023
2 parents be4af98 + 78f315d commit 5218b49
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 34 deletions.
53 changes: 31 additions & 22 deletions eagleproject/notify/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from core.common import Severity
from notify.models import EventSeen

EVENT_DUPE_WINDOW_SECONDS = timedelta(seconds=3600) # 1hr

def event_order_comp(a, b) -> int:
""" Compare to Events for ordering """
Expand Down Expand Up @@ -33,7 +34,10 @@ class Event:
""" An event worthy of an action """
def __init__(self, title, details, severity=Severity.NORMAL,
stamp=datetime.utcnow(), tags=['default'], block_number=0,
transaction_index=0, log_index=0):
transaction_index=0, log_index=0, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):

assert isinstance(deduplicate_time_window, timedelta), "since is not a timedelta object"

self._severity = severity or Severity.NORMAL
self._title = title
self._details = details
Expand All @@ -43,6 +47,7 @@ def __init__(self, title, details, severity=Severity.NORMAL,
self._transaction_index = transaction_index
self._log_index = log_index
self.vague_hash = False
self.deduplicate_time_window = deduplicate_time_window

def __str__(self):
return "{} [{}] {}: {}".format(
Expand Down Expand Up @@ -109,7 +114,7 @@ def tags(self):

def event_critical(title, details, stamp=datetime.utcnow(), tags=None,
block_number=0, transaction_index=0, log_index=0,
log_model=None):
log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):
""" Create a critical severity event """

if log_model is not None:
Expand All @@ -126,12 +131,13 @@ def event_critical(title, details, stamp=datetime.utcnow(), tags=None,
block_number=block_number,
transaction_index=transaction_index,
log_index=log_index,
deduplicate_time_window=deduplicate_time_window,
)


def event_high(title, details, stamp=datetime.utcnow(), tags=None,
block_number=0, transaction_index=0, log_index=0,
log_model=None):
log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):
""" Create a high severity event """

if log_model is not None:
Expand All @@ -148,12 +154,13 @@ def event_high(title, details, stamp=datetime.utcnow(), tags=None,
block_number=block_number,
transaction_index=transaction_index,
log_index=log_index,
deduplicate_time_window=deduplicate_time_window,
)


def event_normal(title, details, stamp=datetime.utcnow(), tags=None,
block_number=0, transaction_index=0, log_index=0,
log_model=None):
log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):
""" Create a normal severity event """

if log_model is not None:
Expand All @@ -170,12 +177,13 @@ def event_normal(title, details, stamp=datetime.utcnow(), tags=None,
block_number=block_number,
transaction_index=transaction_index,
log_index=log_index,
deduplicate_time_window=deduplicate_time_window,
)


def event_low(title, details, stamp=datetime.utcnow(), tags=None,
block_number=0, transaction_index=0, log_index=0,
log_model=None):
log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):
""" Create a low severity event """

if log_model is not None:
Expand All @@ -192,21 +200,13 @@ def event_low(title, details, stamp=datetime.utcnow(), tags=None,
block_number=block_number,
transaction_index=transaction_index,
log_index=log_index,
deduplicate_time_window=deduplicate_time_window,
)


def seen_filter(events, since):
""" Filter out any events seen since `since` and add newly discovered hashes
to the DB """
assert isinstance(since, timedelta), "since is not a timedelta object"

hashes_since = [
x.event_hash
for x in EventSeen.objects.filter(
last_seen__gt=datetime.now(tz=timezone.utc) - since
).only('event_hash')
]

def seen_filter(events):
""" Filter out any events seen since `event.deduplicate_time_window` and
add newly discovered hashes to the DB """
filtered = []
events_parsed = []

Expand All @@ -216,13 +216,22 @@ def seen_filter(events, since):
# Deduplicate unprocessed events
if event_hash in events_parsed:
continue

events_parsed.append(event_hash)

if event_hash not in hashes_since:
filtered.append(ev)

EventSeen.objects.update_or_create(event_hash=event_hash, defaults={
_, created = EventSeen.objects.get_or_create(
event_hash=event_hash,
last_seen__gt=datetime.now(tz=timezone.utc) - ev.deduplicate_time_window,
defaults={
'event_hash': event_hash,
'last_seen': datetime.now(tz=timezone.utc)
})
}
)

if not created:
# Do not send duplicate alert within the defined time window
continue

filtered.append(ev)

return filtered
7 changes: 1 addition & 6 deletions eagleproject/notify/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
from datetime import timedelta

from notify.actions import execute_all_actions, create_actions_from_events
from notify.events import seen_filter
from notify.triggers import run_all_triggers

EVENT_DUPE_WINDOW_SECONDS = timedelta(seconds=3600) # 1hr


def run_all():
events = run_all_triggers()
events = seen_filter(events, since=EVENT_DUPE_WINDOW_SECONDS)
events = seen_filter(events)
actions = create_actions_from_events(events)
execute_all_actions(actions)
23 changes: 19 additions & 4 deletions eagleproject/notify/triggers/vault_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from core.blockchain.strategies import OUSD_BACKING_ASSETS, OETH_BACKING_ASSETS

from time import sleep
from datetime import timedelta

DEDUPE_WINDOW_SECONDS = timedelta(days=1) # 1d

# USD-pegged stable coins drift thresholds
MAX_USD_PRICE = Decimal("1.05")
Expand Down Expand Up @@ -63,13 +66,25 @@ def run_trigger(transfers, new_transfers):
continue
except AssertionError as e:
events.append(
event_high("Exceptional Oracle Price 🧙‍♀️", str(e))
event_high(
"Exceptional Oracle Price 🧙‍♀️",
str(e),
deduplicate_time_window=DEDUPE_WINDOW_SECONDS,
)
)
continue
except RPCError as e:
print("RPC Error when reading price for {}".format(symbol), e)
sleep(3)
if retries <= 0:
events.append(event_high("RPC Error when reading price for {} 🔴".format(symbol), str(e)))
if "below peg" in e.message or retries <= 0:
events.append(
event_high(
"RPC Error when reading price for {} 🔴".format(symbol),
str(e),
deduplicate_time_window=DEDUPE_WINDOW_SECONDS,
),
)
break
elif retries > 0:
sleep(3)

return events
2 changes: 1 addition & 1 deletion eagleproject/notify/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def gc(request):
# event_seen is time sensitive and old entries are irrelevant
try:
EventSeen.objects.filter(
last_seen__gt=datetime.utcnow() - timedelta(hours=2)
last_seen__gt=datetime.utcnow() - timedelta(hours=24)
).delete()
except Exception:
log.exception(
Expand Down
2 changes: 1 addition & 1 deletion eagleproject/scripts/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def run():
# event_seen is time sensitive and old entries are irrelevant
try:
EventSeen.objects.filter(
last_seen__gt=datetime.utcnow() - timedelta(hours=2)
last_seen__gt=datetime.utcnow() - timedelta(hours=24)
).delete()
except Exception:
log.exception(
Expand Down

0 comments on commit 5218b49

Please sign in to comment.