-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
173 multithread submission processing #174
Changes from 13 commits
e96f824
e0da4ee
c7867d5
02b10be
f07e160
145bbb3
29e126d
5629420
c456e6b
2da74b0
22025e7
ff2e79f
6e95703
d123f47
4a3acbf
14a50ef
7968e38
afc4672
2e47b3c
2ec5d0c
760ddb7
5cd7609
8951ba2
dd34de6
8183dd8
17401ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import asyncio | ||
import logging | ||
|
||
from sbl_filing_api.config import settings | ||
from sbl_filing_api.entities.models.dao import SubmissionDAO | ||
from sbl_filing_api.entities.repos import submission_repo as repo | ||
from sbl_filing_api.services.submission_processor import validate_and_update_submission | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def handle_submission(period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check):
    """Run ``validate_and_update_submission`` to completion from synchronous code.

    All arguments are forwarded unchanged to ``validate_and_update_submission``.
    Any exception raised while building or running that coroutine is logged
    and swallowed, so nothing propagates to the caller.

    Args:
        period_code: Forwarded to the validator.
        lei: Forwarded to the validator.
        submission: Submission record forwarded to the validator.
        content: Raw file bytes forwarded to the validator.
        exec_check: Shared mapping forwarded to the validator.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop for this thread (always the case in worker threads,
        # and in every thread on newer Python versions): create one and
        # install it so later calls on the same thread can reuse it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    try:
        coro = validate_and_update_submission(period_code, lei, submission, content, exec_check)
        loop.run_until_complete(coro)
    except Exception as e:
        logger.error(e, exc_info=True, stack_info=True)
|
||
|
||
async def check_future(future, submission_id, exec_check):
    """Expire a submission whose validation is still running after the time limit.

    Sleeps for ``settings.expired_submission_check_secs`` seconds. If
    ``future`` has finished by then, nothing else happens. Otherwise the
    future is cancelled, ``exec_check["continue"]`` is set to ``False``, the
    submission is expired through the repository, and a warning is logged.

    Args:
        future: Handle for the in-flight validation work.
        submission_id: Identifier passed to ``repo.expire_submission``.
        exec_check: Shared mapping whose ``"continue"`` entry is cleared on expiry.
    """
    await asyncio.sleep(settings.expired_submission_check_secs)

    # Finished in time: leave the future, the flag and the record untouched.
    if future.done():
        return

    future.cancel()
    exec_check["continue"] = False
    await repo.expire_submission(submission_id)

    expiry_notice = f"Validation for submission {submission_id} did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED."
    logger.warning(expiry_notice)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,18 @@ | ||
import json | ||
import asyncio | ||
import pandas as pd | ||
import importlib.metadata as imeta | ||
import logging | ||
|
||
from io import BytesIO | ||
from fastapi import UploadFile | ||
from regtech_data_validator.create_schemas import validate_phases, ValidationPhase | ||
from regtech_data_validator.data_formatters import df_to_json, df_to_download | ||
from regtech_data_validator.checks import Severity | ||
import pandas as pd | ||
import importlib.metadata as imeta | ||
from sbl_filing_api.entities.engine.engine import SessionLocal | ||
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState | ||
from sbl_filing_api.entities.repos.submission_repo import update_submission | ||
from http import HTTPStatus | ||
import logging | ||
from fsspec import AbstractFileSystem, filesystem | ||
from sbl_filing_api.config import FsProtocol, settings | ||
from regtech_api_commons.api.exceptions import RegTechHttpException | ||
|
@@ -99,47 +100,56 @@ async def get_from_storage(period_code: str, lei: str, file_identifier: str, ext | |
) from e | ||
|
||
|
||
async def validate_and_update_submission(
    period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check: dict
):
    """Validate an uploaded file and persist the resulting submission state.

    Marks the submission as VALIDATION_IN_PROGRESS, parses ``content`` as CSV,
    runs ``validate_phases`` on it, stores the validation JSON on the
    submission, uploads the downloadable report, and saves the final state.
    If ``exec_check["continue"]`` is falsy once validation has finished, the
    final state is not saved.

    Args:
        period_code: Passed to ``upload_to_storage`` for the report upload.
        lei: Passed to ``validate_phases`` as context and to ``upload_to_storage``.
        submission: Submission record to update; mutated in place.
        content: Raw bytes of the uploaded CSV file.
        exec_check: Mapping with a ``"continue"`` entry, read before the final save.

    A ``RuntimeError`` sets the state to SUBMISSION_UPLOAD_MALFORMED; any other
    exception sets it to VALIDATION_ERROR. Both are logged and not re-raised.
    """
    validator_version = imeta.version("regtech-data-validator")
    submission.validation_ruleset_version = validator_version
    submission.state = SubmissionState.VALIDATION_IN_PROGRESS
    # NOTE(review): this first update runs before the session block below and
    # does not pass a session; a reviewer asked whether it belongs inside the
    # `async with` block -- confirm against update_submission's signature.
    submission = await update_submission(submission)

    async with SessionLocal() as session:
        try:
            df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)

            # Validate Phases
            result = validate_phases(df, {"lei": lei})

            # Update tables with response
            if not result[0]:
                submission.state = (
                    SubmissionState.VALIDATION_WITH_ERRORS
                    if Severity.ERROR.value in result[1]["validation_severity"].values
                    else SubmissionState.VALIDATION_WITH_WARNINGS
                )
            else:
                submission.state = SubmissionState.VALIDATION_SUCCESSFUL

            submission.validation_json = build_validation_results(result)
            submission_report = df_to_download(result[1])
            await upload_to_storage(
                period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
            )

            if not exec_check["continue"]:
                log.warning(f"Submission {submission.id} is expired, will not be updating final state with results.")
                return

            await update_submission(submission, session)

        except RuntimeError:
            # exc_info=True already attaches the active exception. Passing it
            # as a positional argument would be treated as a %-format argument
            # for a message with no placeholder and break record formatting.
            log.error("The file is malformed", exc_info=True, stack_info=True)
            submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
            await update_submission(submission, session)

        except Exception:
            log.error(
                f"Validation for submission {submission.id} did not complete due to an unexpected error.",
                exc_info=True,
                stack_info=True,
            )
            submission.state = SubmissionState.VALIDATION_ERROR
            await update_submission(submission, session)
|
||
|
||
def build_validation_results(result): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import asyncio | ||
|
||
from multiprocessing import Manager | ||
from pytest_mock import MockerFixture | ||
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState | ||
from sbl_filing_api.services.multithread_handler import check_future, handle_submission | ||
from unittest.mock import Mock | ||
|
||
|
||
class TestMultithreader:
    """Tests for ``check_future`` and ``handle_submission``."""

    async def mock_future(self, sleeptime):
        """Stand-in for a validation job: sleep for ``sleeptime`` seconds."""
        await asyncio.sleep(sleeptime)
        return

    async def test_future_checker(self, mocker: MockerFixture):
        """``check_future`` expires a slow job and leaves a finished one alone."""
        exec_check = Manager().dict()
        exec_check["continue"] = True

        mocker.patch("sbl_filing_api.services.multithread_handler.settings.expired_submission_check_secs", 4)
        repo_mock = mocker.patch("sbl_filing_api.entities.repos.submission_repo.expire_submission")
        log_mock = mocker.patch("sbl_filing_api.services.multithread_handler.logger")

        # Job outlives the 4 second limit: it must be cancelled and expired.
        future = asyncio.get_event_loop().create_task(self.mock_future(5))
        cancel_mock = mocker.patch.object(future, "cancel")
        await check_future(future, 1, exec_check)

        assert not exec_check["continue"]
        cancel_mock.assert_called_once()
        repo_mock.assert_called_with(1)
        log_mock.warning.assert_called_with(
            "Validation for submission 1 did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED."
        )

        # Job finishes within the limit: nothing may be cancelled, expired or logged.
        repo_mock.reset_mock()
        log_mock.reset_mock()
        future = asyncio.get_event_loop().create_task(self.mock_future(1))
        # Patch cancel on *this* task; the earlier mock belongs to the first
        # task and could never observe a cancel of the second one.
        cancel_mock = mocker.patch.object(future, "cancel")
        exec_check["continue"] = True
        await check_future(future, 2, exec_check)

        assert exec_check["continue"]
        cancel_mock.assert_not_called()
        repo_mock.assert_not_called()
        # Calls to a child mock do not set ``called`` on the parent logger
        # mock, so the assertion has to target ``warning`` itself.
        log_mock.warning.assert_not_called()

    async def test_handler(self, mocker: MockerFixture):
        """``handle_submission`` forwards its arguments and runs the coroutine on the loop."""
        mock_sub = SubmissionDAO(
            id=1,
            filing=1,
            state=SubmissionState.SUBMISSION_UPLOADED,
            filename="submission.csv",
        )

        validation_mock = mocker.patch("sbl_filing_api.services.multithread_handler.validate_and_update_submission")
        mock_new_loop = mocker.patch("asyncio.get_event_loop")
        mock_event_loop = Mock()
        mock_new_loop.return_value = mock_event_loop

        exec_check = Manager().dict()
        exec_check["continue"] = True

        handle_submission("2024", "123456789TESTBANK123", mock_sub, b"\00\00", exec_check)

        validation_mock.assert_called_with("2024", "123456789TESTBANK123", mock_sub, b"\00\00", exec_check)
        # The handler must also hand the resulting coroutine to the loop.
        mock_event_loop.run_until_complete.assert_called_once()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we use the with context here as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, updated.