
Seems like eventsByTag query misses events (at buffer's boundary). #55

Open
LoneEngineer opened this issue Aug 17, 2018 · 3 comments

@LoneEngineer

Hi,

I use Akka 2.5.14 and akka-persistence-inmemory 2.5.1.1, and it seems I have run into the following issue:
I emit a lot of events quickly (more than the default max-buffer-size), and sometimes the 101st, 202nd and 303rd events are not pushed into the stream.
I read through the code, and I consider the following place suspicious:

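  override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
    Source.unfoldAsync[Offset, Seq[EventEnvelope]](offset) { (from: Offset) =>
      // Computes the offset to resume from after reading the page `xs`.
      def nextFromOffset(xs: Seq[EventEnvelope]): Offset = {
        if (xs.isEmpty) from else xs.last.offset match {
          case Sequence(n)         => Sequence(n)
          // Keeps only the millisecond of the last event and resumes at the start
          // of the next millisecond; later events in the same millisecond are skipped.
          case TimeBasedUUID(time) => TimeBasedUUID(UUIDs.startOf(UUIDs.unixTimestamp(time) + 1))
        }
      }
      // Reads at most maxBufferSize events for `tag`, starting at `from`.
      ticker.flatMapConcat(_ => currentEventsByTag(tag, from)
        .take(maxBufferSize)).runWith(Sink.seq).map { xs =>
        val next = nextFromOffset(xs)
        Some((next, xs))
      }
    }.mapConcat(identity)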

Let's consider an example (real values from a test; sorry, I cannot share the test yet):
The 100th event has TimeBasedUUID(fa7225e0-a223-11e8-b71e-e9435a127f49)
The 101st event has TimeBasedUUID(fa7225e1-a223-11e8-b71e-e9435a127f49)

According to my logs, my stream processing logic never received the 101st event. If we run the code from nextFromOffset in a REPL:

@ val time1 = UUID.fromString("fa7225e0-a223-11e8-b71e-e9435a127f49") 
time1: UUID = fa7225e0-a223-11e8-b71e-e9435a127f49
@ val next1 = TimeBasedUUID(UUIDs.startOf(UUIDs.unixTimestamp(time1) + 1)) 
next1: TimeBasedUUID = TimeBasedUUID(fa724cf0-a223-11e8-8080-808080808080)
@ val time2 = UUID.fromString("fa7225e1-a223-11e8-b71e-e9435a127f49") 
time2: UUID = fa7225e1-a223-11e8-b71e-e9435a127f49
@ TimeBasedUUID(time2).compare(next1) 
res9: Int = -1

So if currentEventsByTag returns 101 events, the last one is dropped by take(100), and the next offset returned to unfold will be fa724cf0-a223-11e8-8080-808080808080, which is after the 101st event's timestamp. The 101st event is therefore never emitted.
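To see why exactly the events right after each page boundary disappear, here is a minimal, dependency-free Scala model of the unfold loop. It is a sketch under stated assumptions: events are represented only by their UUID timestamps in 100-ns ticks, offsets are plain Longs, and `TicksPerMs`, `nextFromMs` and `readAll` are hypothetical names, not part of akka-persistence-inmemory.

```scala
// Hypothetical model, not the plugin's code: an event is just its
// UUID timestamp in 100-ns ticks, and an offset is a plain Long.
val TicksPerMs = 10000L

// Mirrors the shape of nextFromOffset for TimeBasedUUID: keep only the
// millisecond of the last event, add 1 ms, and resume at its first tick.
def nextFromMs(lastTicks: Long): Long = (lastTicks / TicksPerMs + 1) * TicksPerMs

// Mirrors the unfold loop: read at most pageSize events at or after `from`,
// then derive the next `from` from the last event of the page.
def readAll(events: Vector[Long], pageSize: Int)(next: Long => Long): Vector[Long] = {
  @annotation.tailrec
  def loop(from: Long, acc: Vector[Long]): Vector[Long] = {
    val page = events.filter(_ >= from).take(pageSize)
    if (page.isEmpty) acc else loop(next(page.last), acc ++ page)
  }
  loop(Long.MinValue, Vector.empty)
}

// Five events share millisecond 0, two more share millisecond 1.
val events = Vector(0L, 1L, 2L, 3L, 4L, 10000L, 10001L)

println(readAll(events, pageSize = 2)(nextFromMs)) // Vector(0, 1, 10000, 10001)
println(readAll(events, pageSize = 2)(_ + 1))      // Vector(0, 1, 2, 3, 4, 10000, 10001)
```

In this model, resuming one tick after the last event read (`_ + 1`) avoids the gap, assuming every event has a unique timestamp. That is only one possible approach and not necessarily what the fix in #64 does.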

If I set max-buffer-size to a value above the number of events my test can generate, everything works fine.

What do you think?

@dnvriend
Owner

I think you hit an edge case. The solution of setting the max-buffer-size seems reasonable.

@LoneEngineer
Author

I made a fix and created pull request #64. Please review.

@LoneEngineer
Author

LoneEngineer commented Dec 28, 2018

Copying the description from the commit message:
The problem happens when the buffer's boundary falls between events with timeuuids like:
...
e3e99ed0-a21d-11e8-b31a-e9435a127f49 // A: last event put into the buffer
e3e99ed1-a21d-11e8-b31a-e9435a127f49 // B: next one
...

InMemoryReadJournal::eventsByTag::nextFromOffset uses the Unix timestamp (in milliseconds) to calculate the 'next' offset:

case TimeBasedUUID(time) => TimeBasedUUID(UUIDs.startOf(UUIDs.unixTimestamp(time) + 1))

and it skips event B because both events have the same Unix timestamp, 1534510989629 ms, so the 'next' UUID will be:
e3e9c5e0-a21d-11e8-b31a-e9435a127f49

The difference in UUID timestamp ticks (100-nanosecond intervals since 1582-10-15):
137538037896290000 for A
137538037896290001 for B
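These tick values can be reproduced with nothing but `java.util.UUID`, whose `timestamp()` method returns the 60-bit timestamp of a version-1 UUID in 100-ns units. This is a small sketch: `UuidEpochOffset` and `unixMillis` are our own hypothetical names, meant to mirror (not copied from) what `UUIDs.unixTimestamp` computes.

```scala
import java.util.UUID

// 100-ns ticks between the UUID epoch (1582-10-15) and the Unix epoch (RFC 4122).
val UuidEpochOffset = 0x01B21DD213814000L

// Converts a version-1 UUID's timestamp to Unix milliseconds, truncating
// the sub-millisecond ticks (which is all a millisecond-based offset keeps).
def unixMillis(u: UUID): Long = (u.timestamp() - UuidEpochOffset) / 10000L

val a = UUID.fromString("e3e99ed0-a21d-11e8-b31a-e9435a127f49")
val b = UUID.fromString("e3e99ed1-a21d-11e8-b31a-e9435a127f49")

println(a.timestamp()) // 137538037896290000
println(b.timestamp()) // 137538037896290001
println(unixMillis(a)) // 1534510989629
println(unixMillis(b)) // 1534510989629
```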

Where does it come from?
InMemoryAsyncWriteJournal uses the following functions to generate a timeuuid for an event:

def nowUuid: UUID = UUIDs.timeBased()
def getTimeBasedUUID: TimeBasedUUID = TimeBasedUUID(nowUuid)
def timeBased(): UUID = {
   new UUID(makeMSB(UUIDUtil.getCurrentTimestamp()), ClockSeqAndNode)
}
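`makeMSB` itself is not shown in the thread. As a hedged illustration of the standard RFC 4122 layout that time-based UUIDs use, the sketch below packs a tick count into the most significant 64 bits; `makeMsb` is our own hypothetical helper, not the plugin's `makeMSB`. It shows that A and B differ only in the lowest bits of `time_low`, which is why their string forms differ only in the last hex digit of the first group.

```scala
import java.util.UUID

// Packs a 60-bit timestamp into the most significant 64 bits of a version-1
// UUID following the RFC 4122 layout: time_low | time_mid | version + time_hi.
def makeMsb(ticks: Long): Long = {
  val timeLow = ticks & 0xFFFFFFFFL       // lowest 32 bits
  val timeMid = (ticks >>> 32) & 0xFFFFL  // next 16 bits
  val timeHi  = (ticks >>> 48) & 0x0FFFL  // top 12 bits
  (timeLow << 32) | (timeMid << 16) | 0x1000L | timeHi // 0x1000 sets version 1
}

// Reuse the clock sequence and node of event A so the output is comparable.
val lsb = UUID.fromString("e3e99ed0-a21d-11e8-b31a-e9435a127f49").getLeastSignificantBits

val a = new UUID(makeMsb(137538037896290000L), lsb)
val b = new UUID(makeMsb(137538037896290001L), lsb)
println(a) // e3e99ed0-a21d-11e8-b31a-e9435a127f49
println(b) // e3e99ed1-a21d-11e8-b31a-e9435a127f49
```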

If we take a closer look at UUIDUtil.getCurrentTimestamp, we see the following:

public static final AtomicLong lastTimestamp = new AtomicLong(0L);
   ...
       long now = fromUnixTimestamp(System.currentTimeMillis());
       long last = lastTimestamp.get();
       if (now > last) { 
          ...
        } else { 
          ...
          long candidate = last + 1;

So if two (or more) events are persisted in the same millisecond, each later one gets a timestamp one 100-nanosecond tick after the previous one (last + 1). But these sub-millisecond ticks are not taken into account when events are read from the journal.
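The effect of that `else` branch can be reproduced with a small Scala model. It is a simplified sketch, not the actual `UUIDUtil` code: `MonotonicTicks` and `UuidEpochOffset` are hypothetical names, the clock is injected so that "same millisecond" can be reproduced deterministically, and the branches elided in the excerpt are left out.

```scala
import java.util.concurrent.atomic.AtomicLong

// 100-ns ticks between the UUID epoch (1582-10-15) and the Unix epoch (RFC 4122).
val UuidEpochOffset = 0x01B21DD213814000L

// Hypothetical, simplified model of the getCurrentTimestamp pattern quoted above.
final class MonotonicTicks(clockMs: () => Long) {
  private val lastTimestamp = new AtomicLong(0L)

  @annotation.tailrec
  def next(): Long = {
    val now = clockMs() * 10000L + UuidEpochOffset // millisecond precision only
    val last = lastTimestamp.get()
    // A later millisecond wins; otherwise bump the previous value by one tick.
    val candidate = if (now > last) now else last + 1
    if (lastTimestamp.compareAndSet(last, candidate)) candidate else next()
  }
}

// Two events persisted within the same millisecond (the clock is frozen).
val ticks = new MonotonicTicks(() => 1534510989629L)
val a = ticks.next() // 137538037896290000
val b = ticks.next() // 137538037896290001
```

Both values truncate to the same Unix millisecond, so a reader that resumes from "last millisecond + 1" cannot tell them apart.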

PS: I also added a test for that scenario. Unfortunately, the test depends heavily on timing (performance)
and may NOT fail even with the broken implementation.
I was able to choose parameters that give me a ~100% failure rate: the test never passed
with the original implementation on my box, but I cannot guarantee that for other boxes.
