Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove OTEL tracing from our code #1886

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions backend/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { getUnleashClient } from '@crowd/feature-flags'
import { getServiceLogger } from '@crowd/logging'
import { getOpensearchClient } from '@crowd/opensearch'
import { getRedisClient, getRedisPubSubPair, RedisPubSubReceiver } from '@crowd/redis'
import { getServiceTracer } from '@crowd/tracing'
import { ApiWebsocketMessage, Edition } from '@crowd/types'
import bodyParser from 'body-parser'
import bunyanMiddleware from 'bunyan-middleware'
Expand Down Expand Up @@ -37,7 +36,6 @@ import WebSockets from './websockets'
import { databaseInit } from '@/database/databaseConnection'

const serviceLogger = getServiceLogger()
getServiceTracer()

const app = express()

Expand Down
71 changes: 29 additions & 42 deletions backend/src/bin/discord-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { processPaginated, timeout } from '@crowd/common'
import { RedisCache, getRedisClient } from '@crowd/redis'
import { getChildLogger, getServiceLogger } from '@crowd/logging'
import { PlatformType } from '@crowd/types'
import { SpanStatusCode, getServiceTracer } from '@crowd/tracing'
import { DISCORD_CONFIG, REDIS_CONFIG } from '../conf'
import SequelizeRepository from '../database/repositories/sequelizeRepository'
import IntegrationRepository from '../database/repositories/integrationRepository'
Expand All @@ -15,7 +14,6 @@ import {
getIntegrationStreamWorkerEmitter,
} from '@/serverless/utils/serviceSQS'

const tracer = getServiceTracer()
const log = getServiceLogger()

async function executeIfNotExists(
Expand Down Expand Up @@ -58,51 +56,40 @@ async function spawnClient(

logger.info({ payload }, 'Processing Discord WS Message!')

await tracer.startActiveSpan('ProcessDiscordWSMessage', async (span) => {
try {
const integration = (await IntegrationRepository.findByIdentifier(
guildId,
PlatformType.DISCORD,
)) as any
try {
const integration = (await IntegrationRepository.findByIdentifier(
guildId,
PlatformType.DISCORD,
)) as any

const result = await repo.create({
tenantId: integration.tenantId,
integrationId: integration.id,
type: WebhookType.DISCORD,
payload,
})
const result = await repo.create({
tenantId: integration.tenantId,
integrationId: integration.id,
type: WebhookType.DISCORD,
payload,
})

const streamEmitter = await getIntegrationStreamWorkerEmitter()
const streamEmitter = await getIntegrationStreamWorkerEmitter()

await streamEmitter.triggerWebhookProcessing(
integration.tenantId,
integration.platform,
result.id,
await streamEmitter.triggerWebhookProcessing(
integration.tenantId,
integration.platform,
result.id,
)
} catch (err) {
if (err.code === 404) {
logger.warn({ guildId }, 'No integration found for incoming Discord WS Message!')
} else {
logger.error(
err,
{
discordPayload: JSON.stringify(payload),
guildId,
},
'Error processing Discord WS Message!',
)
span.setStatus({
code: SpanStatusCode.OK,
})
} catch (err) {
if (err.code === 404) {
logger.warn({ guildId }, 'No integration found for incoming Discord WS Message!')
} else {
span.setStatus({
code: SpanStatusCode.ERROR,
message: err,
})
logger.error(
err,
{
discordPayload: JSON.stringify(payload),
guildId,
},
'Error processing Discord WS Message!',
)
}
} finally {
span.end()
}
})
}
}

const client = new Client({
Expand Down
25 changes: 6 additions & 19 deletions backend/src/bin/job-generator.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,19 @@
import { CronJob } from 'cron'
import { getServiceLogger } from '@crowd/logging'
import { SpanStatusCode, getServiceTracer } from '@crowd/tracing'
import jobs from './jobs'

const tracer = getServiceTracer()
const log = getServiceLogger()

for (const job of jobs) {
const cronJob = new CronJob(
job.cronTime,
async () => {
await tracer.startActiveSpan(`ProcessingJob:${job.name}`, async (span) => {
log.info({ job: job.name }, 'Triggering job.')
try {
await job.onTrigger(log)
span.setStatus({
code: SpanStatusCode.OK,
})
} catch (err) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: err,
})
log.error(err, { job: job.name }, 'Error while executing a job!')
} finally {
span.end()
}
})
log.info({ job: job.name }, 'Triggering job.')
try {
await job.onTrigger(log)
} catch (err) {
log.error(err, { job: job.name }, 'Error while executing a job!')
}
},
null,
true,
Expand Down
207 changes: 88 additions & 119 deletions backend/src/bin/nodejs-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
receiveMessage,
sendMessage,
} from '@crowd/sqs'
import { SpanStatusCode, getServiceTracer } from '@crowd/tracing'
import moment from 'moment'
import { SQS_CONFIG } from '../conf'
import { processDbOperationsMessage } from '../serverless/dbOperations/workDispatcher'
Expand All @@ -21,7 +20,6 @@ import { SQS_CLIENT } from '@/serverless/utils/serviceSQS'

/* eslint-disable no-constant-condition */

const tracer = getServiceTracer()
const serviceLogger = getServiceLogger()

let exiting = false
Expand Down Expand Up @@ -70,55 +68,38 @@ async function handleDelayedMessages() {
const message = await receive(true)

if (message) {
await tracer.startActiveSpan('ProcessDelayedMessage', async (span) => {
try {
const msg: NodeWorkerMessageBase = JSON.parse(message.Body)
const messageLogger = getChildLogger('messageHandler', serviceLogger, {
messageId: message.MessageId,
type: msg.type,
})

if (message.MessageAttributes && message.MessageAttributes.remainingDelaySeconds) {
// re-delay
const newDelay = parseInt(
message.MessageAttributes.remainingDelaySeconds.StringValue,
10,
)
const tenantId = message.MessageAttributes.tenantId.StringValue
messageLogger.debug({ newDelay, tenantId }, 'Re-delaying message!')
await sendNodeWorkerMessage(tenantId, msg, newDelay)
} else {
// just emit to the normal queue for processing
const tenantId = message.MessageAttributes.tenantId.StringValue

if (message.MessageAttributes.targetQueueUrl) {
const targetQueueUrl = message.MessageAttributes.targetQueueUrl.StringValue
messageLogger.debug({ tenantId, targetQueueUrl }, 'Successfully delayed a message!')
await sendMessage(SQS_CLIENT(), {
QueueUrl: targetQueueUrl,
MessageGroupId: tenantId,
MessageDeduplicationId: `${tenantId}-${moment().valueOf()}`,
MessageBody: JSON.stringify(msg),
})
} else {
messageLogger.debug({ tenantId }, 'Successfully delayed a message!')
await sendNodeWorkerMessage(tenantId, msg)
}
}
const msg: NodeWorkerMessageBase = JSON.parse(message.Body)
const messageLogger = getChildLogger('messageHandler', serviceLogger, {
messageId: message.MessageId,
type: msg.type,
})

await removeFromQueue(message.ReceiptHandle, true)
span.setStatus({
code: SpanStatusCode.OK,
})
} catch (err) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: err,
if (message.MessageAttributes && message.MessageAttributes.remainingDelaySeconds) {
// re-delay
const newDelay = parseInt(message.MessageAttributes.remainingDelaySeconds.StringValue, 10)
const tenantId = message.MessageAttributes.tenantId.StringValue
messageLogger.debug({ newDelay, tenantId }, 'Re-delaying message!')
await sendNodeWorkerMessage(tenantId, msg, newDelay)
} else {
// just emit to the normal queue for processing
const tenantId = message.MessageAttributes.tenantId.StringValue

if (message.MessageAttributes.targetQueueUrl) {
const targetQueueUrl = message.MessageAttributes.targetQueueUrl.StringValue
messageLogger.debug({ tenantId, targetQueueUrl }, 'Successfully delayed a message!')
await sendMessage(SQS_CLIENT(), {
QueueUrl: targetQueueUrl,
MessageGroupId: tenantId,
MessageDeduplicationId: `${tenantId}-${moment().valueOf()}`,
MessageBody: JSON.stringify(msg),
})
} finally {
span.end()
} else {
messageLogger.debug({ tenantId }, 'Successfully delayed a message!')
await sendNodeWorkerMessage(tenantId, msg)
}
})
}

await removeFromQueue(message.ReceiptHandle, true)
} else {
delayedHandlerLogger.trace('No message received!')
}
Expand All @@ -143,83 +124,71 @@ async function handleMessages() {
handlerLogger.info('Listening for messages!')

const processSingleMessage = async (message: SqsMessage): Promise<void> => {
await tracer.startActiveSpan('ProcessMessage', async (span) => {
const msg: NodeWorkerMessageBase = JSON.parse(message.Body)

const messageLogger = getChildLogger('messageHandler', serviceLogger, {
messageId: message.MessageId,
type: msg.type,
})
const msg: NodeWorkerMessageBase = JSON.parse(message.Body)

try {
if (
msg.type === NodeWorkerMessageType.NODE_MICROSERVICE &&
(msg as any).service === 'enrich_member_organizations'
) {
messageLogger.warn(
'Skipping enrich_member_organizations message! Purging the queue because they are not needed anymore!',
)
await removeFromQueue(message.ReceiptHandle)
return
}
const messageLogger = getChildLogger('messageHandler', serviceLogger, {
messageId: message.MessageId,
type: msg.type,
})

messageLogger.info(
{ messageType: msg.type, messagePayload: JSON.stringify(msg) },
'Received a new queue message!',
try {
if (
msg.type === NodeWorkerMessageType.NODE_MICROSERVICE &&
(msg as any).service === 'enrich_member_organizations'
) {
messageLogger.warn(
'Skipping enrich_member_organizations message! Purging the queue because they are not needed anymore!',
)
await removeFromQueue(message.ReceiptHandle)
return
}

let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise<void>

switch (msg.type) {
case NodeWorkerMessageType.INTEGRATION_PROCESS:
processFunction = processIntegration
break
case NodeWorkerMessageType.NODE_MICROSERVICE:
processFunction = processNodeMicroserviceMessage
break
case NodeWorkerMessageType.DB_OPERATIONS:
processFunction = processDbOperationsMessage
break
case NodeWorkerMessageType.PROCESS_WEBHOOK:
processFunction = processWebhook
break

default:
messageLogger.error('Error while parsing queue message! Invalid type.')
}

if (processFunction) {
await logExecutionTimeV2(
async () => {
// remove the message from the queue as it's about to be processed
await removeFromQueue(message.ReceiptHandle)
messagesInProgress.set(message.MessageId, msg)
try {
await processFunction(msg, messageLogger)
} catch (err) {
messageLogger.error(err, 'Error while processing queue message!')
} finally {
messagesInProgress.delete(message.MessageId)
}
},
messageLogger,
'Processing queue message!',
)
}
messageLogger.info(
{ messageType: msg.type, messagePayload: JSON.stringify(msg) },
'Received a new queue message!',
)

let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise<void>

switch (msg.type) {
case NodeWorkerMessageType.INTEGRATION_PROCESS:
processFunction = processIntegration
break
case NodeWorkerMessageType.NODE_MICROSERVICE:
processFunction = processNodeMicroserviceMessage
break
case NodeWorkerMessageType.DB_OPERATIONS:
processFunction = processDbOperationsMessage
break
case NodeWorkerMessageType.PROCESS_WEBHOOK:
processFunction = processWebhook
break

default:
messageLogger.error('Error while parsing queue message! Invalid type.')
}

span.setStatus({
code: SpanStatusCode.OK,
})
} catch (err) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: err,
})
messageLogger.error(err, { payload: msg }, 'Error while processing queue message!')
} finally {
span.end()
if (processFunction) {
await logExecutionTimeV2(
async () => {
// remove the message from the queue as it's about to be processed
await removeFromQueue(message.ReceiptHandle)
messagesInProgress.set(message.MessageId, msg)
try {
await processFunction(msg, messageLogger)
} catch (err) {
messageLogger.error(err, 'Error while processing queue message!')
} finally {
messagesInProgress.delete(message.MessageId)
}
},
messageLogger,
'Processing queue message!',
)
}
})
} catch (err) {
messageLogger.error(err, { payload: msg }, 'Error while processing queue message!')
}
}

// noinspection InfiniteLoopJS
Expand Down