DRAFT: dmx using khepri and RA #253

Draft: wants to merge 18 commits into base: main
Conversation

@SimonUnge (Collaborator) commented Oct 4, 2023

Proposed Changes

NOTE THE CODE IS NOT UP FOR REVIEW AT THIS POINT.

High-level design (a rough sketch follows the list):

  • store msg metadata in khepri
    • unique key
    • time when to deliver msg
    • Exchange info
  • store msg binary in local KV storage
    • Currently using leveled on each node
    • Currently using RA to distribute the msg binary for local storage in KV store (not a good solution)
  • Periodic check if any msg needs to be routed
    • Check khepri for any keys whose time value has passed its due time
    • For each key, fetch the msg binary from local KV storage and route
    • Delete entry
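
A very rough sketch of the metadata write and the due-time check described above, assuming the khepri path layout used later in the diff ([delayed_message_exchange, Key, ...]); fetch_binary/1 and route/2 are hypothetical stand-ins for the local KV lookup and the routing step, not code from this PR:

store_metadata(Key, DelayTS, Exchange) ->
    %% msg metadata lives in khepri; the msg binary itself goes to the local KV store
    khepri:put(?STORE_ID, [delayed_message_exchange, Key, delivery_time], DelayTS),
    khepri:put(?STORE_ID, [delayed_message_exchange, Key, exchange], Exchange).

route_if_due(Key, Now) ->
    case khepri:get(?STORE_ID, [delayed_message_exchange, Key, delivery_time]) of
        {ok, DelayTS} when DelayTS =< Now ->
            %% hypothetical helpers: fetch the binary from the local KV store and route it
            Msg = fetch_binary(Key),
            ok = route(Key, Msg),
            %% drop the metadata once the message has been routed
            khepri:delete(?STORE_ID, [delayed_message_exchange, Key]);
        _ ->
            not_due
    end.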

TODO:

  • Have one process for accepting new DMXs and one process that periodically checks for msgs to route (see the sketch after this list)
  • Use streams to distribute the msg binary for storage in the local KV store (so we need a process that consumes the stream msg, stores it in the KV, and keeps track of the index)
  • Should all nodes do the check, and route msgs for queues they are the owner of?
  • Figure out how to 'delete' a message in KV store in the cluster. (periodic clean?)
  • Clean up the code heavily
  • Tooling! More and better metadata
  • Tooling! Inspect one or subset of messages.
  • Tooling! Update and delete delayed messages
  • Use Khepri PROJECTIONS?
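
For the first TODO item above, a very rough sketch of a periodic-check process using a plain gen_server and erlang:send_after; dmx_delivery_timer and scan_and_route/0 are illustrative names, not code from this PR:

-module(dmx_delivery_timer).
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% 1000 ms matches the 1 second resolution of the x-delay argument
-define(CHECK_INTERVAL, 1000).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    schedule_check(),
    {ok, #{}}.

handle_info(check, State) ->
    ok = scan_and_route(),
    schedule_check(),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.

handle_call(_Msg, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State) -> {noreply, State}.

schedule_check() ->
    erlang:send_after(?CHECK_INTERVAL, self(), check).

%% Placeholder: in the real exchange this would query khepri for due keys,
%% fetch each binary from the local KV store, route it, and delete the entry.
scan_and_route() ->
    ok.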

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bug fix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating
the PR. If you're unsure about any of them, don't hesitate to ask on the
mailing list. We're here to help! This is simply a reminder of what we are
going to look for before merging your code.

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

Further Comments

If this is a relatively large or complex change, kick off the discussion by
explaining why you chose the solution you did and what alternatives you
considered, etc.

@SimonUnge changed the title from "WIP: draft" to "DRAFT: dmx using khepri and RA" on Oct 4, 2023
case khepri:start(?RA_SYSTEM, RaServerConfig) of
{ok, ?STORE_ID} ->
wait_for_leader(),
%register_projections() would this be needed for dmx?
Contributor:
don't we need a projection into an ordered ets table to have efficient lookup on the timestamp comparison?
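
For reference, the kind of lookup an ordered index enables: a sketch assuming an ordered_set ets table keyed on {DeliveryTS, Key}, regardless of whether it is fed by a khepri projection or directly by the writer (table and function names are illustrative):

init_index() ->
    ets:new(dmx_due_index, [ordered_set, named_table, public]).

add_entry(DeliveryTS, Key) ->
    ets:insert(dmx_due_index, {{DeliveryTS, Key}}).

%% Because the table is ordered on the timestamp, finding everything that is
%% due only walks the head of the table instead of scanning all entries.
take_due(Now) ->
    take_due(ets:first(dmx_due_index), Now, []).

take_due({TS, Key} = Index, Now, Acc) when TS =< Now ->
    ets:delete(dmx_due_index, Index),
    take_due(ets:next(dmx_due_index, Index), Now, [Key | Acc]);
take_due(_LaterOrEnd, _Now, Acc) ->
    lists:reverse(Acc).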

Collaborator (Author):
Note that this is a very very very rough PR, so I did not expect to see any comments on the code :)! Hence the draft state. I am more interested in comments on the description!

@@ -50,28 +48,12 @@
delay()) ->
nodelay | {ok, t_reference()}.

-define(TABLE_NAME, append_to_atom(?MODULE, node())).
-define(INDEX_TABLE_NAME, append_to_atom(?TABLE_NAME, "_index")).
-define(Timeout, 5000).
Contributor:
I wonder if the timeout should be 1 second, as that is the resolution of the x-delay parameter (some users might be surprised/unhappy if their 1-second delay is delivered after 5 seconds instead).
The previous solution of keeping track of the closest timestamp was also clever. Do you think it added too much complexity? (It definitely makes it necessary to keep the writer and reader in the same process.)

Collaborator (Author):
The timeout can be configurable. It does not add complexity (other than an upper limit), but is a different design compared to the one in the design doc.
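
To illustrate "the timeout can be configurable", one option is to read the interval from the application environment with a 1-second default; the application name and env key below are assumptions, used only to show the shape:

check_interval() ->
    %% falls back to 1000 ms, matching the resolution of x-delay
    application:get_env(rabbitmq_delayed_message_exchange, check_interval_ms, 1000).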

ok.


maybe_resize_cluster() ->
Contributor:
In the stream_coordinator, the leader is moved off a node that is put into maintenance; maybe that behaviour should also be replicated here.
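
A rough sketch of what that could look like, assuming ra's transfer_leadership/2 is used to move the leader off a drained node; node_under_maintenance/1 is a hypothetical stand-in for RabbitMQ's maintenance-mode check:

maybe_transfer_leadership({_, LeaderNode} = LeaderId, Members) ->
    case node_under_maintenance(LeaderNode) of
        true ->
            %% pick any member whose node is not under maintenance
            case [M || {_, N} = M <- Members, not node_under_maintenance(N)] of
                [Target | _] -> ra:transfer_leadership(LeaderId, Target);
                []           -> {error, no_eligible_member}
            end;
        false ->
            ok
    end.

%% Hypothetical placeholder; RabbitMQ itself exposes the maintenance status of a node.
node_under_maintenance(_Node) ->
    false.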

delivery = '_', ref = '_'},
Delays = mnesia:dirty_select(?TABLE_NAME, [{MatchHead, [], [true]}]),
messages_delayed(_Exchange) ->
%% ExchangeName = Exchange#exchange.name,
Contributor:
this used to be a major performance culprit that needed a full table scan. I think we should keep track of counts in separate counters. (Not sure if counters should be persisted or if they can be memory-only and recalculated when the data is loaded at startup.)

Collaborator (Author):
Yes, this is just old code. This is very very very much an early DRAFT.

Comments are more than welcome, but would prefer if they were on the higher level design (basically the description)
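
Illustrating the reviewer's counter suggestion, a minimal sketch with a per-exchange ets counter so messages_delayed/1 no longer needs a table scan (the table name is illustrative, not from this PR):

init_counters() ->
    ets:new(dmx_delayed_counts, [set, named_table, public]).

on_message_stored(ExchangeName) ->
    %% creates the counter on first use, then increments it
    ets:update_counter(dmx_delayed_counts, ExchangeName, 1, {ExchangeName, 0}).

on_message_routed(ExchangeName) ->
    ets:update_counter(dmx_delayed_counts, ExchangeName, -1, {ExchangeName, 0}).

messages_delayed(ExchangeName) ->
    case ets:lookup(dmx_delayed_counts, ExchangeName) of
        [{_, N}] -> N;
        []       -> 0
    end.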

end.
Key = make_key(DelayTS, Exchange),
Mod:put([delayed_message_exchange, Key, delivery_time], DelayTS),
Mod:put([delayed_message_exchange, Key, exchange], Exchange),
Contributor:
Not sure how leveled works (I assume it does not keep all data in memory), but although the exchange is metadata, to save some memory it could be stored in the kv_store along with the Message.
(Alternatively, if the exchange is stored in khepri, expired messages could be fetched grouped by exchange. Then looking up routing for an exchange can be done only once, sparing a few lookups. I think that's what the old implementation also did; it might be just premature optimisation.)
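
A small sketch of the grouping idea, assuming the due entries come back as {ExchangeName, Key} pairs; lookup_exchange/1 and deliver/2 are hypothetical stand-ins for the routing lookup and the delivery step:

route_due(DueEntries) ->
    %% group the due keys by exchange so routing is resolved once per exchange
    ByExchange = lists:foldl(
                   fun({ExchangeName, Key}, Acc) ->
                           maps:update_with(ExchangeName,
                                            fun(Keys) -> [Key | Keys] end,
                                            [Key], Acc)
                   end, #{}, DueEntries),
    maps:foreach(
      fun(ExchangeName, Keys) ->
              Exchange = lookup_exchange(ExchangeName),  %% one lookup per exchange
              [deliver(Exchange, Key) || Key <- Keys]
      end, ByExchange).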
