store lastUpdateTime in db
ikreymer committed Mar 3, 2024
1 parent e5af8e5 · commit 97f2903
Showing 2 changed files with 26 additions and 17 deletions.
backend/btrixcloud/crawls.py (18 changes: 15 additions & 3 deletions)
@@ -431,16 +431,28 @@ async def update_running_crawl_stats(self, crawl_id, stats):
         query = {"_id": crawl_id, "type": "crawl", "state": "running"}
         return await self.crawls.find_one_and_update(query, {"$set": {"stats": stats}})
 
-    async def inc_crawl_exec_time(self, crawl_id, exec_time, last_updated_time):
+    async def inc_crawl_exec_time(self, crawl_id, exec_time):
         """increment exec time"""
         return await self.crawls.find_one_and_update(
-            {"_id": crawl_id, "type": "crawl", "_lut": {"$ne": last_updated_time}},
+            {"_id": crawl_id, "type": "crawl"},
             {
                 "$inc": {"crawlExecSeconds": exec_time},
-                "$set": {"_lut": last_updated_time},
             },
         )
 
+    async def get_crawl_exec_last_update_time(self, crawl_id):
+        """get crawl last updated time"""
+        res = await self.crawls.find_one(
+            {"_id": crawl_id, "type": "crawl"}, projection=["_lut"]
+        )
+        return res and res.get("_lut")
+
+    async def set_crawl_exec_last_update_time(self, crawl_id, last_update_time):
+        """set crawl last update time"""
+        return await self.crawls.find_one_and_update(
+            {"_id": crawl_id, "type": "crawl"}, {"$set": {"_lut": last_update_time}}
+        )
+
     async def get_crawl_state(self, crawl_id):
         """return current crawl state of a crawl"""
         res = await self.crawls.find_one(
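
For reference, below is a minimal standalone sketch of the pattern these helpers implement, using a Motor (async MongoDB) collection. Only the "_lut" and "crawlExecSeconds" field names and the query shape come from the diff above; the function names, the crawls collection handle, and the connection details are illustrative assumptions, not part of the commit.

from datetime import datetime, timezone

from motor.motor_asyncio import AsyncIOMotorClient


async def get_last_update_time(crawls, crawl_id):
    # Read the stored last update time ("_lut"), or None if it was never set.
    res = await crawls.find_one({"_id": crawl_id, "type": "crawl"}, projection=["_lut"])
    return res and res.get("_lut")


async def set_last_update_time(crawls, crawl_id, when):
    # Persist the last update time in the crawl document itself.
    return await crawls.find_one_and_update(
        {"_id": crawl_id, "type": "crawl"}, {"$set": {"_lut": when}}
    )


async def add_exec_seconds(crawls, crawl_id, exec_time):
    # Accumulate execution seconds; the previous "$ne" guard on "_lut" is gone
    # because the timestamp is now read from and written to the database directly.
    return await crawls.find_one_and_update(
        {"_id": crawl_id, "type": "crawl"}, {"$inc": {"crawlExecSeconds": exec_time}}
    )


async def demo():
    # Illustrative wiring only; connection string, db, and collection names are assumptions.
    crawls = AsyncIOMotorClient("mongodb://localhost:27017")["btrix"]["crawls"]
    now = datetime.now(timezone.utc)
    last = await get_last_update_time(crawls, "crawl-1")
    if last:
        await add_exec_seconds(crawls, "crawl-1", (now - last).total_seconds())
    await set_last_update_time(crawls, "crawl-1", now)

With "_lut" stored in the database, inc_crawl_exec_time no longer takes the previous timestamp or filters with "$ne"; a separate getter/setter pair manages the timestamp instead.
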
backend/btrixcloud/operator/crawls.py (25 changes: 11 additions & 14 deletions)
@@ -232,7 +232,9 @@ async def sync_crawls(self, data: MCSyncData):
 
         else:
             status.scale = crawl.scale
-            status.lastUpdatedTime = to_k8s_date(dt_now())
+            now = dt_now()
+            await self.crawl_ops.set_crawl_exec_last_update_time(crawl_id, now)
+            status.lastUpdatedTime = to_k8s_date(now)
 
         children = self._load_redis(params, status, data.children)
 
@@ -828,12 +830,15 @@ async def increment_pod_exec_time(
         """inc exec time tracking"""
         now = dt_now()
 
-        if not status.lastUpdatedTime:
+        update_start_time = await self.crawl_ops.get_crawl_exec_last_update_time(
+            crawl_id
+        )
+
+        if not update_start_time:
+            await self.crawl_ops.set_crawl_exec_last_update_time(crawl_id, now)
             status.lastUpdatedTime = to_k8s_date(now)
             return
 
-        update_start_time = from_k8s_date(status.lastUpdatedTime)
-
         reason = None
         update_duration = (now - update_start_time).total_seconds()
 
@@ -907,16 +912,7 @@ async def increment_pod_exec_time(
                 max_duration = max(duration, max_duration)
 
         if exec_time:
-            if not await self.crawl_ops.inc_crawl_exec_time(
-                crawl_id, exec_time, status.lastUpdatedTime
-            ):
-                # if lastUpdatedTime is same as previous, something is wrong, don't update!
-                print(
-                    f"Already updated for lastUpdatedTime {status.lastUpdatedTime}, skipping execTime update!",
-                    flush=True,
-                )
-                status.lastUpdatedTime = to_k8s_date(now)
-                return
+            await self.crawl_ops.inc_crawl_exec_time(crawl_id, exec_time)
 
             await self.org_ops.inc_org_time_stats(oid, exec_time, True)
             status.crawlExecTime += exec_time
@@ -927,6 +923,7 @@ async def increment_pod_exec_time(
                 flush=True,
             )
 
+        await self.crawl_ops.set_crawl_exec_last_update_time(crawl_id, now)
         status.lastUpdatedTime = to_k8s_date(now)
 
     def should_mark_waiting(self, state, started):
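
Taken together, the operator-side flow after this change can be summarized with the condensed sketch below. It is not the actual increment_pod_exec_time body: crawl_ops stands in for the CrawlOps instance with the helpers added above, and exec_time_for_interval is a hypothetical stand-in for the per-pod accounting the real method performs.

from datetime import datetime, timezone


def dt_now():
    # Same idea as the dt_now() used in the diff: current UTC time.
    return datetime.now(timezone.utc)


async def record_exec_time(crawl_ops, crawl_id, exec_time_for_interval):
    now = dt_now()

    # The interval start is now read from the database instead of k8s status,
    # so it is shared across operator restarts and duplicate sync calls.
    update_start_time = await crawl_ops.get_crawl_exec_last_update_time(crawl_id)

    if not update_start_time:
        # First pass for this crawl: seed the timestamp and wait for the next pass.
        await crawl_ops.set_crawl_exec_last_update_time(crawl_id, now)
        return

    update_duration = (now - update_start_time).total_seconds()

    exec_time = exec_time_for_interval(update_duration)
    if exec_time:
        await crawl_ops.inc_crawl_exec_time(crawl_id, exec_time)

    # Advance the stored timestamp at the end of every pass.
    await crawl_ops.set_crawl_exec_last_update_time(crawl_id, now)
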
