Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

173 multithread submission processing #174

Merged
merged 26 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e96f824
Initial commit
jcadam14 Apr 23, 2024
e0da4ee
Added ProcessPoolExecutor and background tasks
jcadam14 Apr 23, 2024
c7867d5
Added pytest
jcadam14 Apr 23, 2024
02b10be
Added more pytest
jcadam14 Apr 23, 2024
f07e160
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 Apr 24, 2024
145bbb3
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 Apr 24, 2024
29e126d
Trying the asyncio.create_task, might not keep
jcadam14 Apr 25, 2024
5629420
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 Apr 29, 2024
c456e6b
Fixed issues after merge and removed took out BANGs
jcadam14 Apr 29, 2024
2da74b0
Incorporated Le's changes for session
jcadam14 Apr 29, 2024
22025e7
Updated to use run_in_executor and updated pytests
jcadam14 Apr 29, 2024
ff2e79f
Forgot to add Pool Executor. Kind of important
jcadam14 Apr 29, 2024
6e95703
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 Apr 29, 2024
d123f47
Updated so all repo functions expect a session
jcadam14 Apr 30, 2024
4a3acbf
Removed None from session methods
jcadam14 Apr 30, 2024
14a50ef
Updated to use 404s where agreed, changed to return Response
jcadam14 Apr 30, 2024
7968e38
Lots of updates to make things simpler and common across repo functions
jcadam14 Apr 30, 2024
afc4672
Updated regex check to throw RequestValidationError
jcadam14 May 1, 2024
2e47b3c
Linting
jcadam14 May 1, 2024
2ec5d0c
linting and fixed the regex bug
jcadam14 May 1, 2024
760ddb7
Changed pytest to fit with current use of api-commons
jcadam14 May 1, 2024
5cd7609
Changed pytest for ContactInfo regex texts and continued use of Value…
jcadam14 May 1, 2024
8951ba2
lock file
jcadam14 May 1, 2024
dd34de6
fix: NullPool to ensure db connection
lchen-2101 May 1, 2024
8183dd8
Merge branch '177-wip-204-no_content-response-likely-causing-500-erro…
jcadam14 May 2, 2024
17401ec
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ FS_UPLOAD_CONFIG__PROTOCOL="file"
FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
EXPIRED_SUBMISSION_CHECK_SECS=60
EXPIRED_SUBMISSION_CHECK_SECS=120
2 changes: 1 addition & 1 deletion src/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
USER_FI_API_URL=http://localhost:8881/v1/institutions/
EXPIRED_SUBMISSION_CHECK_SECS=60
EXPIRED_SUBMISSION_CHECK_SECS=120
2 changes: 1 addition & 1 deletion src/sbl_filing_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Settings(BaseSettings):
submission_file_extension: str = "csv"
submission_file_size: int = 2 * (1024**3)

expired_submission_check_secs: int = 60
expired_submission_check_secs: int = 120

user_fi_api_url: str = "http://sbl-project-user_fi-1:8888/v1/institutions/"

Expand Down
3 changes: 2 additions & 1 deletion src/sbl_filing_api/entities/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
async_sessionmaker,
async_scoped_session,
)
from sqlalchemy.pool import NullPool
from asyncio import current_task
from sbl_filing_api.config import settings

engine = create_async_engine(settings.conn.unicode_string(), echo=True).execution_options(
engine = create_async_engine(settings.conn.unicode_string(), echo=True, poolclass=NullPool).execution_options(
schema_translate_map={None: settings.db_schema}
)
SessionLocal = async_scoped_session(async_sessionmaker(engine, expire_on_commit=False), current_task)
Expand Down
10 changes: 8 additions & 2 deletions src/sbl_filing_api/entities/repos/submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,17 @@ async def add_submission(session: AsyncSession, filing_id: int, filename: str, s
return new_sub


async def update_submission(submission: SubmissionDAO, incoming_session: AsyncSession = None) -> SubmissionDAO:
session = incoming_session if incoming_session else SessionLocal()
async def update_submission(session: AsyncSession, submission: SubmissionDAO) -> SubmissionDAO:
return await upsert_helper(session, submission, SubmissionDAO)


async def expire_submission(submission_id: int):
async with SessionLocal() as session:
submission = await get_submission(session, submission_id)
submission.state = SubmissionState.VALIDATION_EXPIRED
await upsert_helper(session, submission, SubmissionDAO)


async def upsert_filing_period(session: AsyncSession, filing_period: FilingPeriodDTO) -> FilingPeriodDAO:
return await upsert_helper(session, filing_period, FilingPeriodDAO)

Expand Down
21 changes: 16 additions & 5 deletions src/sbl_filing_api/routers/filing.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import asyncio

from concurrent.futures import ProcessPoolExecutor
from fastapi import Depends, Request, UploadFile, BackgroundTasks, status
from fastapi.responses import Response, JSONResponse, FileResponse
from multiprocessing import Manager
from regtech_api_commons.api.router_wrapper import Router
from regtech_api_commons.api.exceptions import RegTechHttpException
from sbl_filing_api.entities.models.model_enums import UserActionType
from sbl_filing_api.services import submission_processor
from sbl_filing_api.services.multithread_handler import handle_submission, check_future
from typing import Annotated, List

from sbl_filing_api.entities.engine.engine import get_session
Expand Down Expand Up @@ -31,6 +36,7 @@ async def set_db(request: Request, session: Annotated[AsyncSession, Depends(get_
request.state.db_session = session


executor = ProcessPoolExecutor()
router = Router(dependencies=[Depends(set_db), Depends(verify_user_lei_relation)])


Expand All @@ -46,7 +52,7 @@ async def get_filing(request: Request, response: Response, lei: str, period_code
res = await repo.get_filing(request.state.db_session, lei, period_code)
if res:
return res
response.status_code = status.HTTP_404_NOT_FOUND
response.status_code = status.HTTP_204_NO_CONTENT


@router.post("/institutions/{lei}/filings/{period_code}", response_model=FilingDTO)
Expand Down Expand Up @@ -161,16 +167,21 @@ async def upload_file(
)

submission.state = SubmissionState.SUBMISSION_UPLOADED
submission = await repo.update_submission(submission)
submission = await repo.update_submission(request.state.db_session, submission)
except Exception as e:
submission.state = SubmissionState.UPLOAD_FAILED
submission = await repo.update_submission(submission)
submission = await repo.update_submission(request.state.db_session, submission)
raise RegTechHttpException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
name="Submission Unprocessable",
detail=f"Error while trying to process Submission {submission.id}",
) from e
background_tasks.add_task(submission_processor.validation_monitor, period_code, lei, submission, content)

exec_check = Manager().dict()
exec_check["continue"] = True
loop = asyncio.get_event_loop()
future = loop.run_in_executor(executor, handle_submission, period_code, lei, submission, content, exec_check)
background_tasks.add_task(check_future, future, submission.id, exec_check)

return submission

Expand Down Expand Up @@ -236,7 +247,7 @@ async def accept_submission(request: Request, id: int, lei: str, period_code: st

submission.accepter_id = accepter.id
submission.state = SubmissionState.SUBMISSION_ACCEPTED
submission = await repo.update_submission(submission, request.state.db_session)
submission = await repo.update_submission(request.state.db_session, submission)
return submission


Expand Down
30 changes: 30 additions & 0 deletions src/sbl_filing_api/services/multithread_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
import logging

from sbl_filing_api.config import settings
from sbl_filing_api.entities.models.dao import SubmissionDAO
from sbl_filing_api.entities.repos import submission_repo as repo
from sbl_filing_api.services.submission_processor import validate_and_update_submission


logger = logging.getLogger(__name__)


def handle_submission(period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check):
loop = asyncio.get_event_loop()
try:
coro = validate_and_update_submission(period_code, lei, submission, content, exec_check)
loop.run_until_complete(coro)
except Exception as e:
logger.error(e, exc_info=True, stack_info=True)


async def check_future(future, submission_id, exec_check):
await asyncio.sleep(settings.expired_submission_check_secs)
if not future.done():
future.cancel()
exec_check["continue"] = False
await repo.expire_submission(submission_id)
logger.warning(
f"Validation for submission {submission_id} did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED."
)
109 changes: 51 additions & 58 deletions src/sbl_filing_api/services/submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import json
import asyncio
import pandas as pd
import importlib.metadata as imeta
import logging

from io import BytesIO
from fastapi import UploadFile
from regtech_data_validator.create_schemas import validate_phases, ValidationPhase
from regtech_data_validator.data_formatters import df_to_json, df_to_download
from regtech_data_validator.checks import Severity
import pandas as pd
import importlib.metadata as imeta
from sbl_filing_api.entities.engine.engine import SessionLocal
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState
from sbl_filing_api.entities.repos.submission_repo import update_submission
from http import HTTPStatus
import logging
from fsspec import AbstractFileSystem, filesystem
from sbl_filing_api.config import FsProtocol, settings
from regtech_api_commons.api.exceptions import RegTechHttpException
Expand All @@ -22,23 +22,6 @@
REPORT_QUALIFIER = "_report"


async def validation_monitor(period_code: str, lei: str, submission: SubmissionDAO, content: bytes):
try:
await asyncio.wait_for(
validate_and_update_submission(period_code, lei, submission, content),
timeout=settings.expired_submission_check_secs,
)
except asyncio.TimeoutError as te:
log.warn(
f"Validation for submission {submission.id} did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED.",
te,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_EXPIRED
await update_submission(submission)


def validate_file_processable(file: UploadFile) -> None:
extension = file.filename.split(".")[-1].lower()
if file.content_type != settings.submission_file_type or extension != settings.submission_file_extension:
Expand Down Expand Up @@ -99,47 +82,57 @@ async def get_from_storage(period_code: str, lei: str, file_identifier: str, ext
) from e


async def validate_and_update_submission(period_code: str, lei: str, submission: SubmissionDAO, content: bytes):
validator_version = imeta.version("regtech-data-validator")
submission.validation_ruleset_version = validator_version
submission.state = SubmissionState.VALIDATION_IN_PROGRESS
submission = await update_submission(submission)
async def validate_and_update_submission(
period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check: dict
):
async with SessionLocal() as session:
try:
validator_version = imeta.version("regtech-data-validator")
submission.validation_ruleset_version = validator_version
submission.state = SubmissionState.VALIDATION_IN_PROGRESS
submission = await update_submission(session, submission)

df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)

# Validate Phases
result = validate_phases(df, {"lei": lei})

# Update tables with response
if not result[0]:
submission.state = (
SubmissionState.VALIDATION_WITH_ERRORS
if Severity.ERROR.value in result[1]["validation_severity"].values
else SubmissionState.VALIDATION_WITH_WARNINGS
)
else:
submission.state = SubmissionState.VALIDATION_SUCCESSFUL

submission.validation_results = build_validation_results(result)
submission_report = df_to_download(result[1])
await upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)

try:
df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)
if not exec_check["continue"]:
log.warning(f"Submission {submission.id} is expired, will not be updating final state with results.")
return

# Validate Phases
result = validate_phases(df, {"lei": lei})
await update_submission(session, submission)

# Update tables with response
if not result[0]:
submission.state = (
SubmissionState.VALIDATION_WITH_ERRORS
if Severity.ERROR.value in result[1]["validation_severity"].values
else SubmissionState.VALIDATION_WITH_WARNINGS
)
else:
submission.state = SubmissionState.VALIDATION_SUCCESSFUL
submission.validation_results = build_validation_results(result)
submission_report = df_to_download(result[1])
await upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)
await update_submission(submission)
except RuntimeError as re:
log.error("The file is malformed", re, exc_info=True, stack_info=True)
submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
await update_submission(session, submission)

except RuntimeError as re:
log.error("The file is malformed", re, exc_info=True, stack_info=True)
submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
await update_submission(submission)
except Exception as e:
log.error(
f"Validation for submission {submission.id} did not complete due to an unexpected error.",
e,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_ERROR
await update_submission(submission)
except Exception as e:
log.error(
f"Validation for submission {submission.id} did not complete due to an unexpected error.",
e,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_ERROR
await update_submission(session, submission)


def build_validation_results(result):
Expand Down
31 changes: 22 additions & 9 deletions tests/api/routers/test_filing_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import datetime
from http import HTTPStatus
import httpx
Expand Down Expand Up @@ -25,6 +26,7 @@
from sbl_filing_api.entities.models.model_enums import UserActionType
from sbl_filing_api.routers.dependencies import verify_lei
from sbl_filing_api.services import submission_processor
from sbl_filing_api.services.multithread_handler import handle_submission, check_future

from sqlalchemy.exc import IntegrityError
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -63,7 +65,7 @@ def test_get_filing(self, app_fixture: FastAPI, get_filing_mock: Mock, authed_us

get_filing_mock.return_value = None
res = client.get("/v1/filing/institutions/1234567890ABCDEFGH00/filings/2024/")
assert res.status_code == 404
assert res.status_code == 204

def test_unauthed_post_filing(self, app_fixture: FastAPI):
client = TestClient(app_fixture)
Expand Down Expand Up @@ -234,6 +236,7 @@ async def test_get_submission_by_id(self, mocker: MockerFixture, app_fixture: Fa
)

client = TestClient(app_fixture)

res = client.get("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions/1")
mock.assert_called_with(ANY, 1)
assert res.status_code == 200
Expand Down Expand Up @@ -271,10 +274,17 @@ def test_authed_upload_file(

mock_validate_file = mocker.patch("sbl_filing_api.services.submission_processor.validate_file_processable")
mock_validate_file.return_value = None

mock_upload = mocker.patch("sbl_filing_api.services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch("sbl_filing_api.services.submission_processor.validation_monitor")
mock_validate_submission.return_value = None

mock_get_loop = mocker.patch("asyncio.get_event_loop")
mock_event_loop = Mock()
mock_get_loop.return_value = mock_event_loop
mock_event_loop.run_in_executor.return_value = asyncio.Future()

mock_background_task = mocker.patch("fastapi.BackgroundTasks.add_task")

async_mock = AsyncMock(return_value=return_sub)
mock_add_submission = mocker.patch(
"sbl_filing_api.entities.repos.submission_repo.add_submission", side_effect=async_mock
Expand All @@ -290,10 +300,15 @@ def test_authed_upload_file(

res = client.post("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions", files=files)
mock_add_submission.assert_called_with(ANY, 1, "submission.csv", user_action_submit.id)
mock_validate_submission.assert_called_with(
"2024", "1234567890ZXWVUTSR00", return_sub, open(submission_csv, "rb").read()
mock_event_loop.run_in_executor.assert_called_with(
ANY, handle_submission, "2024", "1234567890ZXWVUTSR00", return_sub, open(submission_csv, "rb").read(), ANY
)
assert mock_event_loop.run_in_executor.call_args.args[6]["continue"]
mock_background_task.assert_called_with(
check_future, mock_event_loop.run_in_executor.return_value, return_sub.id, ANY
)
assert mock_update_submission.call_args.args[0].state == SubmissionState.SUBMISSION_UPLOADED
assert mock_background_task.call_args.args[3]["continue"]
assert mock_update_submission.call_args.args[1].state == SubmissionState.SUBMISSION_UPLOADED
assert res.status_code == 200
assert res.json()["id"] == 1
assert res.json()["state"] == SubmissionState.SUBMISSION_UPLOADED
Expand Down Expand Up @@ -357,8 +372,6 @@ def test_submission_update_fail(

mock_upload = mocker.patch("sbl_filing_api.services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch("sbl_filing_api.services.submission_processor.validation_monitor")
mock_validate_submission.return_value = None

mock_update_submission = mocker.patch(
"sbl_filing_api.entities.repos.submission_repo.update_submission", side_effect=async_mock
Expand Down Expand Up @@ -388,7 +401,7 @@ def test_submission_update_fail(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Failed to upload file"
)
res = client.post("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions", files=file)
assert mock_update_submission.call_args.args[0].state == SubmissionState.UPLOAD_FAILED
assert mock_update_submission.call_args.args[1].state == SubmissionState.UPLOAD_FAILED
assert res.status_code == 500
assert res.json()["error_detail"] == "Error while trying to process SUBMIT User Action"

Expand Down