Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Redis 5 #228

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

drmontgomery
Copy link
Contributor

Hi there!

As noted in my previous pull request, I've made some changes to support redis 5.0 and the new Streams commands. This branch is based on my features/redis-4.0 branch, so please disregard the common changes. (I'll rebase this if the other pull request is accepted.)

This is still a work-in-progress, but I think there is enough here to get meaningful feedback, especially on the interface and reply decoding for stream commands. If the current approach looks good, I'll add support for consumer groups and blocking stream commands.

Thanks,
-David

@coveralls
Copy link

Coverage Status

Coverage increased (+0.4%) to 91.088% when pulling 1ff1a1f on drmontgomery:features/redis-5.0 into aac07d2 on etaty:master.


def toOptionSeqStringSeqEntry[K, F, V](mb: MultiBulk)(implicit deserializerK: ByteStringDeserializer[K], deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): Option[Seq[(K, Seq[StreamEntry[F, V]])]] =
mb.responses.map { r =>
r.map(_.asInstanceOf[MultiBulk]).map(toStringSeqEntry[K,F,V])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could squeeze some perf by doing map only once
r.map{ case mb: MultiBulk => toStringSeqEntry[K,F,V](mb)}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll include this in my next pull request.

StreamEntry(id, fields)
}

def toSeqEntry[F, V](mb: MultiBulk)(implicit deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): Seq[StreamEntry[F, V]] = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relatively similar to MultiBulkConverter.toSeqByteString

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the method signature is similar, the underlying reply from Redis is significantly different.

The Stream commands use nested arrays in their replies to a much greater extent than commands for the other data structures. For example, XRANGE returns an array of two-element arrays, where the first element is the stream id and the second element is an array of field-value pairs. Borrowing JSON notation, this would be something like:

[ 
   [ ID1, [ FIELD1, VALUE1, FIELD2, VALUE2, ... ] ], 
   [ ID2, [ FIELD1, VALUE1, FIELD2, VALUE2, ... ] ],
   ...
]

where ID, FIELD and VALUE are all Bulk strings and [] indicates a MultiBulk array.

The XREAD reply is even more nested, with an array containing two element arrays where the first element is the stream key and the second element is a sequence of entries, similar to the XRANGE reply.

Because the replies are structured in such specific ways for Stream comamnds, I opted to put the decoding logic in a separate object (StreamEntryDecoder) rather than adding it to MultiBulkConverter.

I opted to use unsafe operations (casting to MultiBulk, accessing elements by index) for two reasons. First, I think it's best for decoding to break immediately and loudly if we get a reply that doesn't match our understanding/implementation of the Redis spec. Second, I don't see good fallback options, aside from silently dropping parts of the response entirely. Throwing an exception seems like the best response. It seems like there is precedent here in the use of head/tail in MultiBulkConverter.seqtoMapString and MultiBulkConverter.toOptionStringByteString, which will also throw exceptions if the array is too small.

How would you feel about introducing a new exception specifically for decode errors? This feels cleaner than emitting low-level exceptions like ClassCastException, NoSuchElementException, etc., and would allow callers to implement custom decode error handling if desired. I thought about reusing ReplyErrorException for this, but that seems best reserved for error messages from the Redis server itself. Perhaps ReplyDecodeException?


private [redis] object StreamEntryDecoder {
def toEntry[F, V](mb: MultiBulk)(implicit deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): StreamEntry[F, V] = {
val r = mb.responses.get
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are using "unsafe" method here and there (.get, r(1) (seq.apply(index))
I don't know how safe it is to do it here.
Maybe you could try to compare with what we did in this file

object MultiBulkConverter {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refer to the comment above for the reasoning behind usage of "unsafe" methods.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants