Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka circulars #2248

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open

Kafka circulars #2248

wants to merge 4 commits into from

Conversation

dakota002
Copy link
Contributor

@dakota002 dakota002 commented May 1, 2024

Description

Adds a table-streams event for Circulars over Kafka

This is currently a draft as there is either an issue in a dependency (possibly in cpp-zst-node16-fork or bindings) or in how we are building out these table-streams functions that is causing errors.

Currently investigating how cpp-zst-node16-fork uses the bindings package and why it works in some projects and not others

Related Issue(s)

Resolves #686

Testing

To get this to work in local dev setting there are a few things that need to be modified under node_modules:

  • @remix-run/dev/dist/devServer_unstable/index.js -> line 139 sets stdio: pipe, change this to
      stdin: process.stdin,
      stdout: process.stdout,

in order to use the plugin-lambda-invoker. Relates to remix-run/remix#9293

  • Add the following lines to the try array defined on the defaults object in bindings/bindings.js (currently trying to resolve this):
// Not needed anymore
      ['module_root', 'node_modules', 'cppzst-node-16-fork', 'build', 'Debug', 'bindings'],
      ['module_root', 'node_modules', 'cppzst-node-16-fork', 'build', 'Release', 'bindings'],

Once those are in place, test by:

  • Start a basic kafka consumer pointed to the dev server
  • press i to trigger the invoker
  • select @tables-streams circulars-kafka-distribution

Edit: I may have resolved the zstd issue

@lpsinger
Copy link
Member

lpsinger commented May 1, 2024

The build job is failing with C++ compilation errors when building cpp-zst-node16-fork. Based on the compilation errors, I suspect that fixing it might be as simple as adding -std=c++11 to the compiler flags, though I don't know how to do that with node-gyp off the top of my head.

As for the binding path issues, the fundamental problem is that this Zstandard compressor/decompressor library contains a native Node addon which is our bundler (esbuild) does not know how to handle. I think it should be sufficient to copy the native addons to the appropriate build output directory in https://github.com/nasa-gcn/gcn.nasa.gov/blob/main/esbuild.js.

However, given the installation challenges, it may be worthwhile checking if there is a pure JavaScript or a WASM implementation of Zstandard that we could use instead, which would be trivial to install and bundle/tree-shake correctly.

@dakota002
Copy link
Contributor Author

dakota002 commented May 1, 2024

I think I got it working. The esbuild solution was a much easier find than the previous issues I was encountering. I adapted this https://esbuild.github.io/content-types/#file based on a comment I came across somewhere.

Edit: I am unsure about how this will function when deployed, there is a generated file that may need to be in a different directory than where it is currently getting placed

package.json Outdated Show resolved Hide resolved
package.json Outdated Show resolved Hide resolved
@dakota002 dakota002 marked this pull request as ready for review May 3, 2024 18:44
@dakota002 dakota002 requested a review from lpsinger May 3, 2024 18:44
@dakota002 dakota002 marked this pull request as draft May 3, 2024 19:18
@dakota002
Copy link
Contributor Author

Marked as ready too soon, need to fix something else first

Copy link
Member

@lpsinger lpsinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than add a new Lambda, I would rather that we use the existing Lambda in app/table-streams/circulars. Look at app/scheduled/circulars for ideas about how to structure it. I imagine something roughly like:

// app/table-streams/circulars/actions/search.ts
export async function searchAction(...) {
  ...
}

// app/table-streams/circulars/actions/kafka
export async function kafkaAction(...) {
}

// app/table-streams/circulars/index.ts
import { kafkaAction } from './actions/kafka'
import { searchAction } from './actions/search'
import { createTriggerHandler } from '~/lib/lambdaTrigger.server'

export const handler = createTriggerHandler(
  async (record: DynamoDBRecord) => {
    Promise.all([kafkaAction, searchAction].map((action) => action(record)))
  }
)

esbuild.js Outdated Show resolved Hide resolved
@dakota002 dakota002 marked this pull request as ready for review May 16, 2024 14:28
@dakota002 dakota002 requested a review from lpsinger May 16, 2024 14:42
@dakota002 dakota002 force-pushed the KafkaCirculars branch 2 times, most recently from 249c85c to fcc7e83 Compare May 20, 2024 14:12
app/lib/lambdaTrigger.server.ts Outdated Show resolved Hide resolved
Comment on lines 23 to 33
await producer.connect()
await producer.send({
topic,
messages: [
{
key: 'message',
value: JSON.stringify(circular),
},
],
})
await producer.disconnect()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please close the Kafka connection only when the Lambda runtime is terminating. That way we can amortize the cost of opening the connection across multiple Lambda invocations, only paying it once per cold start.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this best done by not calling the disconnect?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably should call disconnect when the Lambda runtime is terminating gracefully. For Python Lambdas, AWS sends a SIGTERM signal to the function. The documentation doesn't say that it does that for Node.js Lamdbas, but it probably does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a useful example, I will see if I can implement something similar

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I am not sure if this will work as we think. Without the disconnect, the function hangs and throws an error
image

I'll do some more testing though to be sure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I have something in the latest commit that should work. The listener gets added in, but locally there is no SIGTERM in sandbox. I'm not 100% confident in this implementation, but it should work

app/lib/kafka.server.ts Outdated Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
app/lib/kafka.server.ts Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
@dakota002 dakota002 requested a review from lpsinger May 21, 2024 19:05
app/lib/env.server.ts Outdated Show resolved Hide resolved
app/lib/env.server.ts Show resolved Hide resolved
app/lib/env.server.ts Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
app/lib/kafka.server.ts Outdated Show resolved Hide resolved
@lpsinger
Copy link
Member

Does this work now?

app/root.tsx Outdated Show resolved Hide resolved
Co-authored-by: Leo Singer <leo.p.singer@nasa.gov>

Consolidate handlers

Update esbuild to not put the generated .node file in build directory

package lock

Adds circulars option to UI

undo some moved code, document vars, update domain to use hostname

Domain functions, trying to hook into SIGTERM

Update test

domain updates and reorder kafka function args

memoizee

Clean up Kafka sending

- Type safety for getHostname
- Memoize Kafka producer connection, disconnect before exit
- Rename sendKafka to send --- it's implicit in the filename that
  this function sends Kafka records
esbuild.js Show resolved Hide resolved
…-directories

Copy node API modules to output dirs using an esbuild plugin
@lpsinger
Copy link
Member

@dakota002, does this work now?

Note that you will need #2296 so that our build pipeline installs libraries for the correct OS and CPU.

@dakota002
Copy link
Contributor Author

@dakota002, does this work now?

Note that you will need #2296 so that our build pipeline installs libraries for the correct OS and CPU.

I need to commit some of my local changes and run some testing on this, but I will let you know shortly! It was working as expected before these updates too

@dakota002
Copy link
Contributor Author

@lpsinger This still doesn't disconnect locally. It may work when deployed though since there will be different emitted events and not just an invoked function

@lpsinger
Copy link
Member

@lpsinger This still doesn't disconnect locally. It may work when deployed though since there will be different emitted events and not just an invoked function

Does it disconnect locally if you hook into the SIGTERM signal?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Distribute Circulars via Kafka
2 participants