Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry count mechanism and virus scan check to google drive transmission #205

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -44,8 +44,8 @@ dependencies {
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.17.0'
implementation 'com.opencsv:opencsv:5.9'

implementation "org.codeforamerica.platform:form-flow:${formFlowLibraryVersion}"
println "📚Using form flow library ${formFlowLibraryVersion}"
implementation "org.codeforamerica.platform:form-flow:${formFlowLibraryVersion}"

implementation 'com.amazonaws:aws-encryption-sdk-java:3.0.0'
implementation 'org.bouncycastle:bcpg-jdk15on:1.70'
Expand Down
156 changes: 117 additions & 39 deletions src/main/java/org/mdbenefits/app/cli/TransmissionCommands.java
@@ -1,6 +1,7 @@
package org.mdbenefits.app.cli;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.Tag;
import com.mailgun.model.message.MessageResponse;
import formflow.library.data.Submission;
import formflow.library.data.UserFile;
Expand All @@ -17,6 +18,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.mdbenefits.app.data.Transmission;
import org.mdbenefits.app.data.TransmissionRepository;
import org.mdbenefits.app.data.enums.Counties;
Expand Down Expand Up @@ -48,6 +50,8 @@ public class TransmissionCommands {
@Value("${transmission.email-recipients.queen-annes-county}")
private String QUEEN_ANNES_COUNTY_EMAIL_RECIPIENTS;

private final int MAX_RETRY_COUNT = 5;

private final TransmissionRepository transmissionRepository;
private final CloudFileRepository cloudFileRepository;
private final PdfService pdfService;
Expand Down Expand Up @@ -90,7 +94,8 @@ public TransmissionCommands(
public void transmit() {
log.info("[Transmission] Checking for submissions to transmit...");

List<Transmission> queuedTransmissions = transmissionRepository.findTransmissionsByStatus("QUEUED");
List<Transmission> queuedTransmissions =
transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc("QUEUED", MAX_RETRY_COUNT);
if (queuedTransmissions.isEmpty()) {
log.info("[Transmission] Nothing to transmit. Exiting.");
return;
Expand Down Expand Up @@ -125,8 +130,10 @@ private void transmitBatch(List<Transmission> transmissions) {
transmissions.forEach(transmission -> {
Map<String, String> errorMap = new HashMap<>();
Submission submission = transmission.getSubmission();

log.info("[Transmission {}] Sending transmission for submission with ID: {}.", transmission.getId(),
submission.getId());

updateTransmissionStatus(transmission, TransmissionStatus.TRANSMITTING, errorMap, false);
byte[] pdfFileBytes;
try {
Expand All @@ -139,13 +146,14 @@ private void transmitBatch(List<Transmission> transmissions) {
handleError(transmission, "pdfGeneration", error, errorMap);
return;
}

String county = (String) submission.getInputData().get("county");
String folderId = getCountyFolderId(county);
String emailRecipients = getCountyEmailRecipients(county);
String confirmationNumber = (String) submission.getInputData().get("confirmationNumber");

String pdfFileName = getPdfFilename(confirmationNumber);

// delete any existing directories with the same name
List<File> existingDirectories = googleDriveClient.findDirectory(confirmationNumber, folderId);
if (!existingDirectories.isEmpty()) {
log.info(
Expand All @@ -154,17 +162,11 @@ private void transmitBatch(List<Transmission> transmissions) {
existingDirectories.size(),
confirmationNumber,
submission.getId());
// remove any already existing folders
for (File dir : existingDirectories) {
if (!googleDriveClient.trashDirectory(dir.getName(), dir.getId(), errorMap)) {
String error = String.format("Failed to delete existing Google Drive directory '%s'", dir.getId());
handleError(transmission, null, error, errorMap);
// don't return - keep going. A new folder will be created and the link to that new
// folder will be sent to caseworker's office
}
}

removeExistingGoogleDriveFolders(existingDirectories, transmission, errorMap);
}

// create google drive folder
GoogleDriveFolder newFolder = googleDriveClient.createFolder(folderId, confirmationNumber, errorMap);
if (newFolder == null || newFolder.getId() == null) {
// something is really wrong here; note the error and skip the entry
Expand All @@ -186,37 +188,64 @@ private void transmitBatch(List<Transmission> transmissions) {
return;
}

List<UserFile> userFilesForSubmission = userFileRepositoryService.findAllBySubmission(submission);

for (int count = 0; count < userFilesForSubmission.size(); count++) {
UserFile file = userFilesForSubmission.get(count);
try {
// get the file from S3
CloudFile cloudFile = cloudFileRepository.get(file.getRepositoryPath());

String fileName = getUserFileName(confirmationNumber, file, count + 1, userFilesForSubmission.size());
log.info("[Transmission {}] Uploading file {} of {} for submission with ID: {}.",
transmission.getId(),
count + 1,
userFilesForSubmission.size(),
submission.getId());
// send to google
googleDriveClient.uploadFile(newFolder.getId(), fileName, file.getMimeType(), cloudFile.getFileBytes(),
file.getFileId().toString(), errorMap);
} catch (AmazonS3Exception e) {
String error = String.format(
"Unable to upload the UserFile (ID: %s) for submission with ID: %s. Exception: %s",
file.getFileId(), submission.getId(), e.getMessage());
handleError(transmission, "fetchingS3File", error, errorMap);
}
}
sendFilesToGoogleDrive(transmission, confirmationNumber, newFolder, errorMap);

sendEmailToCaseworkers(transmission, confirmationNumber, emailRecipients, newFolder.getUrl(), errorMap);

updateTransmissionStatus(transmission, TransmissionStatus.COMPLETED, errorMap, true);
});
}

private void removeExistingGoogleDriveFolders(List<File> directories, Transmission transmission,
Map<String, String> errorMap) {
// remove any already existing folders
for (File dir : directories) {
if (!googleDriveClient.trashDirectory(dir.getName(), dir.getId(), errorMap)) {
String error = String.format("Failed to delete existing Google Drive directory '%s'", dir.getId());
handleError(transmission, null, error, errorMap);
// don't return - keep going. A new folder will be created and the link to that new
// folder will be sent to caseworker's office. The fact that this one couldn't be trashed
// doesn't mean that a new one cannot be created.
// Removing old ones just keeps it less confusing if someone in the office does a search for a particular
// folder. Then only 1 will show up.
}
}
}

private void sendFilesToGoogleDrive(Transmission transmission, String confirmationNumber, GoogleDriveFolder destFolder,
Map<String, String> errorMap) {
Submission submission = transmission.getSubmission();

List<UserFile> userFilesForSubmission = userFileRepositoryService.findAllBySubmission(submission);

for (int count = 0; count < userFilesForSubmission.size(); count++) {
UserFile file = userFilesForSubmission.get(count);
try {
// get the file from S3
CloudFile cloudFile = cloudFileRepository.get(file.getRepositoryPath());
if (!hasBeenVirusScanned(cloudFile)) {
String message = String.format("Has not been scanned for virus yet. Re-queuing submission");
handleRequeue(transmission, "fileVirusStatus", message, errorMap);
}

String fileName = getUserFileName(confirmationNumber, file, count + 1, userFilesForSubmission.size());
log.info("[Transmission {}] Uploading file {} of {} for submission with ID: {}.",
transmission.getId(),
count + 1,
userFilesForSubmission.size(),
submission.getId());
// send to google
googleDriveClient.uploadFile(destFolder.getId(), fileName, file.getMimeType(), cloudFile.getFileBytes(),
file.getFileId().toString(), errorMap);
} catch (AmazonS3Exception e) {
String error = String.format(
"Unable to upload the UserFile (ID: %s) for submission with ID: %s. Exception: %s",
file.getFileId(), submission.getId(), e.getMessage());
handleError(transmission, "fetchingS3File", error, errorMap);
}
}
}

/**
* Send email about the transmission to specified email addresses.
*
Expand Down Expand Up @@ -262,8 +291,8 @@ private void sendEmailToCaseworkers(Transmission transmission, String confirmati
*
* @param transmission the transmission to update
* @param errorKey the error key to use when recording the error in the error map
* @param errorMsg the message to put in the log and the errorMap
* @param errorMap the map of errors to get stored with the transmission in the db.
* @param errorMsg the message to put in the log and the error map
* @param errorMap the map of errors to get stored with the transmission in the db
*/
private void handleError(Transmission transmission, String errorKey, String errorMsg, Map<String, String> errorMap) {
log.error("[Transmission {}]: {}", transmission.getId(), errorMsg);
Expand All @@ -273,22 +302,48 @@ private void handleError(Transmission transmission, String errorKey, String erro
updateTransmissionStatus(transmission, TransmissionStatus.FAILED, errorMap, false);
}

/**
* This will handle the re-queuing of a transmission. It will 1) log info about it 2) mark the transmission as QUEUED and 3)
* update the transmission status in the database.
*
* @param transmission the transmission to update
* @param messageKey the message key to use when recording the message in the error map
* @param message the message to put in the log or error map
* @param errorMap the map of errors (messages) to get stored with the transmission in the db
*/
private void handleRequeue(Transmission transmission, String messageKey, String message, Map<String, String> errorMap) {
log.warn("[Transmission {}]: {}", transmission.getId(), message);
if (messageKey != null) {
errorMap.put(messageKey, message);
}
updateTransmissionStatus(transmission, TransmissionStatus.QUEUED, errorMap, false);
}

/**
* Updates the transmission's status information, including the overall status, error messages and mark it sent (if
* requested).
*
* @param transmission the transmission to update
* @param status the TransmissionStatus status
* @param errorMap a Map<String,String>
* @param markSent
* @param markSent whether this should mark the record as sent to google drive or not
*/
private void updateTransmissionStatus(Transmission transmission, TransmissionStatus status, Map<String, String> errorMap,
boolean markSent) {
transmission.setStatus(status.name());
transmission.setErrors(errorMap);
if (markSent) {
transmission.setSentAt(OffsetDateTime.now());
}

// don't increment retry when just marking as in process
if (!status.equals(TransmissionStatus.TRANSMITTING)) {
int retryCount = transmission.getRetryCount();
retryCount++;
transmission.setRetryCount(retryCount);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: if the retry count is == to the max, should we set status differently, or keep it as QUEUED? It will not get picked up again, but maybe the status should reflect this as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should we notify ourselves if an transmission fails to send after 5 tries? We should put in some code here that causes us to be alerted. Maybe an ERROR with a specific message and then a Datadog monitor for that message??? @stabai @spokenbird @analoo?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's easiest if there's a logging level that always triggers an alert. Do we have something like CRITICAL that I can set up to do that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... no CRITICAL that I can see.

}

transmission.setStatus(status.name());

transmissionRepository.save(transmission);
}

Expand Down Expand Up @@ -336,4 +391,27 @@ private String getCountyFolderId(String county) {
private String getCountyEmailRecipients(String county) {
return county.equals(Counties.BALTIMORE.name()) ? BALITMORE_COUNTY_EMAIL_RECIPIENTS : QUEEN_ANNES_COUNTY_EMAIL_RECIPIENTS;
}

/**
* Checks S3 tags included in the CloudFile metadata to ensure that the file has been virus scanned.
*
* @param cloudFile
* @return
*/
private boolean hasBeenVirusScanned(CloudFile cloudFile) {
Map<String, Object> metadata = cloudFile.getMetadata();
boolean scanned = false;
if (metadata != null) {
List<Tag> tags = (List<Tag>) metadata.getOrDefault("tags", List.of());
List<Tag> filteredList = tags.stream()
.filter(tag -> tag.getKey().equals("scan-result"))
.toList();
if (!filteredList.isEmpty()) {
if (filteredList.get(0).getValue().equalsIgnoreCase("clean")) {
scanned = true;
}
}
}
return scanned;
}
}
3 changes: 3 additions & 0 deletions src/main/java/org/mdbenefits/app/data/Transmission.java
Expand Up @@ -20,6 +20,7 @@
@Data
@Table(name = "transmissions")
public class Transmission {

@Id
@GeneratedValue
private UUID id;
Expand All @@ -35,6 +36,8 @@ public class Transmission {

private OffsetDateTime sentAt;

private int retryCount;

String status = "QUEUED";

@Type(JsonType.class)
Expand Down
Expand Up @@ -7,7 +7,7 @@

public interface TransmissionRepository extends CrudRepository<Transmission, UUID> {

List<Transmission> findTransmissionsByStatus(String status);
List<Transmission> findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(String status, int retryCount);

Transmission findTransmissionBySubmission(Submission submission);
}
5 changes: 3 additions & 2 deletions src/main/resources/application-test.yaml
Expand Up @@ -36,8 +36,9 @@ spring:
jdbc:
initialize-schema: always
transmission:
transmission-rate-seconds: 5
transmission-initial-delay-seconds: 2
#keep these large so they do not run
transmission-rate-seconds: 10000
transmission-initial-delay-seconds: 10000
email-recipients:
baltimore-county: test@maininator.com
queen-annes-county: test@mailinator.com
Expand Down
@@ -0,0 +1,2 @@
ALTER TABLE transmissions
ADD retry_count int default 0 NOT NULL;
17 changes: 13 additions & 4 deletions src/test/java/org/mdbenefits/app/cli/TransmissionCommandsTest.java
Expand Up @@ -45,9 +45,14 @@

@Slf4j
@ActiveProfiles("test")
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestInstance(Lifecycle.PER_CLASS)
@TestMethodOrder(OrderAnnotation.class)
@SpringBootTest(
properties = {
"transmission.transmission-rate-seconds=5",
"transmission.transmission-initial-delay-seconds=2"
},
webEnvironment = WebEnvironment.RANDOM_PORT)
public class TransmissionCommandsTest {

@MockBean
Expand Down Expand Up @@ -126,7 +131,8 @@ void setup() {
@Test
@Order(1)
public void ensureSubmittedSubmissionsAreEnqueued() {
List<Transmission> transmissions = transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name());
List<Transmission> transmissions = transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(
TransmissionStatus.QUEUED.name(), 5);

assertThat(transmissions.size()).isEqualTo(submissionList.size());
}
Expand All @@ -139,7 +145,9 @@ public void transmitterRunsAndProcessesWork() {
() -> verify(transmissionCommands, times(2)).transmit());

// ensure that all transmissions were processed
assertThat(transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name()).isEmpty()).isTrue();
assertThat(
transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(TransmissionStatus.QUEUED.name(), 5)
.isEmpty()).isTrue();

submissionList.forEach(s -> {
Transmission transmission = transmissionRepository.findTransmissionBySubmission(s);
Expand All @@ -151,7 +159,8 @@ public void transmitterRunsAndProcessesWork() {
@Order(3)
public void transmitterRunsWhenNoWorkIsQueued() {

assertThat(transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name())).isEmpty();
assertThat(transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(TransmissionStatus.QUEUED.name(),
5)).isEmpty();

await().atMost(12, TimeUnit.SECONDS).untilAsserted(
() -> verify(transmissionCommands, times(2)).transmit());
Expand Down