Ecarton/investigations/3456 stricter typing #3596

Draft · wants to merge 49 commits into base: master
4b54f24 add error to message packaging in DLQ (Feb 6, 2024)
413ff20 tests added to include error message (Feb 6, 2024)
7f87710 include error message in s3 output (Feb 6, 2024)
0a467fb merge master (Feb 6, 2024)
5c270a3 changelog for current changes (Feb 6, 2024)
0af5da2 Merge branch 'master' into CUMULUS-3456-DLA-clarity (Feb 7, 2024)
f41179a body arg needs to be a string (Feb 7, 2024)
3a632cc capture extra fields and pack them in dlq (Feb 7, 2024)
7a4fa3b remove unused lambda extractor (Feb 8, 2024)
1a10f74 just pass through entire sqs message in write to s3 (Feb 8, 2024)
d208018 ? syntax to make this much more elegant (Feb 8, 2024)
87aeef8 lint cleanup (Feb 8, 2024)
eb62519 catch failed sqs parsing just in case (Feb 8, 2024)
2e0e539 merge master (Feb 9, 2024)
562c29b small change to clarify sf lambda ingest error, very not finished (Feb 9, 2024)
ed4dbdb info not log (Feb 10, 2024)
5ebb85d misunderstood syntax fixed (Feb 12, 2024)
3c3c49b parse failure clarification log (Feb 12, 2024)
09d38de merge master (Feb 12, 2024)
f4ded49 remove debug elements from sqs to db (Feb 12, 2024)
b3d55f7 capture data on write-to-s3 side (Feb 13, 2024)
cdb478e need to await an async (Feb 13, 2024)
30ea8ab transition parsing of message contents to s3 side WIP (Feb 13, 2024)
927a85f cleanup and tests for moving meta capture to s3 side (Feb 14, 2024)
898867b bring error out from body to top level in s3 (Feb 14, 2024)
1667922 adding jsdocs (Feb 14, 2024)
6f3a015 fix up of integration test for DLA (Feb 14, 2024)
54f6482 linter cleanup (Feb 14, 2024)
fabe8e4 merge master (Feb 14, 2024)
41b04db changelog (Feb 14, 2024)
9777edd re-packaging to message body in write-db-dlq to ensure structure prom… (Feb 14, 2024)
67e2683 semicolon forgotten (Feb 14, 2024)
e445c40 expanded tests to look at details in parsing (Feb 14, 2024)
88ad5eb better message in integration test (Feb 14, 2024)
68bd956 mistake in jsdoc string (Feb 14, 2024)
9d75e19 typo in sqs jsdoc (Feb 14, 2024)
1e74ab7 moved changelog section to non-breaking (Feb 15, 2024)
14aeed4 remove deprecated getS3Object (Feb 16, 2024)
ce68ca6 jsdoc typing (Feb 16, 2024)
b62fa97 bare form type checking in DeadLetterMessage (Feb 19, 2024)
995ea46 linter error fixes (Feb 19, 2024)
dee822e loop to map (Feb 19, 2024)
53d7fa3 linter fix (Feb 19, 2024)
78e8ea8 remove old integration test as duplicate (Feb 19, 2024)
6abfd3b remove exports and declare function as const (Feb 19, 2024)
ab3431f linter errors (Feb 19, 2024)
8f5af33 moving type checkers to their file in messages.d.ts (Feb 19, 2024)
19ab674 linter error (Feb 19, 2024)
d6c4e8b move checkers out of .d.ts files (Feb 20, 2024)
14 changes: 5 additions & 9 deletions CHANGELOG.md
@@ -8,15 +8,11 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

### Breaking Changes

- **CUMULUS-2889**
- Removed unused CloudWatch Logs AWS SDK client. This change removes the CloudWatch Logs
client from the `@cumulus/aws-client` package.
- **CUMULUS-2890**
- Removed unused CloudWatch AWS SDK client. This change removes the CloudWatch client
from the `@cumulus/aws-client` package.

### Changed

- **CUMULUS-3456**
- Added stateMachine, collection, execution, granules, error fields to Dead Letter Archive message
- Added cumulusError field to records in sfEventSqsToDbRecordsDeadLetterQueue
- **CUMULUS-3245**
- Update `@cumulus/lzards-backup` task to either respect the `lzards_provider`
terraform configuration value or utilize `lzardsProvider` as part of the task
@@ -26,12 +22,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Update incorrect docstring

### Changed

- **CUMULUS-3497**
- Updated `example/cumulus-tf/orca.tf` to use v9.0.4
- **CUMULUS-3527**
- Added support for additional kex algorithms in the sftp-client.


## [v18.2.0] 2024-02-02

### Migration Notes
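The CUMULUS-3456 changelog entry above adds five metadata fields to each Dead Letter Archive record. An illustrative record shape follows; the field names come from the changelog, but every concrete value here is hypothetical:

```javascript
// Illustrative DLA record after CUMULUS-3456. Each field is either a
// recovered value or the literal string 'unknown'.
const dlaRecord = {
  // ...all original message-body attributes are carried through...
  stateMachine: 'unknown', // stateMachineArn, when recoverable
  collection: 'unknown',   // collection name, when recoverable
  execution: 'unknown',    // executionArn, when recoverable
  granules: 'unknown',     // array of granuleIds, when recoverable
  error: 'unknown',        // stringified error, when recoverable
};

// Downstream consumers can rely on these five keys always being present.
console.log(Object.keys(dlaRecord).join(','));
```

This sketch only documents the contract implied by the changelog; the actual construction happens in `formatCumulusDLAObject` later in this diff.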
128 changes: 64 additions & 64 deletions example/spec/parallel/dbRecords/DbRecordsDLQSpec.js
@@ -1,93 +1,93 @@
'use strict';

const { fakeFileFactory, fakeGranuleFactoryV2 } = require('@cumulus/api/lib/testUtils');
const { SQS } = require('@cumulus/aws-client');
const { lambda, s3 } = require('@cumulus/aws-client/services');
const { randomString } = require('@cumulus/common/test-utils');
const {
deleteS3Object,
listS3ObjectsV2,
getObject,
getObjectStreamContents,
} = require('@cumulus/aws-client/S3');
const { waitForListObjectsV2ResultCount } = require('@cumulus/integration-tests');

const { loadConfig } = require('../../helpers/testUtils');

describe('when a bad record is ingested', () => {
let stackName;
let systemBucket;
let dbRecordsDLQUrl;
let executionName;
let failedMessageS3Key;

let beforeAllSucceeded = false;
beforeAll(async () => {
const config = await loadConfig();
stackName = config.stackName;
systemBucket = config.bucket;

const DLQName = `${stackName}-sfEventSqsToDbRecordsDeadLetterQueue`;
dbRecordsDLQUrl = await SQS.getQueueUrlByName(DLQName);

const granuleId = randomString(10);
const files = [fakeFileFactory()];
const granule = fakeGranuleFactoryV2({ files, granuleId, published: false });

executionName = `execution-${randomString(16)}`;

const failingMessage = {
cumulus_meta: {
workflow_start_time: 122,
cumulus_version: '8.0.0',
state_machine: 'arn:aws:states:us-east-1:1234:execution:state-machine-name:execution-name',
execution_name: executionName,
},
meta: {
status: 'failed',
collection: 'bad-collection',
provider: 'fake-provider',
},
payload: {
granules: [granule],
},
};

const { $metadata } = await lambda().invoke({
FunctionName: `${stackName}-sfEventSqsToDbRecords`,
InvocationType: 'RequestResponse',
Payload: JSON.stringify({
env: {},
Records: [{
Body: JSON.stringify({
detail: {
status: 'RUNNING',
input: JSON.stringify({
cumulus_meta: {
execution_name: executionName,
},
a: 'sldkj',
}),
},
}),
}],
}),
});
if ($metadata.httpStatusCode < 400) {
beforeAllSucceeded = true;
}
});

afterAll(async () => {
await deleteS3Object(
systemBucket,
failedMessageS3Key
);
});

it('is sent to the DLA and processed to have expected metadata fields', async () => {
if (!beforeAllSucceeded) fail('beforeAll() failed');
console.log(`Waiting for the creation of failed message for execution ${executionName}`);
const prefix = `${stackName}/dead-letter-archive/sqs/${executionName}`;
try {
await expectAsync(waitForListObjectsV2ResultCount({
bucket: systemBucket,
prefix,
desiredCount: 1,
interval: 5 * 1000,
timeout: 30 * 1000,
})).toBeResolved();
// fetch key for cleanup
const listResults = await listS3ObjectsV2({
Bucket: systemBucket,
Prefix: prefix,
});
failedMessageS3Key = listResults[0].Key;
} catch (error) {
fail(`Did not find expected S3 Object: ${error}`);
}
const s3Object = await getObject(
s3(),
{
Bucket: systemBucket,
Key: failedMessageS3Key,
}
);
const fileBody = await getObjectStreamContents(s3Object.Body);

const parsed = JSON.parse(fileBody);
expect(parsed.stateMachine).toEqual('unknown');
expect(parsed.collection).toEqual('unknown');
expect(parsed.execution).toEqual('unknown');
expect(parsed.granules).toEqual('unknown');
expect(parsed.error).toEqual('CumulusMessageError: getMessageWorkflowStartTime on a message without a workflow_start_time');
});
});
21 changes: 20 additions & 1 deletion packages/api/lambdas/sf-event-sqs-to-db-records/index.js
@@ -1,3 +1,5 @@
//@ts-check

'use strict';

const get = require('lodash/get');
@@ -120,7 +122,17 @@ const writeRecords = async ({
testOverrides,
});
};
/**
* @typedef {import('aws-lambda').SQSRecord} SQSRecord
*/

/**
* Lambda handler that writes Step Function event records to the database,
* re-queueing failed messages to the DLQ
*
* @param {Object} event - Input payload object
* @param {Array<SQSRecord | AWS.SQS.Message>} [event.Records] set of sqsMessages
* @returns {Promise<void>}
*/
const handler = async (event) => {
const knex = await getKnexClient({
env: {
@@ -134,6 +146,7 @@ const handler = async (event) => {

await Promise.all(sqsMessages.map(async (message) => {
let cumulusMessage;

const executionEvent = parseSQSMessageBody(message);
try {
cumulusMessage = await getCumulusMessageFromExecutionEvent(executionEvent);
@@ -145,7 +158,13 @@
return await writeRecords({ ...event, cumulusMessage, knex });
} catch (error) {
log.error(`Writing message failed: ${JSON.stringify(message)}`, error);
return sendSQSMessage(
process.env.DeadLetterQueue,
{
...message,
error: error.toString(),
}
);
}
}));

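The repackaging in the `catch` block above can be sketched standalone. This is a simplified illustration, not the PR's exact code path (the real handler sends the result via `sendSQSMessage` from `@cumulus/aws-client`):

```javascript
// Simplified sketch: preserve the original SQS record's attributes and
// attach the stringified error before forwarding to the DLQ.
const packageForDLQ = (message, error) => ({
  ...message,
  error: error.toString(),
});

const dlqMessage = packageForDLQ(
  { Body: '{"detail":{}}' },            // hypothetical failed SQS record
  new Error('Writing message failed')
);
console.log(dlqMessage.error); // 'Error: Writing message failed'
```

Spreading `message` first means the added `error` key can never be clobbered by an attribute of the original record with the same name.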
108 changes: 107 additions & 1 deletion packages/api/lambdas/write-db-dlq-records-to-s3.js
@@ -1,3 +1,5 @@
//@ts-check

'use strict';

const get = require('lodash/get');
@@ -8,7 +10,99 @@ const { s3PutObject } = require('@cumulus/aws-client/S3');
const { parseSQSMessageBody } = require('@cumulus/aws-client/SQS');
const { getMessageExecutionName } = require('@cumulus/message/Executions');
const { unwrapDeadLetterCumulusMessage } = require('@cumulus/message/DeadLetterMessage');
const { getCumulusMessageFromExecutionEvent } = require('@cumulus/message/StepFunctions');

/**
*
* @typedef {import('@cumulus/types').MessageGranule} MessageGranule
* @typedef {{granules: Array<MessageGranule>}} PayloadWithGranules
*/

/**
* @param {unknown} payload
* @returns {payload is PayloadWithGranules}
*/
function payloadHasGranules(payload) {
return (
payload instanceof Object
&& 'granules' in payload
&& Array.isArray(payload.granules)
);
}
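Because the file runs under `//@ts-check`, the JSDoc `@returns {payload is PayloadWithGranules}` annotation above makes this a TypeScript type predicate: after a `true` return, the checker narrows `payload` to `PayloadWithGranules`. A standalone copy shows the runtime behavior:

```javascript
// Runnable copy of the payloadHasGranules guard from the diff above.
function payloadHasGranules(payload) {
  return (
    payload instanceof Object
    && 'granules' in payload
    && Array.isArray(payload.granules)
  );
}

// Only an object carrying a granules array passes the guard.
console.log(payloadHasGranules({ granules: [{ granuleId: 'g-1' }] })); // true
console.log(payloadHasGranules({ granules: 'not-an-array' }));         // false
console.log(payloadHasGranules(null));                                 // false
```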
/**
*
* @typedef {import('aws-lambda').EventBridgeEvent} EventBridgeEvent
*/
/**
* @param {{ [key: string]: any }} event
* @returns {event is EventBridgeEvent}
*/
const isEventBridgeLike = (event) => {
if (!(event instanceof Object && 'detail' in event && event.detail instanceof Object)) {
return false;
}
if (event.detail.status === 'RUNNING' && !('input' in event.detail)) {
return false;
}
if (event.detail.status === 'SUCCEEDED' && !('output' in event.detail)) {
return false;
}
return true;
};
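The shape check above encodes two EventBridge invariants: a `RUNNING` execution event must carry `detail.input`, and a `SUCCEEDED` one must carry `detail.output`. A runnable copy with a few probes:

```javascript
// Runnable copy of the isEventBridgeLike check from the diff above.
const isEventBridgeLike = (event) => {
  if (!(event instanceof Object && 'detail' in event && event.detail instanceof Object)) {
    return false;
  }
  if (event.detail.status === 'RUNNING' && !('input' in event.detail)) {
    return false;
  }
  if (event.detail.status === 'SUCCEEDED' && !('output' in event.detail)) {
    return false;
  }
  return true;
};

console.log(isEventBridgeLike({ detail: { status: 'RUNNING', input: '{}' } })); // true
console.log(isEventBridgeLike({ detail: { status: 'RUNNING' } }));              // false
console.log(isEventBridgeLike({}));                                             // false
```

Note the check is deliberately permissive: any object with an object-valued `detail` and consistent status fields passes, which suits a DLQ path where malformed input is the norm.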

/**
* Reformat object with key attributes at top level.
*
* @param {{ [key: string]: any }} messageBody - event bridge event as defined in aws-lambda
* @returns {Promise<Object>} - message packaged with metadata or 'unknown' where metadata not found
* {
* error: <errorString | 'unknown'>
* collection: <collectionName | 'unknown'>
* granules: <[granuleIds, ...] | 'unknown'>
* execution: <executionArn | 'unknown'>
* stateMachine: <stateMachineArn | 'unknown'>
* ...originalAttributes
* }
*/
async function formatCumulusDLAObject(messageBody) {
const execution = messageBody?.detail?.executionArn || 'unknown';
const stateMachine = messageBody?.detail?.stateMachineArn || 'unknown';

let cumulusMessage;
try {
if (isEventBridgeLike(messageBody)) {
cumulusMessage = await getCumulusMessageFromExecutionEvent(messageBody);
} else {
cumulusMessage = null;
}
} catch {
cumulusMessage = null;
}

const collection = cumulusMessage?.meta?.collection?.name || 'unknown';
let granules;

const payload = cumulusMessage?.payload;
if (payloadHasGranules(payload)) {
granules = payload.granules.map((granule) => granule?.granuleId || 'unknown');
} else {
granules = 'unknown';
}

return {
...messageBody,
collection,
granules,
execution,
stateMachine,
};
}
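The `'unknown'` fallback pattern used throughout `formatCumulusDLAObject` can be isolated as below. This is a simplified sketch with no `@cumulus` imports, not the function itself; the real code additionally recovers the Cumulus message and granule list:

```javascript
// Standalone sketch of the 'unknown' fallback extraction: optional
// chaining probes each path, and || 'unknown' covers both missing keys
// and falsy values.
const extractDLAMetadata = (messageBody, cumulusMessage) => ({
  execution: messageBody?.detail?.executionArn || 'unknown',
  stateMachine: messageBody?.detail?.stateMachineArn || 'unknown',
  collection: cumulusMessage?.meta?.collection?.name || 'unknown',
});

// With nothing recoverable, every field degrades to 'unknown' instead
// of throwing, keeping the DLA write itself failure-proof.
console.log(extractDLAMetadata({}, null));
```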
/**
* Determine execution name from body
*
* @param {Object} cumulusMessageObject - cumulus message
* @returns {string} - <executionName | 'unknown'>
*/
function determineExecutionName(cumulusMessageObject) {
try {
return getMessageExecutionName(cumulusMessageObject);
@@ -17,7 +111,17 @@ function determineExecutionName(cumulusMessageObject) {
return 'unknown';
}
}
/**
* @typedef {import('aws-lambda').SQSRecord} SQSRecord
*/

/**
* Lambda handler for saving DLQ reports to DLA in s3
*
* @param {Object} event - Input payload object
* @param {Array<SQSRecord | AWS.SQS.Message>} [event.Records] set of sqsMessages
* @returns {Promise<void>}
*/
async function handler(event) {
if (!process.env.system_bucket) throw new Error('System bucket env var is required.');
if (!process.env.stackName) throw new Error('Could not determine archive path as stackName env var is undefined.');
@@ -28,10 +132,11 @@ async function handler(event) {
const executionName = determineExecutionName(cumulusMessageObject);
// version messages with UUID as workflows can produce multiple messages that may all fail.
const s3Identifier = `${executionName}-${uuidv4()}`;
const massagedMessage = await formatCumulusDLAObject(messageBody);
await s3PutObject({
Bucket: process.env.system_bucket,
Key: `${process.env.stackName}/dead-letter-archive/sqs/${s3Identifier}.json`,
Body: JSON.stringify(massagedMessage),
});
}));
}
@@ -40,4 +145,5 @@ module.exports = {
determineExecutionName,
handler,
unwrapDeadLetterCumulusMessage,
formatCumulusDLAObject,
};