
Support reading a stream backwards #47

nordfjord opened this issue Nov 17, 2022 · 8 comments

@nordfjord

nordfjord commented Nov 17, 2022

Some entities have events that could be considered reset events. That is, after one of these events is appended, the prior history ceases to matter with regard to the current state of the entity. TodoListCleared is an example of this.

It could be extremely useful to read a stream backwards in these cases as it can minimize roundtrips to the database.

I propose we add a function that allows reading a stream backwards from a position where a position of -1 indicates the end of the stream. Messages should be returned in descending position order.

Imagine a stream with 2100 events. In order to reconstitute its state from its events, we'd need three calls (given a batch size of 1000):

-- read 1000 messages backwards from the end
select * from get_stream_messages_backwards('Stream-123', -1, 1000)
-- positions are inclusive, so the next read starts from (1100 - 1)
select * from get_stream_messages_backwards('Stream-123', 1099, 1000)
select * from get_stream_messages_backwards('Stream-123', 99, 1000)

An example query:

SELECT * -- replace with actual columns
FROM messages
WHERE stream_name = $1
  AND position <= $2
ORDER BY position DESC
LIMIT $3;
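
For illustration, the proposed function could be sketched as a plain SQL function wrapping that query. This is hypothetical (not part of Message DB today); the -1 sentinel is interpreted as "start from the end of the stream":

CREATE OR REPLACE FUNCTION get_stream_messages_backwards(
  _stream_name varchar,
  _position bigint DEFAULT -1,
  _batch_size bigint DEFAULT 1000
)
RETURNS SETOF messages
AS $$
  SELECT * -- replace with actual columns
  FROM messages
  WHERE stream_name = _stream_name
    AND (_position = -1 OR position <= _position)
  ORDER BY position DESC
  LIMIT _batch_size;
$$ LANGUAGE sql;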

Reasons to avoid this:

  • Streams should be short by nature, and message-db should not encourage technical solutions to modeling problems
  • Reading backwards might have worse performance characteristics, since B-tree indices are optimized for sequential forward reads
@sbellware
Contributor

Hi @nordfjord.

I'm not sure I understand the use case, and what's causing multiple roundtrips to the database when a "reset" event is encountered during projection.

I could imagine that when, for example, a TodoListCleared event is projected, the to-do list entity's state would be affected such that it would be "cleared".

Are you describing a case where you'd like to start projecting from the most recent TodoListCleared event?

@nordfjord
Author

what's causing multiple roundtrips to the database when a "reset" event is encountered during projection.

Sorry for not explaining it properly.

Imagine an events stream for a todo-list:

TodoAdded
TodoAdded
TodoAdded
TodoAdded
TodoAdded
TodoRemoved
TodoAdded
TodoAdded
TodoListCleared
TodoAdded

In order to get the current state of the todo list entity I really only need the last 2 events. The round-trips in question are when reading forwards. Imagine loading forwards with a batch size of 5: I'd read the first batch, then the second batch, and then I'd fold the events into the current state. However, because only the last two events matter for the state, I will have wasted a round-trip (the fetching of the first page was completely unnecessary).

Using backwards reads I could do something like this instead:

async function fetchCurrentState(client, streamName) {
  let version = -1n
  const events = []
  // fetches in a batched manner internally
  for await (const event of client.readStreamBackwards(streamName, version)) {
    // take the version of the first event returned
    if (version === -1n) version = event.position
    events.unshift(event)
    if (Entity.isOrigin(event)) {
      return [version, Entity.fromEvents(events)]
    }
  }
  return [version, Entity.fromEvents(events)]
}

Are you describing a case where you'd like to start projecting from the most recent TodoListCleared event?

Yes.

@sbellware
Contributor

The ultimate root cause for wanting to project in reverse is a particular approach to performance enhancement. The other approach to this is caching.

I'd posit that it's specifically because you don't have an entity snapshotting mechanism available to you that the idea of reading backward has occurred to you. Because we tend to operate with a complete set of tools, including snapshotting facilities for prolonged streams, we never worry about reading backward.

Additionally, it's largely undesirable to only partially project an entity's stream. That entity cannot be considered authoritative except under some very specific (and rare) circumstances.

Consider this sequence of events:

Stream Name: TodoList-123

Initiated
TodoAdded
TodoAdded
TodoRemoved
TodoListCleared
TodoAdded

The Initiated event, or any event prior to some position later in the stream, can confer information to the entity that is critical to an authoritative representation of the TodoList entity.

If those earlier events are not projected, then the entity can't be counted upon to answer the questions of the code that presumes to use it. That code would have to be written under the proviso that the entity is incomplete and ultimately not completely trustworthy.

In practice, I wouldn't allow an implementation that makes an assumption that a partially-projected entity is complete. In my work, it would be a pattern that would likely be rejected if we came across it during final inspection, except under very rare conditions. I wouldn't allow the deviation from consistent norms into the implementation of a whole system when the implementation can be made "normal" just by using the same small number of mechanisms that are already used in the solution. The performance implications of the "normal" solution are so minor that they're usually irrelevant, and thus not deserving of the introduction of deviations from norm.

That said, if I were to project a stream starting from a point other than the first event in the stream, I would use the get_last_stream_message function with the optional type parameter, and then use that message's position as the starting position to start reading and projecting from. The pattern - irrespective of the implementation - is to first determine the starting position to project from, and then project from there, rather than introduce a special case of reading in reverse.
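
As a rough sketch of that pattern (the stream name and the returned position are illustrative; the calls assume Message DB's get_last_stream_message and get_stream_messages functions):

-- 1. determine the starting position: the most recent "reset" event
select position from get_last_stream_message('todoList-123', 'TodoListCleared')
-- 2. project forward from that position (say it returned 1042)
select * from get_stream_messages('todoList-123', 1042)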

As a side note: the position -1 already has a reserved meaning in Message DB, and I'd be very hesitant to give that reserved position a different meaning, even if the usage contexts are quite distinct.

@sbellware
Contributor

PS: These are really insightful questions that you're asking. The questions, and the subsequent conversations, may be of benefit to a larger audience if they're asked in the Eventide Project's Slack org, rather than here in GitHub issues. Also, you could get the perspective of people other than myself who may have contradictory positions.

@nordfjord
Author

I'd posit that it's specifically because you don't have an entity snapshotting mechanism available to you that the idea of reading backward has occurred to you

For full context, this suggestion comes from having recently added message-db support to equinox, where it is common to store snapshots on a rolling basis within a stream rather than outside of it. When this snapshotting strategy is employed, backwards reads are essential.

I could've implemented a snapshotting strategy that stored the snapshot in a special {category}:snapshot-{stream_id} stream and used the last event in that stream as the snapshot. This would unfortunately incur an additional roundtrip to the database, which is something I'm keen to avoid.
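
For concreteness, that alternative would look something like this (a rough sketch using Message DB's write_message; the stream name, type, and payload are illustrative):

-- write the folded state (and the version it covers) to a separate snapshot stream
select write_message(
  '8ae44b79-fe99-4de4-b33e-c27df0bfbd5a',  -- id (normally a freshly generated uuid)
  'todoList:snapshot-123',                 -- stream_name
  'Snapshotted',                           -- type
  '{"version": 1099, "todos": []}'         -- data
)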

Thanks for the slack suggestion, I wasn't aware of the slack, I'll join you there 👍

@sbellware
Contributor

sbellware commented Nov 19, 2022

I would VERY STRONGLY advise against storing snapshots (or any other mechanical data) in an entity's stream. There should never be data in an entity's stream other than the data that represents entity state transitions. It's a quick shortcut in framework development to use the same stream, but that's not a legitimate design pressure for an event sourced system. The reason to store a message in an entity's event stream is that the event is an entity's event. A snapshot is not such a thing, and thus should not be conflated with an entity's events. A snapshot record does not pertain to the nature and purpose of an event stream. A snapshot stream pertains to the particular machinations of the tooling used to retrieve events and project them into an entity.

And ultimately, a reader of an event stream should not be required to have knowledge that some tool has dumped immaterial (to the stream and its purpose) records into the stream, and that those events should be disregarded when projecting a stream. A client should not be required to make determinations of whether some events are signal and some events are noise. Inside the boundary of an event stream, it should only ever be presumed that all events are pure signal, and pertinent only to the state changes of the process that the stream models and the state machine entity that can be projected from it.

An entity and its snapshots are not the same concern. They're related and adjacent, but never the same. An entity's process and lifecycle aren't germane to snapshotting. Snapshotting is only ever a mechanical optimization strategy, and not part of an entity stream's nature.

It's utterly unnecessary to read backward to retrieve snapshots. The only necessary operation is to read approximately the last message in a snapshot's stream.
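
With snapshots kept in their own stream (for example, the {category}:snapshot-{stream_id} convention mentioned earlier), that retrieval is a single call, sketched here with an illustrative stream name:

select * from get_last_stream_message('todoList:snapshot-123')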

The multiple trips to the database are exceedingly unlikely to cause any noticeable performance impact. There is certainly a measurable performance impact, but what matters is whether it matters. In practice - which is the only thing that matters - the performance impact is imperceptible.

Don't forget that an entity's snapshot should only have to be retrieved once during the runtime lifetime of a service. That's not a one-time hit per projection of an entity, it's a one-time hit per first retrieval of an entity after powering on a service. After that, newer events are the only things that have to be retrieved.

If your implementation doesn't use entity caching, you won't be able to benefit from this optimization pattern. A side effect of this is that you'll end up with a far more complicated implementation with far more special variation to maintain.

It should also be understood that entity snapshots are only ever necessary for long-lived event streams with great volumes of events, like ledgers and journals. Projecting such a long-lived stream from scratch, of course, is expected to have to make more than a single retrieval of a batch of messages from the message store. It's not considered an abnormality to have to make multiple event batch retrieval calls in an event sourced system.

Of course, if you've gotta process super-massive volumes of events in near real-time, any of the patterns you've described may make the difference in hundredths of seconds that are needed. But then, I'd posit that Message DB and Postgres are not sufficient to the aspirations of scale and performance sought. At that level of performance aspiration, something like KStreams and KTables might be more appropriate, or something like Event Store DB, under the right circumstances (Event Store, being an early innovator, didn't have the benefit of experiences of those who came before, and got a lot of stuff wrong, in my opinion).

I think you're making things much harder on yourself than necessary, and embracing design patterns that introduce more difficulty for not much advantage.

That said, I'd more than welcome the opportunity to have my assertions corrected, but first I'd need to have a specific understanding of what the performance and throughput targets are, and what the tangible, measured impact is on whole system performance when one-time snapshot retrieval is a consideration of long-lived stream projection.

@nordfjord
Author

I think it's worth spelling out some things in this issue. I want to make sure that I understand your position. The examples are in F# for terseness.

My mental model goes like this:

The basic flow of an event sourced application is to fetch events in a stream, fold them into a state, and make a decision based on that state. This decision is in the form of new events appended to the same stream.

let transact client fold initial streamName decide = async {
  // step 1. Load the stream
  let! version, events = client.GetStreamMessages(streamName, 0L)
  // step 2. fold them into current state
  let state = fold initial events
  // step 3. make decision based on the state
  let newEvents = decide state
  // step 4. write the decision to the stream
  return! client.WriteMessages(streamName, newEvents, version) } 

We can add caching as an optimization to this flow

let transact client cache fold initial streamName decide = async {
  // 1. get the cached state from the cache
  let version, cachedState = cache.TryGet(streamName) |> Option.defaultValue (-1L, initial)
  // 2. fetch new events from the stream. This step is optional, 
  // 2. a) an optimization is to not pay this cost unless there's a wrong expected version error
  let! version, additionalEvents = client.GetStreamMessages(streamName, version)
  // fold the newer events into the cached state
  let state = fold cachedState additionalEvents
  // 3. make decision based on the state
  let newEvents = decide state
  // 4. write the decision to the stream
  let! newVersion = client.WriteMessages(streamName, newEvents, version)
  // 5. update the value in the cache
  let newState = fold state newEvents
  cache.Add(streamName, (newVersion, newState)) 
  return newVersion } 

A further optimization in the case of long-lived streams is to introduce some form of snapshotting

let transact client cache snapshotStore fold initial streamName decide = async {
  let! cachedVersion, cachedState = async {
    match cache.TryGet(streamName) with
    // we have a cached value, so we use it
    | Some (version, state) -> return (version, state)
    | None ->
      // no cached value, so fall back to the snapshot store
      match! snapshotStore.GetSnapshot(streamName) with
      // we have a snapshotted value so we use it
      | Some (version, state) -> return (version, state)
      // no snapshot or cache, so we default to initial
      | None -> return (-1L, initial) }
      
  let! version, events = client.GetStreamMessages(streamName, cachedVersion)
  let state = fold cachedState events
  let newEvents = decide state
  let! newVersion = client.WriteMessages(streamName, newEvents, version)
  let newState = fold state newEvents
  cache.Add(streamName, (newVersion, newState))

  // most likely we snapshot at some interval based on the stream version, e.g. every 100 events
  if shouldSnapshot newVersion then
    do! snapshotStore.StoreSnapshot(streamName, (newVersion, newState)) } 

^ do these examples demonstrate your preferred approach?

In general I agree with, and appreciate, your call for justification. All abstractions need to earn their keep.

To address some of your comments

An entity and its snapshots are not the same concern. They're related and adjacent, but never the same. And entity's process and lifecycle aren't germane to snapshotting. Snapshotting is only ever a mechanical optimization strategy, and not part of an entity stream's nature.

I wholeheartedly agree; backwards reading with in-stream snapshots is an optimization strategy for when performance and round-trips really matter. It has two advantages over a separate snapshot store:

  1. It can guarantee that fetching all events necessary to project current state is done in a single roundtrip
  2. It can support multiple types of snapshots / origin events
    (e.g. you could have Cleared, Snapshotted, and SnapshottedV2 in the stream and handle them seamlessly)

Don't forget that an entity's snapshot should only have to be retrieved once during the runtime lifetime of a service. That's not a one-time hit per projection of an entity, it's a one-time hit per first retrieval of an entity after powering on a service. After than, newer events are the only things that have to be retrieved.

This holds if your working set is small enough to fit in memory. Which, to be fair, most are.

It's utterly unnecessary to read backward to retrieve snapshots. The only necessary operation is to read approximately the last message in a snapshot's stream.

My one concern with reading the last message is that snapshot schemas can evolve over time.
I might know how to convert a Snapshotted event to a SnapshottedV2. So I would like to fetch the last message in the snapshot stream of either type. During rollout of a deployment, an older version of the system could continue reading only Snapshotted events, but the newer version would be happy with either type. Perhaps this could be catered for in the last_message API?
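
Lacking that, the "either type" read could be expressed directly against the messages table (a rough sketch, assuming the snapshots live in their own stream with an illustrative name):

select *
from messages
where stream_name = 'todoList:snapshot-123'
  and type in ('Snapshotted', 'SnapshottedV2')
order by position desc
limit 1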

It should also be understood that entity snapshots are only ever necessary for long-lived event streams with great volumes of events, like ledgers and journals.

I also agree here, this is why I mentioned in the root of this issue that a potential rejection of this pattern is that MessageDB should not offer technical solutions to modeling problems. Most long-lived event streams can be represented alternatively as a series of small/bounded streams.

Projecting such a long-lived stream from scratch, of course, is expected to have to make more than a single retrieval of a batch of messages from the message store. It's not considered an abnormality to have to make multiple event batch retrieval calls in an event sourced system.

While not considered abnormal, it can definitely be undesirable.

But then, I'd posit that Message DB and Postgres are not sufficient to the aspirations of scale and performance sought

This is a very fair point. I agree that scenarios where backwards reading and in-stream snapshots are valuable might also be scenarios that exclude Postgres as a backing store from the get-go.

I think you're making things much harder on yourself than necessary, and embracing design patterns that introduce more difficulty for not much advantage.

I'm not sure it was obvious from my writing, but I don't believe I'd derive benefit from this API personally. I'd rather change my model than adopt a technical band-aid. This suggestion came from a desire to have feature parity for message-db inside equinox with regards to access strategies.

@sbellware
Contributor

do these examples demonstrate your preferred approach?

Yes. A good depiction.

This holds if your working set is small enough to fit in memory. Which, to be fair, most are.

In our experience (which, because we went from in-house work to consulting work, includes a lot of other people's implementations) cache invalidation of entity caches proves largely immaterial in practice in the wild.

In the end, cache invalidation can be effected by specifying a memory utilization threshold on a container, and allowing the container to restart when the threshold is reached. Since the implementation code is expected at a minimum to be both idempotent and safe to shut down, it turns out to be a non-issue in the end. It usually ends up being a configuration that users want, and they'll happily scale their server instances if they desire even more entities to remain in the cache.

My one concern with reading the last message is that snapshot schemas can evolve over time.

No way around it: a snapshot record is still a message, and all message schema migrations are a pain. But at least the patterns are few and pretty well-known. Either some kind of versioning and dispatching has to be used, or the schemas of the messages that have already been written to disk have to be retrofitted.

Retrofitting is one of the reasons I appreciate working with mutable message stores versus immutable message stores. There are just times where mutating message data is the right answer to an urgent operational issue. And if retrofitting can work for some scenario, then the need to maintain multiple versions of schemas, and their handlers, etc, can be mitigated.

And ultimately, in those luckiest of circumstances, if schema changes are additive, some backward compatibility should be had for free.

Most long-lived event streams can be represented alternatively as a series of small/bounded streams.

Indeed, but that's quite a bit more complex of a solution than keeping one entity in one stream in perpetuity. Even when we're building ledger apps, we'll keep a single ledger entity in a single stream for its entire life. Snapshotting is a lot less expensive to build and to maintain over the life of a system.

Given a choice, I'd always prefer to avoid the ownership cost of having multiple streams for a single ledger - even for things like "monthly close of books". For that kind of use case, I'd implement the monthly ledger as view data that would be calculated from the source ledger stream.

While not considered abnormal, it can definitely be undesirable.

For high-volume, high-frequency streams, there's usually no choice but to make multiple batched fetches to a stream in order to project all outstanding events. We've seen this in high-volume financial systems. The tradeoff is retrieving volumes of events for projection that themselves become database, network, and service host memory problems. But yeah, it can be undesirable - except when volume and frequency makes it unavoidable.

I'm not sure it was obvious from my writing, but I don't believe I'd derive benefit from this API personally. I'd rather change my model than adopt a technical band-aid. This suggestion came from a desire to have feature parity for message-db inside equinox with regards to access strategies.

Indeed. I figured that it arises from a need to have feature parity. Ultimately, I think it's feature parity with EventStoreDB, and with other message store implementations that went ahead and implemented EventStoreDB's features without first questioning the essential necessity and historical context of EventStoreDB's features.

Like I mentioned, I feel that EventStoreDB got a number of things wrong. Of course, all of us who have the convenience of coming after EventStoreDB get to benefit from their experience, and make judgement calls based on what is probably more concrete validation of features through tangible experiences.

EventStoreDB also attempts to serve analytics use cases, which is something that is explicitly eliminated from Message DB given that all of those use cases are often better served with focused analytics products that can be populated from event streams (rather than try to make a transactional data store also serve non-transactional workloads).

After my experiences with EventStoreDB (and the experiences of other EventStoreDB users I'd spoken with over the years), I took stock of exactly the features that are essential to building transactional services, eliminated the extras, and built an implementation in a database that is (nearly) universally-supported in both cloud and on-premise data operations.

We'd considered an implementation for DynamoDB a few years ago, but none of our customers cared about it except as storage for aggregated view data (rather than message storage), ie: as a hosted store tasked with the use cases and workloads that might have otherwise been handled with Cassandra.

I would have preferred, though, that Event Store Ltd had done this exercise so that I didn't have to undertake the implementation (and maintenance) of Message DB all those years ago. But that ship has sailed, and we're entirely sold on MDB and Postgres for our use cases and workloads now. And there are more interesting things to come in the next major version of Message DB, but that's over the horizon until next year. We may consider reverse reading, but it's honestly not a solution to a problem that I'm finding in the wild where Postgres is also a good choice.
