Smart Producer for v2 #219

Open
Zerpet opened this issue Jul 19, 2023 · 3 comments

@Zerpet
Contributor

Zerpet commented Jul 19, 2023

Description

We have to implement the smart producer, similar to the v1 Producer. However, since we are doing a major version, I'd like to take the chance to change the API drastically. Some challenges I observe with the current implementation:

  • Producer is a concrete struct
  • Producer API is arguably broad
  • API is not consistent with Java or .NET clients (reference implementation and most popular respectively)

Producer is a concrete struct

This makes it difficult to provide different implementations of producers, in particular for "regular" streams and super streams. The Java client provides the producer as an interface, hiding the concrete implementation and returning the right producer (stream vs super stream) based on options from the builder. The .NET client uses a concrete implementation for the producer, and changes its behaviour (stream vs super stream) based on the options passed to the constructor.

I'm inclined to implement the v2 producer API in a similar manner to the Java client. I'd like to have a very simple producer interface, and return the appropriate implementation based on the options passed to a create function in Environment.
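A minimal sketch of that idea, assuming a hypothetical `ProducerOptions.SuperStream` flag and two unexported implementations (`streamProducer`, `superStreamProducer`). None of these names come from the existing client; they only illustrate how a create function on Environment could hide the concrete type behind an interface.

```go
package main

import (
	"errors"
	"fmt"
)

// Producer is the small interface users program against.
type Producer interface {
	Send(body []byte) error
	Close() error
}

// ProducerOptions is a hypothetical options struct. SuperStream selects
// which implementation the factory returns.
type ProducerOptions struct {
	SuperStream bool
}

// streamProducer and superStreamProducer are placeholder implementations.
// A real client would hold connections and routing state here.
type streamProducer struct{ stream string }

type superStreamProducer struct{ superStream string }

func (p *streamProducer) Send(body []byte) error { return nil }
func (p *streamProducer) Close() error           { return nil }

func (p *superStreamProducer) Send(body []byte) error { return nil }
func (p *superStreamProducer) Close() error           { return nil }

// Environment is the entry point that creates producers.
type Environment struct{}

// CreateProducer returns the implementation that matches the options,
// without exposing the concrete type to the caller.
func (e *Environment) CreateProducer(name string, opts ProducerOptions) (Producer, error) {
	if name == "" {
		return nil, errors.New("stream name must not be empty")
	}
	if opts.SuperStream {
		return &superStreamProducer{superStream: name}, nil
	}
	return &streamProducer{stream: name}, nil
}

func main() {
	env := &Environment{}
	p, err := env.CreateProducer("events", ProducerOptions{})
	if err != nil {
		fmt.Println("create producer:", err)
		return
	}
	defer p.Close()
	fmt.Printf("got implementation %T\n", p)
}
```

Because callers only see `Producer`, the implementation behind it can change without touching user code.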

Producer API is arguably broad

The current API has getters, sends, close, notification registration and a sort of flush. The current implementation exposes too much, IMO. In a smart client, the library should take care of all the low level logic and boilerplate code (e.g. reconnection), so that the user simply sends messages.

Given the language differences between Go and Java, most notably that Go has no method overloading, we can't have as small an API as the Java client:

public interface Producer extends AutoCloseable {
  MessageBuilder messageBuilder();
  long getLastPublishingId();
  void send(Message message, ConfirmationHandler confirmationHandler);
  void close();
}

We would have two different send functions. One is a hand-holding, smart send that accumulates messages and sends them in batches. The other is a batch send, a sort of "pro-mode", that gives the flexibility to choose how many messages to send per batch. One notable difference between the two is that batch sending will send immediately, whilst the smart send will accumulate until a certain threshold or a deadline, whichever happens first.

Proposed solution

The API could be something as simple as:

type Message interface {
	// ...
}

type Producer interface {
	Send(msg amqpMessage) error
	SendBatch(messages []amqpMessage) error
	SendWithId(publishingId uint64, msg amqpMessage) error
	GetLastPublishedId() uint64
	Close() error
}

Here, Message is a concrete AMQP 1.0 type, and we will provide the AMQP 1.0 implementation. Producer will be a smart producer, prepared to publish to a stream or a super stream, and to use any producer feature, e.g. deduplication or sub-batching.

Drawbacks

This new API will break all existing users. It won't be trivial to migrate from v1 to v2, to the point that v2 may only be used for new projects/apps.

The user loses some flexibility, but I think that's acceptable as long as the new implementation provides enough benefits, like automatic connection recovery, logging and potentially metrics.

@Zerpet Zerpet added the v2.x Applicable to v2.x issues label Jul 19, 2023
@Zerpet Zerpet self-assigned this Jul 19, 2023
@Zerpet Zerpet added this to the 2.0 milestone Jul 19, 2023
@Zerpet
Contributor Author

Zerpet commented Aug 2, 2023

Gabriele and I had a long conversation about the Producer API: what we like from Java + .NET, what we don't like, and what the new API should look like. I'll use this comment to keep a record of the potential API design outcome.

Pros 👍 and Cons 👎 of Java + .NET

👍 for Java:

  • Super simple API. Just one send method and that's it
  • Builders return an interface, hiding the implementation from the user
    • Good for making implementation changes w/o impacting the user API
    • Easy for consumers of the library to mock/stub the producer

👎 for Java:

  • Specific combinations of parameters in builders provide different Send implementations
    • Message deduplication feature requires setting a reference + a publishing ID per message (docs)
    • Sub-entry batching send (doc)
    • Super Stream producer has a different combination of parameters (doc)
  • Impossible to provide your own aggregation mechanism (mailing list item)

👍 for .NET

👎 for .NET

  • Create methods return concrete classes, instead of interfaces

Design proposal in next comment.

@Zerpet
Contributor Author

Zerpet commented Aug 2, 2023

Potential API design for Producer

  • Use a factory pattern
  • Return an interface
  • Make it terribly obvious what implementation of the producer you are getting

Some pseudo code:

// Environment is the entry point. It connects to rabbitmq and does topology
// operations
type Environment struct {
	// func CreateStream()
	// func DeleteStream()
}

// ProducerOpts will capture only optional parameters
type ProducerOpts struct {}

func (e Environment) CreateProducer(streamName string, opts ProducerOpts) Producer {
	// Producer options only has optional parameters
	// Returns the standard producer implementation
}

func (e Environment) CreateDeduplicationProducer(streamName, producerReference string, opts ProducerOpts) Producer {
	// Producer options only has optional parameters
	// Returns the deduplicationProducer implementation
}

func (e Environment) CreateSubBatchingProducer(streamName string, subBatchSize int, opts ProducerOpts) Producer {
	// Producer options only has optional parameters
	// Returns the subBatchingProducer implementation
}

type amqpMessage struct {
	// amqp 1.0 fields
}

type Producer interface {
	Send(msg amqpMessage) error
	SendBatch(messages []amqpMessage) error
	SendWithId(publishingId uint64, msg amqpMessage) error
	GetLastPublishedId() uint64
}

///////////////////////////////////////////////////

// Implementation for producer
type standardProducer struct {}

// Send batches messages and sends them after a pre-defined time, or when N
// messages are accumulated. The max amount of messages accumulated will be
// configurable. Internally keeps track of publishingID and auto-increments it
func (s standardProducer) Send(m amqpMessage) error {}
// SendWithId always returns an error because publishing ID is tracked internally
func (s standardProducer) SendWithId(publishingId uint64, msg amqpMessage) error {}
// SendBatch immediately sends the batch of messages. Assigns publishing IDs from the
// internally tracked publishing ID
func (s standardProducer) SendBatch(batch []amqpMessage) error {}
// GetLastPublishedId returns the last confirmed ID. Likely just informational in
// this implementation
func (s standardProducer) GetLastPublishedId() uint64 {}

///////////////////////////////////////////////////

// Implementation for producer using the message deduplication feature
type deduplicationProducer struct {}

// Send will always return an error because publishing ID is mandatory
func (p deduplicationProducer) Send(m amqpMessage) error {}
// SendBatch will always return an error because publishing ID is mandatory
func (p deduplicationProducer) SendBatch(msg []amqpMessage) error {}
// SendWithId accumulates messages with given ID, and publishes them after a
// pre-defined time, or when there are N messages accumulated. The max amount of
// messages accumulated will be configurable
func (p deduplicationProducer) SendWithId(publishingId uint64, m amqpMessage) error {}
// GetLastPublishedId Returns the last confirmed published ID. This will be
// important for applications to determine where they left off, in case of a
// disconnection
func (p deduplicationProducer) GetLastPublishedId() uint64 {}

///////////////////////////////////////////////////

// Implementation for producer using the sub-batching feature
type subBatchingProducer struct {}

// Send batches messages and sends them after a pre-defined time, or when N
// messages are accumulated. The max amount of messages accumulated will be
// configurable. Internally, it will track a publishing ID, and it will send
// the message sub-batch with the current publishing ID. Compression may be used
// if it was an option in the CreateSubBatchingProducer function
func (p subBatchingProducer) Send(m amqpMessage) error {}
// SendBatch immediately sends a sub-batch of messages with the current
// publishing ID
func (p subBatchingProducer) SendBatch(msg []amqpMessage) error {}
// SendWithId will always return an error because the publishing ID is tracked
// internally
func (p subBatchingProducer) SendWithId(publishingId uint64, m amqpMessage) error {}
// GetLastPublishedId returns the last confirmed published ID. This will be
// important for the internal reconnection/init process, to pick up where it left
// off, in the event of a reconnection or application restart
func (p subBatchingProducer) GetLastPublishedId() uint64 {}

Edit: updated API after follow-up conversation with Gabriele

@Zerpet
Contributor Author

Zerpet commented Aug 2, 2023

Reserved
