Skip to content

Commit

Permalink
Add retry count mechanism and virus scan check to google drive
Browse files Browse the repository at this point in the history
transmission.
  • Loading branch information
bseeger committed Apr 10, 2024
1 parent c91616a commit bbaf55f
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 48 deletions.
9 changes: 7 additions & 2 deletions build.gradle
Expand Up @@ -44,8 +44,13 @@ dependencies {
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.17.0'
implementation 'com.opencsv:opencsv:5.9'

implementation "org.codeforamerica.platform:form-flow:${formFlowLibraryVersion}"
println "📚Using form flow library ${formFlowLibraryVersion}"
if (profile == 'dev' || useLocalLibrary == 'true') {
implementation fileTree(dir: "$rootDir/../form-flow/build/libs", include: '*.jar')
println "📦 Using local library"
} else {
implementation "org.codeforamerica.platform:form-flow:${formFlowLibraryVersion}"
println "📚Using form flow library ${formFlowLibraryVersion}"
}

implementation 'com.amazonaws:aws-encryption-sdk-java:3.0.0'
implementation 'org.bouncycastle:bcpg-jdk15on:1.70'
Expand Down
156 changes: 117 additions & 39 deletions src/main/java/org/mdbenefits/app/cli/TransmissionCommands.java
@@ -1,6 +1,7 @@
package org.mdbenefits.app.cli;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.Tag;
import com.mailgun.model.message.MessageResponse;
import formflow.library.data.Submission;
import formflow.library.data.UserFile;
Expand All @@ -17,6 +18,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.mdbenefits.app.data.Transmission;
import org.mdbenefits.app.data.TransmissionRepository;
import org.mdbenefits.app.data.enums.Counties;
Expand Down Expand Up @@ -48,6 +50,8 @@ public class TransmissionCommands {
@Value("${transmission.email-recipients.queen-annes-county}")
private String QUEEN_ANNES_COUNTY_EMAIL_RECIPIENTS;

private final int MAX_RETRY_COUNT = 5;

private final TransmissionRepository transmissionRepository;
private final CloudFileRepository cloudFileRepository;
private final PdfService pdfService;
Expand Down Expand Up @@ -90,7 +94,8 @@ public TransmissionCommands(
public void transmit() {
log.info("[Transmission] Checking for submissions to transmit...");

List<Transmission> queuedTransmissions = transmissionRepository.findTransmissionsByStatus("QUEUED");
List<Transmission> queuedTransmissions =
transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc("QUEUED", MAX_RETRY_COUNT);
if (queuedTransmissions.isEmpty()) {
log.info("[Transmission] Nothing to transmit. Exiting.");
return;
Expand Down Expand Up @@ -125,8 +130,10 @@ private void transmitBatch(List<Transmission> transmissions) {
transmissions.forEach(transmission -> {
Map<String, String> errorMap = new HashMap<>();
Submission submission = transmission.getSubmission();

log.info("[Transmission {}] Sending transmission for submission with ID: {}.", transmission.getId(),
submission.getId());

updateTransmissionStatus(transmission, TransmissionStatus.TRANSMITTING, errorMap, false);
byte[] pdfFileBytes;
try {
Expand All @@ -139,13 +146,14 @@ private void transmitBatch(List<Transmission> transmissions) {
handleError(transmission, "pdfGeneration", error, errorMap);
return;
}

String county = (String) submission.getInputData().get("county");
String folderId = getCountyFolderId(county);
String emailRecipients = getCountyEmailRecipients(county);
String confirmationNumber = (String) submission.getInputData().get("confirmationNumber");

String pdfFileName = getPdfFilename(confirmationNumber);

// delete any existing directories with the same name
List<File> existingDirectories = googleDriveClient.findDirectory(confirmationNumber, folderId);
if (!existingDirectories.isEmpty()) {
log.info(
Expand All @@ -154,17 +162,11 @@ private void transmitBatch(List<Transmission> transmissions) {
existingDirectories.size(),
confirmationNumber,
submission.getId());
// remove any already existing folders
for (File dir : existingDirectories) {
if (!googleDriveClient.trashDirectory(dir.getName(), dir.getId(), errorMap)) {
String error = String.format("Failed to delete existing Google Drive directory '%s'", dir.getId());
handleError(transmission, null, error, errorMap);
// don't return - keep going. A new folder will be created and the link to that new
// folder will be sent to caseworker's office
}
}

removeExistingGoogleDriveFolders(existingDirectories, transmission, errorMap);
}

// create google drive folder
GoogleDriveFolder newFolder = googleDriveClient.createFolder(folderId, confirmationNumber, errorMap);
if (newFolder == null || newFolder.getId() == null) {
// something is really wrong here; note the error and skip the entry
Expand All @@ -186,37 +188,64 @@ private void transmitBatch(List<Transmission> transmissions) {
return;
}

List<UserFile> userFilesForSubmission = userFileRepositoryService.findAllBySubmission(submission);

for (int count = 0; count < userFilesForSubmission.size(); count++) {
UserFile file = userFilesForSubmission.get(count);
try {
// get the file from S3
CloudFile cloudFile = cloudFileRepository.get(file.getRepositoryPath());

String fileName = getUserFileName(confirmationNumber, file, count + 1, userFilesForSubmission.size());
log.info("[Transmission {}] Uploading file {} of {} for submission with ID: {}.",
transmission.getId(),
count + 1,
userFilesForSubmission.size(),
submission.getId());
// send to google
googleDriveClient.uploadFile(newFolder.getId(), fileName, file.getMimeType(), cloudFile.getFileBytes(),
file.getFileId().toString(), errorMap);
} catch (AmazonS3Exception e) {
String error = String.format(
"Unable to upload the UserFile (ID: %s) for submission with ID: %s. Exception: %s",
file.getFileId(), submission.getId(), e.getMessage());
handleError(transmission, "fetchingS3File", error, errorMap);
}
}
sendFilesToGoogleDrive(transmission, confirmationNumber, newFolder, errorMap);

sendEmailToCaseworkers(transmission, confirmationNumber, emailRecipients, newFolder.getUrl(), errorMap);

updateTransmissionStatus(transmission, TransmissionStatus.COMPLETED, errorMap, true);
});
}

private void removeExistingGoogleDriveFolders(List<File> directories, Transmission transmission,
Map<String, String> errorMap) {
// remove any already existing folders
for (File dir : directories) {
if (!googleDriveClient.trashDirectory(dir.getName(), dir.getId(), errorMap)) {
String error = String.format("Failed to delete existing Google Drive directory '%s'", dir.getId());
handleError(transmission, null, error, errorMap);
// don't return - keep going. A new folder will be created and the link to that new
// folder will be sent to caseworker's office. The fact that this one couldn't be trashed
// doesn't mean that a new one cannot be created.
// Removing old ones just keeps it less confusing if someone in the office does a search for a particular
// folder. Then only 1 will show up.
}
}
}

private void sendFilesToGoogleDrive(Transmission transmission, String confirmationNumber, GoogleDriveFolder destFolder,
Map<String, String> errorMap) {
Submission submission = transmission.getSubmission();

List<UserFile> userFilesForSubmission = userFileRepositoryService.findAllBySubmission(submission);

for (int count = 0; count < userFilesForSubmission.size(); count++) {
UserFile file = userFilesForSubmission.get(count);
try {
// get the file from S3
CloudFile cloudFile = cloudFileRepository.get(file.getRepositoryPath());
if (!hasBeenVirusScanned(cloudFile)) {
String message = String.format("Has not been scanned for virus yet. Re-queuing submission");
handleRequeue(transmission, "fileVirusStatus", message, errorMap);
}

String fileName = getUserFileName(confirmationNumber, file, count + 1, userFilesForSubmission.size());
log.info("[Transmission {}] Uploading file {} of {} for submission with ID: {}.",
transmission.getId(),
count + 1,
userFilesForSubmission.size(),
submission.getId());
// send to google
googleDriveClient.uploadFile(destFolder.getId(), fileName, file.getMimeType(), cloudFile.getFileBytes(),
file.getFileId().toString(), errorMap);
} catch (AmazonS3Exception e) {
String error = String.format(
"Unable to upload the UserFile (ID: %s) for submission with ID: %s. Exception: %s",
file.getFileId(), submission.getId(), e.getMessage());
handleError(transmission, "fetchingS3File", error, errorMap);
}
}
}

/**
* Send email about the transmission to specified email addresses.
*
Expand Down Expand Up @@ -262,8 +291,8 @@ private void sendEmailToCaseworkers(Transmission transmission, String confirmati
*
* @param transmission the transmission to update
* @param errorKey the error key to use when recording the error in the error map
* @param errorMsg the message to put in the log and the errorMap
* @param errorMap the map of errors to get stored with the transmission in the db.
* @param errorMsg the message to put in the log and the error map
* @param errorMap the map of errors to get stored with the transmission in the db
*/
private void handleError(Transmission transmission, String errorKey, String errorMsg, Map<String, String> errorMap) {
log.error("[Transmission {}]: {}", transmission.getId(), errorMsg);
Expand All @@ -273,22 +302,48 @@ private void handleError(Transmission transmission, String errorKey, String erro
updateTransmissionStatus(transmission, TransmissionStatus.FAILED, errorMap, false);
}

/**
* This will handle the re-queuing of a transmission. It will 1) log info about it 2) mark the transmission as QUEUED and 3)
* update the transmission status in the database.
*
* @param transmission the transmission to update
* @param messageKey the message key to use when recording the message in the error map
* @param message the message to put in the log or error map
* @param errorMap the map of errors (messages) to get stored with the transmission in the db
*/
private void handleRequeue(Transmission transmission, String messageKey, String message, Map<String, String> errorMap) {
log.warn("[Transmission {}]: {}", transmission.getId(), message);
if (messageKey != null) {
errorMap.put(messageKey, message);
}
updateTransmissionStatus(transmission, TransmissionStatus.QUEUED, errorMap, false);
}

/**
* Updates the transmission's status information, including the overall status, error messages and mark it sent (if
* requested).
*
* @param transmission the transmission to update
* @param status the TransmissionStatus status
* @param errorMap a Map<String,String>
* @param markSent
* @param markSent whether this should mark the record as sent to google drive or not
*/
private void updateTransmissionStatus(Transmission transmission, TransmissionStatus status, Map<String, String> errorMap,
boolean markSent) {
transmission.setStatus(status.name());
transmission.setErrors(errorMap);
if (markSent) {
transmission.setSentAt(OffsetDateTime.now());
}

// don't increment retry when just marking as in process
if (!status.equals(TransmissionStatus.TRANSMITTING)) {
int retryCount = transmission.getRetryCount();
retryCount++;
transmission.setRetryCount(retryCount);
}

transmission.setStatus(status.name());

transmissionRepository.save(transmission);
}

Expand Down Expand Up @@ -336,4 +391,27 @@ private String getCountyFolderId(String county) {
private String getCountyEmailRecipients(String county) {
return county.equals(Counties.BALTIMORE.name()) ? BALITMORE_COUNTY_EMAIL_RECIPIENTS : QUEEN_ANNES_COUNTY_EMAIL_RECIPIENTS;
}

/**
* Checks S3 tags included in the CloudFile metadata to ensure that the file has been virus scanned.
*
* @param cloudFile
* @return
*/
private boolean hasBeenVirusScanned(CloudFile cloudFile) {
Map<String, Object> metadata = cloudFile.getMetadata();
boolean scanned = false;
if (metadata != null) {
List<Tag> tags = (List<Tag>) metadata.getOrDefault("tags", List.of());
List<Tag> filteredList = tags.stream()
.filter(tag -> tag.getKey().equals("scan-result"))
.toList();
if (!filteredList.isEmpty()) {
if (filteredList.get(0).getValue().equalsIgnoreCase("clean")) {
scanned = true;
}
}
}
return scanned;
}
}
3 changes: 3 additions & 0 deletions src/main/java/org/mdbenefits/app/data/Transmission.java
Expand Up @@ -20,6 +20,7 @@
@Data
@Table(name = "transmissions")
public class Transmission {

@Id
@GeneratedValue
private UUID id;
Expand All @@ -35,6 +36,8 @@ public class Transmission {

private OffsetDateTime sentAt;

private int retryCount;

String status = "QUEUED";

@Type(JsonType.class)
Expand Down
Expand Up @@ -7,7 +7,7 @@

public interface TransmissionRepository extends CrudRepository<Transmission, UUID> {

List<Transmission> findTransmissionsByStatus(String status);
List<Transmission> findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(String status, int retryCount);

Transmission findTransmissionBySubmission(Submission submission);
}
5 changes: 3 additions & 2 deletions src/main/resources/application-test.yaml
Expand Up @@ -36,8 +36,9 @@ spring:
jdbc:
initialize-schema: always
transmission:
transmission-rate-seconds: 5
transmission-initial-delay-seconds: 2
#keep these large so they do not run
transmission-rate-seconds: 10000
transmission-initial-delay-seconds: 10000
email-recipients:
baltimore-county: test@maininator.com
queen-annes-county: test@mailinator.com
Expand Down
@@ -0,0 +1,2 @@
ALTER TABLE transmissions
ADD retry_count int default 0 NOT NULL;
17 changes: 13 additions & 4 deletions src/test/java/org/mdbenefits/app/cli/TransmissionCommandsTest.java
Expand Up @@ -45,9 +45,14 @@

@Slf4j
@ActiveProfiles("test")
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestInstance(Lifecycle.PER_CLASS)
@TestMethodOrder(OrderAnnotation.class)
@SpringBootTest(
properties = {
"transmission.transmission-rate-seconds=5",
"transmission.transmission-initial-delay-seconds=2"
},
webEnvironment = WebEnvironment.RANDOM_PORT)
public class TransmissionCommandsTest {

@MockBean
Expand Down Expand Up @@ -126,7 +131,8 @@ void setup() {
@Test
@Order(1)
public void ensureSubmittedSubmissionsAreEnqueued() {
List<Transmission> transmissions = transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name());
List<Transmission> transmissions = transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(
TransmissionStatus.QUEUED.name(), 5);

assertThat(transmissions.size()).isEqualTo(submissionList.size());
}
Expand All @@ -139,7 +145,9 @@ public void transmitterRunsAndProcessesWork() {
() -> verify(transmissionCommands, times(2)).transmit());

// ensure that all transmissions were processed
assertThat(transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name()).isEmpty()).isTrue();
assertThat(
transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(TransmissionStatus.QUEUED.name(), 5)
.isEmpty()).isTrue();

submissionList.forEach(s -> {
Transmission transmission = transmissionRepository.findTransmissionBySubmission(s);
Expand All @@ -151,7 +159,8 @@ public void transmitterRunsAndProcessesWork() {
@Order(3)
public void transmitterRunsWhenNoWorkIsQueued() {

assertThat(transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name())).isEmpty();
assertThat(transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(TransmissionStatus.QUEUED.name(),
5)).isEmpty();

await().atMost(12, TimeUnit.SECONDS).untilAsserted(
() -> verify(transmissionCommands, times(2)).transmit());
Expand Down

0 comments on commit bbaf55f

Please sign in to comment.