Skip to content

Commit

Permalink
173 multithread submission processing (#174)
Browse files Browse the repository at this point in the history
Closes #173 
Closes #181 

- Added the multithread_handler.py to handle creating the event loop,
and monitoring the future from the process executor
- Added the ProcessPoolExecutor stuff to the submission POST. I was not
able to get this working in its own class/function for some reason, I
had to add this stuff for the executor and background task directly to
the endpoint. Otherwise, the execution task never fired.
- Added pytests

---------

Co-authored-by: lchen-2101 <73617864+lchen-2101@users.noreply.github.com>
  • Loading branch information
jcadam14 and lchen-2101 committed May 2, 2024
1 parent cf26810 commit 0ddf51b
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 155 deletions.
2 changes: 1 addition & 1 deletion src/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ FS_UPLOAD_CONFIG__PROTOCOL="file"
FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
EXPIRED_SUBMISSION_CHECK_SECS=60
EXPIRED_SUBMISSION_CHECK_SECS=120
2 changes: 1 addition & 1 deletion src/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
USER_FI_API_URL=http://localhost:8881/v1/institutions/
EXPIRED_SUBMISSION_CHECK_SECS=60
EXPIRED_SUBMISSION_CHECK_SECS=120
2 changes: 1 addition & 1 deletion src/sbl_filing_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Settings(BaseSettings):
submission_file_extension: str = "csv"
submission_file_size: int = 2 * (1024**3)

expired_submission_check_secs: int = 60
expired_submission_check_secs: int = 120

user_fi_api_url: str = "http://sbl-project-user_fi-1:8888/v1/institutions/"

Expand Down
3 changes: 2 additions & 1 deletion src/sbl_filing_api/entities/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
async_sessionmaker,
async_scoped_session,
)
from sqlalchemy.pool import NullPool
from asyncio import current_task
from sbl_filing_api.config import settings

engine = create_async_engine(settings.conn.unicode_string(), echo=True).execution_options(
engine = create_async_engine(settings.conn.unicode_string(), echo=True, poolclass=NullPool).execution_options(
schema_translate_map={None: settings.db_schema}
)
SessionLocal = async_scoped_session(async_sessionmaker(engine, expire_on_commit=False), current_task)
Expand Down
10 changes: 8 additions & 2 deletions src/sbl_filing_api/entities/repos/submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,17 @@ async def add_submission(session: AsyncSession, filing_id: int, filename: str, s
return new_sub


async def update_submission(submission: SubmissionDAO, incoming_session: AsyncSession = None) -> SubmissionDAO:
session = incoming_session if incoming_session else SessionLocal()
async def update_submission(session: AsyncSession, submission: SubmissionDAO) -> SubmissionDAO:
return await upsert_helper(session, submission, SubmissionDAO)


async def expire_submission(submission_id: int):
async with SessionLocal() as session:
submission = await get_submission(session, submission_id)
submission.state = SubmissionState.VALIDATION_EXPIRED
await upsert_helper(session, submission, SubmissionDAO)


async def upsert_filing_period(session: AsyncSession, filing_period: FilingPeriodDTO) -> FilingPeriodDAO:
return await upsert_helper(session, filing_period, FilingPeriodDAO)

Expand Down
21 changes: 16 additions & 5 deletions src/sbl_filing_api/routers/filing.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import asyncio

from concurrent.futures import ProcessPoolExecutor
from fastapi import Depends, Request, UploadFile, BackgroundTasks, status
from fastapi.responses import Response, JSONResponse, FileResponse
from multiprocessing import Manager
from regtech_api_commons.api.router_wrapper import Router
from regtech_api_commons.api.exceptions import RegTechHttpException
from sbl_filing_api.entities.models.model_enums import UserActionType
from sbl_filing_api.services import submission_processor
from sbl_filing_api.services.multithread_handler import handle_submission, check_future
from typing import Annotated, List

from sbl_filing_api.entities.engine.engine import get_session
Expand Down Expand Up @@ -31,6 +36,7 @@ async def set_db(request: Request, session: Annotated[AsyncSession, Depends(get_
request.state.db_session = session


executor = ProcessPoolExecutor()
router = Router(dependencies=[Depends(set_db), Depends(verify_user_lei_relation)])


Expand All @@ -46,7 +52,7 @@ async def get_filing(request: Request, response: Response, lei: str, period_code
res = await repo.get_filing(request.state.db_session, lei, period_code)
if res:
return res
response.status_code = status.HTTP_404_NOT_FOUND
response.status_code = status.HTTP_204_NO_CONTENT


@router.post("/institutions/{lei}/filings/{period_code}", response_model=FilingDTO)
Expand Down Expand Up @@ -161,16 +167,21 @@ async def upload_file(
)

submission.state = SubmissionState.SUBMISSION_UPLOADED
submission = await repo.update_submission(submission)
submission = await repo.update_submission(request.state.db_session, submission)
except Exception as e:
submission.state = SubmissionState.UPLOAD_FAILED
submission = await repo.update_submission(submission)
submission = await repo.update_submission(request.state.db_session, submission)
raise RegTechHttpException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
name="Submission Unprocessable",
detail=f"Error while trying to process Submission {submission.id}",
) from e
background_tasks.add_task(submission_processor.validation_monitor, period_code, lei, submission, content)

exec_check = Manager().dict()
exec_check["continue"] = True
loop = asyncio.get_event_loop()
future = loop.run_in_executor(executor, handle_submission, period_code, lei, submission, content, exec_check)
background_tasks.add_task(check_future, future, submission.id, exec_check)

return submission

Expand Down Expand Up @@ -236,7 +247,7 @@ async def accept_submission(request: Request, id: int, lei: str, period_code: st

submission.accepter_id = accepter.id
submission.state = SubmissionState.SUBMISSION_ACCEPTED
submission = await repo.update_submission(submission, request.state.db_session)
submission = await repo.update_submission(request.state.db_session, submission)
return submission


Expand Down
30 changes: 30 additions & 0 deletions src/sbl_filing_api/services/multithread_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
import logging

from sbl_filing_api.config import settings
from sbl_filing_api.entities.models.dao import SubmissionDAO
from sbl_filing_api.entities.repos import submission_repo as repo
from sbl_filing_api.services.submission_processor import validate_and_update_submission


logger = logging.getLogger(__name__)


def handle_submission(period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check):
loop = asyncio.get_event_loop()
try:
coro = validate_and_update_submission(period_code, lei, submission, content, exec_check)
loop.run_until_complete(coro)
except Exception as e:
logger.error(e, exc_info=True, stack_info=True)


async def check_future(future, submission_id, exec_check):
await asyncio.sleep(settings.expired_submission_check_secs)
if not future.done():
future.cancel()
exec_check["continue"] = False
await repo.expire_submission(submission_id)
logger.warning(
f"Validation for submission {submission_id} did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED."
)
109 changes: 51 additions & 58 deletions src/sbl_filing_api/services/submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import json
import asyncio
import pandas as pd
import importlib.metadata as imeta
import logging

from io import BytesIO
from fastapi import UploadFile
from regtech_data_validator.create_schemas import validate_phases, ValidationPhase
from regtech_data_validator.data_formatters import df_to_json, df_to_download
from regtech_data_validator.checks import Severity
import pandas as pd
import importlib.metadata as imeta
from sbl_filing_api.entities.engine.engine import SessionLocal
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState
from sbl_filing_api.entities.repos.submission_repo import update_submission
from http import HTTPStatus
import logging
from fsspec import AbstractFileSystem, filesystem
from sbl_filing_api.config import FsProtocol, settings
from regtech_api_commons.api.exceptions import RegTechHttpException
Expand All @@ -22,23 +22,6 @@
REPORT_QUALIFIER = "_report"


async def validation_monitor(period_code: str, lei: str, submission: SubmissionDAO, content: bytes):
try:
await asyncio.wait_for(
validate_and_update_submission(period_code, lei, submission, content),
timeout=settings.expired_submission_check_secs,
)
except asyncio.TimeoutError as te:
log.warn(
f"Validation for submission {submission.id} did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED.",
te,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_EXPIRED
await update_submission(submission)


def validate_file_processable(file: UploadFile) -> None:
extension = file.filename.split(".")[-1].lower()
if file.content_type != settings.submission_file_type or extension != settings.submission_file_extension:
Expand Down Expand Up @@ -99,47 +82,57 @@ async def get_from_storage(period_code: str, lei: str, file_identifier: str, ext
) from e


async def validate_and_update_submission(period_code: str, lei: str, submission: SubmissionDAO, content: bytes):
validator_version = imeta.version("regtech-data-validator")
submission.validation_ruleset_version = validator_version
submission.state = SubmissionState.VALIDATION_IN_PROGRESS
submission = await update_submission(submission)
async def validate_and_update_submission(
period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check: dict
):
async with SessionLocal() as session:
try:
validator_version = imeta.version("regtech-data-validator")
submission.validation_ruleset_version = validator_version
submission.state = SubmissionState.VALIDATION_IN_PROGRESS
submission = await update_submission(session, submission)

df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)

# Validate Phases
result = validate_phases(df, {"lei": lei})

# Update tables with response
if not result[0]:
submission.state = (
SubmissionState.VALIDATION_WITH_ERRORS
if Severity.ERROR.value in result[1]["validation_severity"].values
else SubmissionState.VALIDATION_WITH_WARNINGS
)
else:
submission.state = SubmissionState.VALIDATION_SUCCESSFUL

submission.validation_results = build_validation_results(result)
submission_report = df_to_download(result[1])
await upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)

try:
df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)
if not exec_check["continue"]:
log.warning(f"Submission {submission.id} is expired, will not be updating final state with results.")
return

# Validate Phases
result = validate_phases(df, {"lei": lei})
await update_submission(session, submission)

# Update tables with response
if not result[0]:
submission.state = (
SubmissionState.VALIDATION_WITH_ERRORS
if Severity.ERROR.value in result[1]["validation_severity"].values
else SubmissionState.VALIDATION_WITH_WARNINGS
)
else:
submission.state = SubmissionState.VALIDATION_SUCCESSFUL
submission.validation_results = build_validation_results(result)
submission_report = df_to_download(result[1])
await upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)
await update_submission(submission)
except RuntimeError as re:
log.error("The file is malformed", re, exc_info=True, stack_info=True)
submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
await update_submission(session, submission)

except RuntimeError as re:
log.error("The file is malformed", re, exc_info=True, stack_info=True)
submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
await update_submission(submission)
except Exception as e:
log.error(
f"Validation for submission {submission.id} did not complete due to an unexpected error.",
e,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_ERROR
await update_submission(submission)
except Exception as e:
log.error(
f"Validation for submission {submission.id} did not complete due to an unexpected error.",
e,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_ERROR
await update_submission(session, submission)


def build_validation_results(result):
Expand Down
31 changes: 22 additions & 9 deletions tests/api/routers/test_filing_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import datetime
from http import HTTPStatus
import httpx
Expand Down Expand Up @@ -25,6 +26,7 @@
from sbl_filing_api.entities.models.model_enums import UserActionType
from sbl_filing_api.routers.dependencies import verify_lei
from sbl_filing_api.services import submission_processor
from sbl_filing_api.services.multithread_handler import handle_submission, check_future

from sqlalchemy.exc import IntegrityError
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -63,7 +65,7 @@ def test_get_filing(self, app_fixture: FastAPI, get_filing_mock: Mock, authed_us

get_filing_mock.return_value = None
res = client.get("/v1/filing/institutions/1234567890ABCDEFGH00/filings/2024/")
assert res.status_code == 404
assert res.status_code == 204

def test_unauthed_post_filing(self, app_fixture: FastAPI):
client = TestClient(app_fixture)
Expand Down Expand Up @@ -234,6 +236,7 @@ async def test_get_submission_by_id(self, mocker: MockerFixture, app_fixture: Fa
)

client = TestClient(app_fixture)

res = client.get("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions/1")
mock.assert_called_with(ANY, 1)
assert res.status_code == 200
Expand Down Expand Up @@ -271,10 +274,17 @@ def test_authed_upload_file(

mock_validate_file = mocker.patch("sbl_filing_api.services.submission_processor.validate_file_processable")
mock_validate_file.return_value = None

mock_upload = mocker.patch("sbl_filing_api.services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch("sbl_filing_api.services.submission_processor.validation_monitor")
mock_validate_submission.return_value = None

mock_get_loop = mocker.patch("asyncio.get_event_loop")
mock_event_loop = Mock()
mock_get_loop.return_value = mock_event_loop
mock_event_loop.run_in_executor.return_value = asyncio.Future()

mock_background_task = mocker.patch("fastapi.BackgroundTasks.add_task")

async_mock = AsyncMock(return_value=return_sub)
mock_add_submission = mocker.patch(
"sbl_filing_api.entities.repos.submission_repo.add_submission", side_effect=async_mock
Expand All @@ -290,10 +300,15 @@ def test_authed_upload_file(

res = client.post("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions", files=files)
mock_add_submission.assert_called_with(ANY, 1, "submission.csv", user_action_submit.id)
mock_validate_submission.assert_called_with(
"2024", "1234567890ZXWVUTSR00", return_sub, open(submission_csv, "rb").read()
mock_event_loop.run_in_executor.assert_called_with(
ANY, handle_submission, "2024", "1234567890ZXWVUTSR00", return_sub, open(submission_csv, "rb").read(), ANY
)
assert mock_event_loop.run_in_executor.call_args.args[6]["continue"]
mock_background_task.assert_called_with(
check_future, mock_event_loop.run_in_executor.return_value, return_sub.id, ANY
)
assert mock_update_submission.call_args.args[0].state == SubmissionState.SUBMISSION_UPLOADED
assert mock_background_task.call_args.args[3]["continue"]
assert mock_update_submission.call_args.args[1].state == SubmissionState.SUBMISSION_UPLOADED
assert res.status_code == 200
assert res.json()["id"] == 1
assert res.json()["state"] == SubmissionState.SUBMISSION_UPLOADED
Expand Down Expand Up @@ -357,8 +372,6 @@ def test_submission_update_fail(

mock_upload = mocker.patch("sbl_filing_api.services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch("sbl_filing_api.services.submission_processor.validation_monitor")
mock_validate_submission.return_value = None

mock_update_submission = mocker.patch(
"sbl_filing_api.entities.repos.submission_repo.update_submission", side_effect=async_mock
Expand Down Expand Up @@ -388,7 +401,7 @@ def test_submission_update_fail(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Failed to upload file"
)
res = client.post("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions", files=file)
assert mock_update_submission.call_args.args[0].state == SubmissionState.UPLOAD_FAILED
assert mock_update_submission.call_args.args[1].state == SubmissionState.UPLOAD_FAILED
assert res.status_code == 500
assert res.json()["error_detail"] == "Error while trying to process SUBMIT User Action"

Expand Down

0 comments on commit 0ddf51b

Please sign in to comment.