Commanded.aggregate_state does not work when aggregate identity has a prefix #516

Open
dvic opened this issue Dec 8, 2022 · 10 comments
@dvic
Contributor

dvic commented Dec 8, 2022

I noticed that Commanded.aggregate_state does not work with aggregates that have an identity prefix defined in the router. The returned value is an empty aggregate, because it thinks it's a non-started aggregate.

There are a few things here I'd like to discuss:

  • can this "bug" even be fixed? with only the application and aggregate we don't necessarily have access to the routing info right? so is the "fix" then to change the argument from aggregate_uuid :: Aggregate.uuid() to stream_uuid :: String.t()? (so that the caller has to prefix the aggregate themselves)

  • shouldn't this function raise or return an error when an invalid ID is given instead of an empty aggregate?

@danhawkins

You can just call aggregate_state with the prefixed id, so instead of calling aggregate_state(AggregateModule, "<uuid>") use aggregate_state(AggregateModule, "<prefix>-<uuid>")

@dvic
Contributor Author

dvic commented Dec 8, 2022

yeah exactly, that's also my workaround, but then the docs and typespec of that function need to be changed

@dvic
Contributor Author

dvic commented Dec 8, 2022

But now I'm also a bit confused about whether having a router on a Commanded application is required. Technically it doesn't seem to be required for dispatching, but without it some features don't work (fetching of state, but also EventAssertions.wait_for_event).

@dvic
Contributor Author

dvic commented Dec 8, 2022

Ok, I have reached the conclusion that basically Commanded expects that the routing info is present on the application because otherwise process managers can't dispatch commands. It does make me wonder though why the public dispatch api even exists on routers directly (this seems to suggest applications don't have to have the routing info per se).

In other words: I think we can assume that the routing information is present on the application and thus I can submit a PR that looks up the prefix from the application composite router (from a compile-time exposed __registered_prefixes__ function or something like that).
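To illustrate the idea, here is a minimal sketch of how a router-like module could record identity prefixes at compile time and expose them through a generated function. PrefixRegistry, ExampleRouter, and this identify/2 macro are hypothetical stand-ins, not Commanded's router, and the sketch only handles string prefixes (a zero-arity function prefix would need different treatment).

```elixir
defmodule PrefixRegistry do
  # Hypothetical sketch: NOT Commanded's router implementation.
  defmacro __using__(_opts) do
    quote do
      import PrefixRegistry, only: [identify: 2]
      # Collect every {aggregate_module, prefix} pair declared in the module body.
      Module.register_attribute(__MODULE__, :registered_prefixes, accumulate: true)
      @before_compile PrefixRegistry
    end
  end

  # Mirrors the shape of `identify Aggregate, by: ..., prefix: ...`.
  defmacro identify(aggregate_module, opts) do
    quote do
      @registered_prefixes {unquote(aggregate_module), Keyword.get(unquote(opts), :prefix)}
    end
  end

  # Generates the lookup function once all `identify` calls have been seen.
  defmacro __before_compile__(env) do
    prefixes = Module.get_attribute(env.module, :registered_prefixes)

    quote do
      def __registered_prefixes__, do: Map.new(unquote(Macro.escape(prefixes)))
    end
  end
end

defmodule ExampleRouter do
  use PrefixRegistry

  identify BankAccount, by: :account_number, prefix: "bank-account-"
  identify Customer, by: :customer_id
end
```

With something like this, a caller could look up the prefix for an aggregate module via Map.fetch(ExampleRouter.__registered_prefixes__(), BankAccount) before building the full identity.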

@slashdotdash
Member

> It does make me wonder though why the public dispatch api even exists on routers directly (this seems to suggest applications don't have to have the routing info per se).

The command router module existed before Commanded added the application module to support multiple applications and event stores.

It is still possible to dispatch commands directly via a router but you must pass the Commanded application as an option:

:ok = BankRouter.dispatch(command, application: BankApp)

@dvic
Contributor Author

dvic commented Dec 8, 2022

Yeah, that's true, but still, you can't use Process managers. Should I open a PR to document this? Or should we allow process managers to be mounted on routers? I guess they only need the dispatch info?

@slashdotdash
Member

To fix the issue we'd need to add an optional argument to the function to specify the aggregate prefix as it is not possible to get the prefix except via a command registered with a router.
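As a rough sketch of what such an optional argument could look like from the caller's side, the wrapper below accepts a :prefix option and prepends it before delegating to the application's aggregate_state/2. AggregateStateHelper and the :prefix option are assumptions for illustration only; they are not part of Commanded's API.

```elixir
defmodule AggregateStateHelper do
  # Hypothetical wrapper: the :prefix option is NOT part of Commanded's API.
  # `app` is any module exposing aggregate_state/2, as a Commanded application does.
  def aggregate_state(app, aggregate_module, identity, opts \\ []) do
    prefix =
      case Keyword.get(opts, :prefix) do
        nil -> ""
        prefix when is_binary(prefix) -> prefix
        # A prefix may also be given as a zero-arity function returning a string.
        prefix when is_function(prefix, 0) -> prefix.()
      end

    app.aggregate_state(aggregate_module, prefix <> to_string(identity))
  end
end
```

A call would then read AggregateStateHelper.aggregate_state(BankApp, BankAccount, account_number, prefix: "bank-account-"), and callers without a prefix simply omit the option.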

@dvic
Contributor Author

dvic commented Dec 8, 2022

> To fix the issue we'd need to add an optional argument to the function to specify the aggregate prefix as it is not possible to get the prefix except via a command registered with a router.

If we can't make the general assumption that the router is embedded in the application, then indeed it's not possible. But if we have the router, then we can expose the identities at compile time just like __registered_commands__, right? Then we could make it work. The only question (also a more general one) is: do we want to keep the current behaviour and document it? By this I mean that process managers and Commanded.aggregate_state do not work if you don't embed a router in your application. The latter could be fixed by changing the parameter to stream_uuid instead of aggregate_uuid (i.e., make the caller responsible for prepending the prefix).

@dvic
Contributor Author

dvic commented Dec 9, 2022

Regarding the process manager, I see #517 was opened.

@thetamind
Contributor

thetamind commented Dec 24, 2022

Follow me along the journey I took to use Commanded.aggregate_state with identity_prefix...

Terminology

If I understand correctly...
The term aggregate_uuid refers to the aggregate identity including the optional prefix.
Reading the middleware documentation...

identity_prefix - an optional prefix to the aggregate's identity.

And the result of joining the optional prefix is also called the "aggregate's identity". In code the value is again labeled aggregate_uuid.

The router identify macro configures how the aggregate_uuid is derived from a command.

identify BankAccount,
  by: :account_number,
  prefix: "bank-account-"

The command is dispatched and the default middleware ExtractAggregateIdentity reads the config (identity and identity_prefix) from the Pipeline. The value is labeled aggregate_uuid before and after the prefix has been applied. Then it is assigned to the middleware pipeline under the key aggregate_uuid. From this point forward prefix is no longer a concept and aggregate_uuid is the full identity for the aggregate and associated stream.

def before_dispatch(%Pipeline{} = pipeline) do
  with aggregate_uuid when aggregate_uuid not in [nil, ""] <- extract_aggregate_uuid(pipeline),
       aggregate_uuid when is_binary(aggregate_uuid) <- identity_to_string(aggregate_uuid),
       aggregate_uuid when is_binary(aggregate_uuid) <- prefix(aggregate_uuid, pipeline) do
    assign(pipeline, :aggregate_uuid, aggregate_uuid)
  # ... (rest of the function omitted from this excerpt)

The aggregate_uuid is used to open the aggregate.

{:ok, ^aggregate_uuid} =
  Commanded.Aggregates.Supervisor.open_aggregate(
    application,
    aggregate_module,
    aggregate_uuid
  )

And aggregate_uuid directly populates stream_uuid at the event store boundary.

case EventStore.stream_forward(
       application,
       aggregate_uuid,
       aggregate_version + 1,
       @read_event_batch_size
     ) do
  # ... (case clauses omitted from this excerpt)

Documentation may be unclear

This paragraph under 'Define aggregate identity' in the router docs could be misunderstood.

The prefix is used as the stream identity when appending and reading the
aggregate's events: "<identity_prefix><aggregate_uuid>". It can be a string or
a zero arity function returning a string.

The previous paragraph describes the aggregate identity. In the simple case, the identity is typically an aggregate id field, hence aggregate_uuid here could be a confusing term. Would this be clearer?

aggregate_uuid = "<identity_prefix><identity>" or aggregate identity = "[<prefix>]<identity>"

And change "The prefix is used as the stream identity" to correct it and break it into two concepts:

The aggregate identity is composed of the optional prefix and the identity.

The aggregate identity is used as the stream identity when appending and reading the
aggregate's events
OR
The aggregate identity is used to identify the aggregate and is the name of the event stream.
OR
The aggregate identity is both the aggregate_uuid of the aggregate and the stream_uuid of the event stream.

So then is the typespec correct because the aggregate_uuid and stream_uuid are equivalent?

I think the identity documentation under the Commands guide is more thorough. Maybe the module docs could link to the guide to better understand the why.

The prefix is used as the stream identity when appending, and reading, the aggregate's events (e.g. <instance_identity>). Note you must not change the stream prefix once you have events persisted in your event store, otherwise the aggregate's events cannot be read from the event store and its state cannot be rebuilt since the stream name will be different.

Oh, I see the reasoning better now. The prefix labels the group of event streams that belong to a certain aggregate module. The rest of the identity (often a UUID from an id field) reflects the specific instance of the aggregate. So the documentation could be clarified regarding a module vs an instance of an aggregate:

  1. prefix: a label for an aggregate module; an aggregate type, a category
  2. identity or id: the identifier of an instance of an aggregate and event stream; commonly the value of a uuid field
  3. These are joined into a string and called the aggregate's identity, aggregate_uuid in code, and stream_uuid in event store

Accessing aggregate state during command dispatch is direct

  1. After command dispatch

    Pass the dispatch option returning: :aggregate_state to get the state back, or returning: :execution_result to grab the aggregate_uuid.

  2. Dispatch middleware

    Default middleware ExtractAggregateIdentity assigns :aggregate_uuid to the middleware Pipeline in before_dispatch. (identity and identity_prefix config are there too)

Accessing it from elsewhere, such as a process manager, is not direct

In the simple case the caller joins the prefix string to provide the aggregate_uuid.
In the complex case the identity config supports zero arity functions. The logic to use the config to extract the aggregate_uuid is trapped inside ExtractAggregateIdentity and can't be reused.
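For reference, the derivation described above can be sketched in a few lines of plain Elixir. This is a hypothetical reimplementation based only on the behaviour discussed in this thread (by: as a field name or one-arity function, prefix: as a string or zero-arity function); IdentitySketch and its error atom are assumptions, not a copy of Commanded.Middleware.ExtractAggregateIdentity.

```elixir
defmodule IdentitySketch do
  @moduledoc """
  Hypothetical sketch of deriving an aggregate identity from a message.
  Not a copy of `Commanded.Middleware.ExtractAggregateIdentity`.
  """

  # `by` is a field name or a one-arity function; `prefix` is nil, a string,
  # or a zero-arity function returning a string.
  def aggregate_uuid(message, opts) when is_list(opts) do
    by = Keyword.fetch!(opts, :by)
    prefix = Keyword.get(opts, :prefix)

    with {:ok, identity} <- extract(message, by) do
      {:ok, apply_prefix(prefix) <> to_string(identity)}
    end
  end

  defp extract(message, by) when is_atom(by), do: validate(Map.get(message, by))
  defp extract(message, by) when is_function(by, 1), do: validate(by.(message))

  # Reject missing or empty identities instead of building a bogus stream name.
  defp validate(identity) when identity in [nil, ""], do: {:error, :invalid_aggregate_identity}
  defp validate(identity), do: {:ok, identity}

  defp apply_prefix(nil), do: ""
  defp apply_prefix(prefix) when is_binary(prefix), do: prefix
  defp apply_prefix(prefix) when is_function(prefix, 0), do: prefix.()
end
```

Keeping this in one small module means the same identity config can be reused by the router and by any code that needs to query aggregate state outside of dispatch.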

My solution

  1. Simple case: helper function
defmodule Aggregates.UserThing do
  def aggregate_identity(%{user_uuid: user_uuid}) do
    "user-thing-#{user_uuid}"
  end
end

# Can even configure identity in the router now without using prefix key
identify(UserThing, by: &UserThing.aggregate_identity/1)
  2. Complex case: move identity config outside of the router; clone extraction logic
defmodule Aggregates.UserThing do
  def identity_config do
    [
      by: :user_uuid,
      prefix: "user-thing-"
    ]
  end
end

# And then use it in the router
identify(UserThing, UserThing.identity_config())

# Copy logic from Commanded.Middleware.ExtractAggregateIdentity into own module
defmodule MyApp.CommandedIdentity do
  @moduledoc """
  Extracts the target aggregate's identity.
  Logic from `Commanded.Middleware.ExtractAggregateIdentity`.
  """

  defmodule Config do
    @enforce_keys [:command, :identity, :identity_prefix]
    defstruct [:command, :identity, :identity_prefix]
  end

  alias MyApp.CommandedIdentity.Config

  def aggregate_uuid(message, opts) when is_list(opts) do
    opts =
      Keyword.new(opts, fn
        {:by, value} -> {:identity, value}
        {:prefix, value} -> {:identity_prefix, value}
      end)
      |> Keyword.put(:command, message)

    config = struct!(Config, opts)
    aggregate_uuid(config)
  end

  def aggregate_uuid(%Config{} = config) do
[...]
  end
end

To use it:

aggregate_uuid = MyApp.CommandedIdentity.aggregate_uuid(command_or_query, UserThing.identity_config())
state = CommandedApp.aggregate_state(UserThing, aggregate_uuid)

I determined I didn't need the complex case complications. I went with option 1 to query a simple aggregate state.


4 participants