Skip to content

Commit

Permalink
Add periodic task for syncing active syncs
Browse files Browse the repository at this point in the history
  • Loading branch information
StanGirard committed May 20, 2024
1 parent 101fa12 commit 54f1056
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 7 deletions.
3 changes: 3 additions & 0 deletions backend/celery_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@
)
else:
raise ValueError(f"Unsupported broker URL: {CELERY_BROKER_URL}")


celery.autodiscover_tasks(["modules.sync.task"])
4 changes: 4 additions & 0 deletions backend/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,8 @@ def process_integration_brain_sync():
"task": f"{__name__}.ping_telemetry",
"schedule": crontab(minute="*/30", hour="*"),
},
"process_sync_active": {
"task": "process_sync_active",
"schedule": crontab(minute="*/5", hour="*"),
},
}
2 changes: 1 addition & 1 deletion backend/modules/sync/controller/sync_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def get_files_folder_user_sync(
dependencies=[Depends(AuthBearer())],
tags=["Sync"],
)
async def get_syncs_active_in_interval():
async def get_syncs_active_in_interval() -> List[SyncsActive]:
"""
Get all active syncs that need to be synced.
Expand Down
1 change: 0 additions & 1 deletion backend/modules/sync/dto/inputs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
from typing import List, Optional

from pydantic import BaseModel
Expand Down
6 changes: 3 additions & 3 deletions backend/modules/sync/repository/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def get_details_sync_active(self, sync_active_id: int):
)
return None

async def get_syncs_active_in_interval(self):
async def get_syncs_active_in_interval(self) -> List[SyncsActive]:
"""
Retrieve active syncs that are due for synchronization based on their interval.
Expand All @@ -179,7 +179,7 @@ async def get_syncs_active_in_interval(self):
response = (
self.db.table("syncs_active")
.select("*")
.lt("last_synced", (current_time - timedelta(minutes=1)).isoformat())
.lt("last_synced", (current_time - timedelta(minutes=360)).isoformat())
.execute()
)
if response.data:
Expand All @@ -188,6 +188,6 @@ async def get_syncs_active_in_interval(self):
# Now we can call the sync_google_drive_if_not_synced method to sync the Google Drive files
logger.info("Syncing Google Drive for sync_active_id: %s", sync["id"])

return response.data
return [SyncsActive(**sync) for sync in response.data]
logger.warning("No active syncs found due for synchronization")
return []
3 changes: 2 additions & 1 deletion backend/modules/sync/repository/sync_interfaces.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from typing import List
from uuid import UUID

from modules.sync.dto.inputs import (
Expand Down Expand Up @@ -74,7 +75,7 @@ def get_details_sync_active(self, sync_active_id: int):
pass

@abstractmethod
async def get_syncs_active_in_interval(self):
async def get_syncs_active_in_interval(self) -> List[SyncsActive]:
pass


Expand Down
2 changes: 1 addition & 1 deletion backend/modules/sync/service/sync_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def update_sync_active(
def delete_sync_active(self, sync_active_id: str, user_id: str):
return self.repository.delete_sync_active(sync_active_id, user_id)

async def get_syncs_active_in_interval(self):
async def get_syncs_active_in_interval(self) -> List[SyncsActive]:
return await self.repository.get_syncs_active_in_interval()

def get_details_sync_active(self, sync_active_id: int):
Expand Down
31 changes: 31 additions & 0 deletions backend/modules/sync/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio

from celery_config import celery
from modules.knowledge.repository.storage import Storage
from modules.sync.repository.sync_files import SyncFiles
from modules.sync.service.sync_service import SyncService, SyncUserService
from modules.sync.utils.googleutils import GoogleSyncUtils


@celery.task(name="process_sync_active")
def process_sync_active():
    """Celery task entry point that runs the active-sync pass to completion.

    Celery invokes tasks synchronously, so this wrapper drives the
    ``_process_sync_active`` coroutine on an event loop and blocks until it
    finishes. Any exception raised by the coroutine propagates to Celery.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop for this thread: happens in non-main worker threads,
        # after another task cleared the loop, and on newer Python versions
        # where get_event_loop() no longer creates one implicitly.
        loop = None

    if loop is None or loop.is_closed():
        # Install a fresh loop so this and later tasks in the same worker
        # keep finding a usable current loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(_process_sync_active())


async def _process_sync_active():
    """Run a Google sync for every active sync returned as due.

    Builds the sync service and a ``GoogleSyncUtils`` helper wired with its
    collaborators, asks the service for the syncs active in the current
    interval, then awaits ``GoogleSyncUtils.sync`` for each one in turn.
    Syncs are processed sequentially; an exception from any of them
    propagates and stops the remaining ones.
    """
    # The sync service is both a collaborator of the helper and the source
    # of the work list, so it is created first and shared.
    service = SyncService()

    syncer = GoogleSyncUtils(
        sync_user_service=SyncUserService(),
        sync_active_service=service,
        sync_files_repo=SyncFiles(),
        storage=Storage(),
    )

    for due in await service.get_syncs_active_in_interval():
        await syncer.sync(sync_active_id=due.id, user_id=due.user_id)

0 comments on commit 54f1056

Please sign in to comment.