Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

173 multithread submission processing #174

Merged
merged 26 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e96f824
Initial commit
jcadam14 Apr 23, 2024
e0da4ee
Added ProcessPoolExecutor and background tasks
jcadam14 Apr 23, 2024
c7867d5
Added pytest
jcadam14 Apr 23, 2024
02b10be
Added more pytest
jcadam14 Apr 23, 2024
f07e160
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 Apr 24, 2024
145bbb3
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 Apr 24, 2024
29e126d
Trying the asyncio.create_task, might not keep
jcadam14 Apr 25, 2024
5629420
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 Apr 29, 2024
c456e6b
Fixed issues after merge and removed took out BANGs
jcadam14 Apr 29, 2024
2da74b0
Incorporated Le's changes for session
jcadam14 Apr 29, 2024
22025e7
Updated to use run_in_executor and updated pytests
jcadam14 Apr 29, 2024
ff2e79f
Forgot to add Pool Executor. Kind of important
jcadam14 Apr 29, 2024
6e95703
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 Apr 29, 2024
d123f47
Updated so all repo functions expect a session
jcadam14 Apr 30, 2024
4a3acbf
Removed None from session methods
jcadam14 Apr 30, 2024
14a50ef
Updated to use 404s where agreed, changed to return Response
jcadam14 Apr 30, 2024
7968e38
Lots of updates to make things simpler and common across repo functions
jcadam14 Apr 30, 2024
afc4672
Updated regex check to throw RequestValidationError
jcadam14 May 1, 2024
2e47b3c
Linting
jcadam14 May 1, 2024
2ec5d0c
linting and fixed the regex bug
jcadam14 May 1, 2024
760ddb7
Changed pytest to fit with current use of api-commons
jcadam14 May 1, 2024
5cd7609
Changed pytest for ContactInfo regex texts and continued use of Value…
jcadam14 May 1, 2024
8951ba2
lock file
jcadam14 May 1, 2024
dd34de6
fix: NullPool to ensure db connection
lchen-2101 May 1, 2024
8183dd8
Merge branch '177-wip-204-no_content-response-likely-causing-500-erro…
jcadam14 May 2, 2024
17401ec
Merge branch 'main' into 173-multithread-submission-processing
jcadam14 May 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ FS_UPLOAD_CONFIG__PROTOCOL="file"
FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
EXPIRED_SUBMISSION_CHECK_SECS=60
EXPIRED_SUBMISSION_CHECK_SECS=120
2 changes: 1 addition & 1 deletion src/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
USER_FI_API_URL=http://localhost:8881/v1/institutions/
EXPIRED_SUBMISSION_CHECK_SECS=60
EXPIRED_SUBMISSION_CHECK_SECS=120
2 changes: 1 addition & 1 deletion src/sbl_filing_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Settings(BaseSettings):
submission_file_extension: str = "csv"
submission_file_size: int = 2 * (1024**3)

expired_submission_check_secs: int = 60
expired_submission_check_secs: int = 120

user_fi_api_url: str = "http://sbl-project-user_fi-1:8888/v1/institutions/"

Expand Down
10 changes: 9 additions & 1 deletion src/sbl_filing_api/entities/repos/submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ async def get_filing_periods(session: AsyncSession) -> List[FilingPeriodDAO]:
return await query_helper(session, FilingPeriodDAO)


async def get_submission(session: AsyncSession, submission_id: int) -> SubmissionDAO:
async def get_submission(submission_id: int, incoming_session: AsyncSession = None) -> SubmissionDAO:
session = incoming_session if incoming_session else SessionLocal()
result = await query_helper(session, SubmissionDAO, id=submission_id)
return result[0] if result else None

Expand Down Expand Up @@ -109,6 +110,13 @@ async def update_submission(submission: SubmissionDAO, incoming_session: AsyncSe
return await upsert_helper(session, submission, SubmissionDAO)


async def expire_submission(submission_id: int):
session = SessionLocal()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we use the with context here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, updated.

submission = await get_submission(submission_id, session)
submission.state = SubmissionState.VALIDATION_EXPIRED
await upsert_helper(session, submission, SubmissionDAO)


async def upsert_filing_period(session: AsyncSession, filing_period: FilingPeriodDTO) -> FilingPeriodDAO:
return await upsert_helper(session, filing_period, FilingPeriodDAO)

Expand Down
15 changes: 13 additions & 2 deletions src/sbl_filing_api/routers/filing.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import asyncio

from concurrent.futures import ProcessPoolExecutor
from fastapi import Depends, Request, UploadFile, BackgroundTasks, status
from fastapi.responses import JSONResponse, FileResponse
from multiprocessing import Manager
from regtech_api_commons.api.router_wrapper import Router
from regtech_api_commons.api.exceptions import RegTechHttpException
from sbl_filing_api.entities.models.model_enums import UserActionType
from sbl_filing_api.services import submission_processor
from sbl_filing_api.services.multithread_handler import handle_submission, check_future
from typing import Annotated, List

from sbl_filing_api.entities.engine.engine import get_session
Expand Down Expand Up @@ -31,6 +36,7 @@ async def set_db(request: Request, session: Annotated[AsyncSession, Depends(get_
request.state.db_session = session


executor = ProcessPoolExecutor()
router = Router(dependencies=[Depends(set_db), Depends(verify_user_lei_relation)])


Expand Down Expand Up @@ -170,7 +176,12 @@ async def upload_file(
name="Submission Unprocessable",
detail=f"Error while trying to process Submission {submission.id}",
) from e
background_tasks.add_task(submission_processor.validation_monitor, period_code, lei, submission, content)

exec_check = Manager().dict()
exec_check["continue"] = True
loop = asyncio.get_event_loop()
future = loop.run_in_executor(executor, handle_submission, period_code, lei, submission, content, exec_check)
background_tasks.add_task(check_future, future, submission.id, exec_check)

return submission

Expand Down Expand Up @@ -200,7 +211,7 @@ async def get_submission_latest(request: Request, lei: str, period_code: str):
@router.get("/institutions/{lei}/filings/{period_code}/submissions/{id}", response_model=SubmissionDTO)
@requires("authenticated")
async def get_submission(request: Request, id: int):
result = await repo.get_submission(request.state.db_session, id)
result = await repo.get_submission(incoming_session=request.state.db_session, submission_id=id)
if result:
return result
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT, content=None)
Expand Down
30 changes: 30 additions & 0 deletions src/sbl_filing_api/services/multithread_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
import logging

from sbl_filing_api.config import settings
from sbl_filing_api.entities.models.dao import SubmissionDAO
from sbl_filing_api.entities.repos import submission_repo as repo
from sbl_filing_api.services.submission_processor import validate_and_update_submission


logger = logging.getLogger(__name__)


def handle_submission(period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check):
loop = asyncio.get_event_loop()
try:
coro = validate_and_update_submission(period_code, lei, submission, content, exec_check)
loop.run_until_complete(coro)
except Exception as e:
logger.error(e, exc_info=True, stack_info=True)


async def check_future(future, submission_id, exec_check):
await asyncio.sleep(settings.expired_submission_check_secs)
if not future.done():
future.cancel()
exec_check["continue"] = False
await repo.expire_submission(submission_id)
logger.warning(
f"Validation for submission {submission_id} did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED."
)
82 changes: 46 additions & 36 deletions src/sbl_filing_api/services/submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import json
import asyncio
import pandas as pd
import importlib.metadata as imeta
import logging

from io import BytesIO
from fastapi import UploadFile
from regtech_data_validator.create_schemas import validate_phases, ValidationPhase
from regtech_data_validator.data_formatters import df_to_json, df_to_download
from regtech_data_validator.checks import Severity
import pandas as pd
import importlib.metadata as imeta
from sbl_filing_api.entities.engine.engine import SessionLocal
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState
from sbl_filing_api.entities.repos.submission_repo import update_submission
from http import HTTPStatus
import logging
from fsspec import AbstractFileSystem, filesystem
from sbl_filing_api.config import FsProtocol, settings
from regtech_api_commons.api.exceptions import RegTechHttpException
Expand Down Expand Up @@ -99,47 +100,56 @@ async def get_from_storage(period_code: str, lei: str, file_identifier: str, ext
) from e


async def validate_and_update_submission(period_code: str, lei: str, submission: SubmissionDAO, content: bytes):
async def validate_and_update_submission(
period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check: dict
):
validator_version = imeta.version("regtech-data-validator")
submission.validation_ruleset_version = validator_version
submission.state = SubmissionState.VALIDATION_IN_PROGRESS
submission = await update_submission(submission)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't this one in the with context block?

Copy link
Contributor Author

@jcadam14 jcadam14 Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I was lazy? Updated. Removed the concept of the update function and the get_submission function from having a None session. Calling code will now be expected to handle that piece.

Updated the pytests now for all that, too

async with SessionLocal() as session:
try:
df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)

# Validate Phases
result = validate_phases(df, {"lei": lei})

# Update tables with response
if not result[0]:
submission.state = (
SubmissionState.VALIDATION_WITH_ERRORS
if Severity.ERROR.value in result[1]["validation_severity"].values
else SubmissionState.VALIDATION_WITH_WARNINGS
)
else:
submission.state = SubmissionState.VALIDATION_SUCCESSFUL

submission.validation_json = build_validation_results(result)
submission_report = df_to_download(result[1])
await upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)

try:
df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)
if not exec_check["continue"]:
log.warning(f"Submission {submission.id} is expired, will not be updating final state with results.")
return

# Validate Phases
result = validate_phases(df, {"lei": lei})
await update_submission(submission, session)

# Update tables with response
if not result[0]:
submission.state = (
SubmissionState.VALIDATION_WITH_ERRORS
if Severity.ERROR.value in result[1]["validation_severity"].values
else SubmissionState.VALIDATION_WITH_WARNINGS
)
else:
submission.state = SubmissionState.VALIDATION_SUCCESSFUL
submission.validation_json = build_validation_results(result)
submission_report = df_to_download(result[1])
await upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)
await update_submission(submission)
except RuntimeError as re:
log.error("The file is malformed", re, exc_info=True, stack_info=True)
submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
await update_submission(submission, session)

except RuntimeError as re:
log.error("The file is malformed", re, exc_info=True, stack_info=True)
submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
await update_submission(submission)
except Exception as e:
log.error(
f"Validation for submission {submission.id} did not complete due to an unexpected error.",
e,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_ERROR
await update_submission(submission)
except Exception as e:
log.error(
f"Validation for submission {submission.id} did not complete due to an unexpected error.",
e,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_ERROR
await update_submission(submission, session)


def build_validation_results(result):
Expand Down
27 changes: 21 additions & 6 deletions tests/api/routers/test_filing_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import datetime
from http import HTTPStatus
import httpx
Expand Down Expand Up @@ -25,6 +26,7 @@
from sbl_filing_api.entities.models.model_enums import UserActionType
from sbl_filing_api.routers.dependencies import verify_lei
from sbl_filing_api.services import submission_processor
from sbl_filing_api.services.multithread_handler import handle_submission, check_future

from sqlalchemy.exc import IntegrityError
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -234,13 +236,14 @@ async def test_get_submission_by_id(self, mocker: MockerFixture, app_fixture: Fa
)

client = TestClient(app_fixture)

res = client.get("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions/1")
mock.assert_called_with(ANY, 1)
mock.assert_called_with(incoming_session=ANY, submission_id=1)
assert res.status_code == 200

mock.return_value = None
res = client.get("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions/1")
mock.assert_called_with(ANY, 1)
mock.assert_called_with(incoming_session=ANY, submission_id=1)
assert res.status_code == 204

def test_authed_upload_file(
Expand Down Expand Up @@ -271,10 +274,17 @@ def test_authed_upload_file(

mock_validate_file = mocker.patch("sbl_filing_api.services.submission_processor.validate_file_processable")
mock_validate_file.return_value = None

mock_upload = mocker.patch("sbl_filing_api.services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch("sbl_filing_api.services.submission_processor.validation_monitor")
mock_validate_submission.return_value = None

mock_get_loop = mocker.patch("asyncio.get_event_loop")
mock_event_loop = Mock()
mock_get_loop.return_value = mock_event_loop
mock_event_loop.run_in_executor.return_value = asyncio.Future()

mock_background_task = mocker.patch("fastapi.BackgroundTasks.add_task")

async_mock = AsyncMock(return_value=return_sub)
mock_add_submission = mocker.patch(
"sbl_filing_api.entities.repos.submission_repo.add_submission", side_effect=async_mock
Expand All @@ -290,9 +300,14 @@ def test_authed_upload_file(

res = client.post("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions", files=files)
mock_add_submission.assert_called_with(ANY, 1, "submission.csv", user_action_submit.id)
mock_validate_submission.assert_called_with(
"2024", "1234567890ZXWVUTSR00", return_sub, open(submission_csv, "rb").read()
mock_event_loop.run_in_executor.assert_called_with(
ANY, handle_submission, "2024", "1234567890ZXWVUTSR00", return_sub, open(submission_csv, "rb").read(), ANY
)
assert mock_event_loop.run_in_executor.call_args.args[6]["continue"]
mock_background_task.assert_called_with(
check_future, mock_event_loop.run_in_executor.return_value, return_sub.id, ANY
)
assert mock_background_task.call_args.args[3]["continue"]
assert mock_update_submission.call_args.args[0].state == SubmissionState.SUBMISSION_UPLOADED
assert res.status_code == 200
assert res.json()["id"] == 1
Expand Down
2 changes: 1 addition & 1 deletion tests/entities/repos/test_submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ async def test_get_latest_submission(self, query_session: AsyncSession):
assert res.validation_ruleset_version == "v1"

async def test_get_submission(self, query_session: AsyncSession):
res = await repo.get_submission(query_session, submission_id=1)
res = await repo.get_submission(incoming_session=query_session, submission_id=1)
assert res.id == 1
assert res.filing == 1
assert res.state == SubmissionState.SUBMISSION_UPLOADED
Expand Down
64 changes: 64 additions & 0 deletions tests/services/test_multithreader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import asyncio

from multiprocessing import Manager
from pytest_mock import MockerFixture
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState
from sbl_filing_api.services.multithread_handler import check_future, handle_submission
from unittest.mock import Mock


class TestMultithreader:
async def mock_future(self, sleeptime):
await asyncio.sleep(sleeptime)
return

async def test_future_checker(self, mocker: MockerFixture):
exec_check = Manager().dict()
exec_check["continue"] = True

mocker.patch("sbl_filing_api.services.multithread_handler.settings.expired_submission_check_secs", 4)
repo_mock = mocker.patch("sbl_filing_api.entities.repos.submission_repo.expire_submission")
log_mock = mocker.patch("sbl_filing_api.services.multithread_handler.logger")

future = asyncio.get_event_loop().create_task(self.mock_future(5))
cancel_mock = mocker.patch.object(future, "cancel")
await check_future(future, 1, exec_check)

assert not exec_check["continue"]
cancel_mock.assert_called_once()
repo_mock.assert_called_with(1)
log_mock.warning.assert_called_with(
"Validation for submission 1 did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED."
)

repo_mock.reset_mock()
log_mock.reset_mock()
cancel_mock.reset_mock()
future = asyncio.get_event_loop().create_task(self.mock_future(1))
exec_check["continue"] = True
await check_future(future, 2, exec_check)

assert exec_check["continue"]
assert not cancel_mock.called
assert not repo_mock.called
assert not log_mock.called

async def test_handler(self, mocker: MockerFixture):
mock_sub = SubmissionDAO(
id=1,
filing=1,
state=SubmissionState.SUBMISSION_UPLOADED,
filename="submission.csv",
)

validation_mock = mocker.patch("sbl_filing_api.services.multithread_handler.validate_and_update_submission")
mock_new_loop = mocker.patch("asyncio.get_event_loop")
mock_event_loop = Mock()
mock_new_loop.return_value = mock_event_loop

exec_check = Manager().dict()
exec_check["continue"] = True

handle_submission("2024", "123456789TESTBANK123", mock_sub, b"\00\00", exec_check)

validation_mock.assert_called_with("2024", "123456789TESTBANK123", mock_sub, b"\00\00", exec_check)