Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsublite: Documentation Clarification #3814

Closed
GLStephen opened this issue Mar 14, 2021 · 6 comments
Closed

pubsublite: Documentation Clarification #3814

GLStephen opened this issue Mar 14, 2021 · 6 comments
Assignees
Labels
api: pubsublite Issues related to the Pub/Sub Lite API. type: question Request for information or clarification. Not an issue.

Comments

@GLStephen
Copy link

The PubSubLite documentation lacks a touch of clarity around "recreating a publisher".

// Publish publishes `msg` to the topic asynchronously. Messages are batched and
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server. Once the
// first publish error occurs, the publisher will terminate and subsequent
// PublishResults will also have an error. A new publisher must be created to
// republish failed messages.
//
// Once Stop() has been called or the publisher has failed permanently due to an
// error, future calls to Publish will immediately return a PublishResult with
// error ErrPublisherStopped. Error() returns the error that caused the
// publisher to terminate.
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {

What exactly does it mean: "A new publisher must be created to republish failed messages." Is that just replace the object like this being called again? Does everything just hook up once that is done?

oldPublisher, err = pscompat.NewPublisherClientWithSettings(pubSubCtx, pubsubTopic, settings)

Is there a "reset" or "resend old" command on the object? Does a new secondary publisher need to be made and hooked up to the "bucket" of unsent messages? Is it possible to get an example of what this really means for an operational app?

@GLStephen GLStephen added the triage me I really want to be triaged. label Mar 14, 2021
@product-auto-label product-auto-label bot added the api: pubsublite Issues related to the Pub/Sub Lite API. label Mar 14, 2021
@codyoss codyoss added the type: question Request for information or clarification. Not an issue. label Mar 15, 2021
@codyoss codyoss removed the triage me I really want to be triaged. label Mar 15, 2021
@tmdiep
Copy link
Contributor

tmdiep commented Mar 15, 2021

Hi Stephen,

Thanks for reporting this. We will improve the documentation and samples to clarify what this means and how to handle it.

The publisher implementation within the library attempts to retry publishes upon recoverable errors, so if it terminates that means a fatal error must have occurred. We expect this to be rare in operation.

In this case, a new publisher object must be created via publisher, err := pscompat.NewPublisherClientWithSettings(...) and then publisher.Publish(...) called again for every message that failed to be published.

@GLStephen
Copy link
Author

GLStephen commented Mar 16, 2021

Thanks for the response. Just asking the below mostly so you can cover some likely questions in your docs.

So, to be clear, it's not really " Once the
// first publish error occurs, the publisher will terminate and subsequent
// PublishResults will also have an error. A new publisher must be created to
// republish failed messages." but more like "once the first FATAL unrecoverable error occurs...." a fatal unrecoverable error,

  1. is there documentation on what error/message would be sent?
  2. How can we tell an unrecoverable error? What differentiates them as they flow through?
  3. What does the object looks like when instead of publish info it has an unrecoverable error? Any different?
  4. Where are the messages stored at this point? How do you get access to them?

In my other issue #3813 those errors are coming through this flow.

_, err := r.Get(pubSubCtx)
if err != nil {
	// TODO: Handle error.
	// NOTE: The publisher will terminate upon first error. Create a new
	// publisher to republish failed messages.
	log.Printf("PubSub Lite Publish Error %s", err.Error())
	createPubSubLitePublisher()
} else {
	//log.Printf("Published %s", messagePub)
	currentStats.MessageCountHandled++
}

Clearly the simple createPubSubLitePublisher() call is not sufficient, but it sounds like you are also saying that not every "err" that is not nill on this path is fatal, yes? How can we differentiate?

@tmdiep
Copy link
Contributor

tmdiep commented Mar 16, 2021

Sorry, I can see how my last comment was misleading. The library determines what is an unrecoverable error and we do not encourage user code to decide which is fatal. When the publisher decides to terminate, human intervention is normally necessary.

Having said that, there are a couple of cases that user code can handle automatically:

  1. pscompat.ErrOverflow is returned. There must be a serious issue for this to be occurring, e.g. the configured publish throughput for the topic is well below what is sufficient for publishers (or the service was persistently unavailable). Throttling publishes can mitigate this error.
  2. The service has become unavailable for more than pscompat.PublishSettings.Timeout, which is what occurred in issue pubsublite: Stalls on Sending & Delayed "The service is currently unavailable" #3813. The default timeout value of 1 minute is admittedly far too low. I have increased it to 1 hour in PR fix(pubsublite): increase default timeouts for publish and subscribe stream connections #3821.

For case 2, would it be more helpful to return a defined error, i.e. we add a pscompat.ErrUnavailable?

Where are the messages stored at this point? How do you get access to them?

The messages are buffered internally within the PublisherClient. We don't provide access to these in the pubsublite nor pubsub libraries, as it was expected that important messages would be persisted elsewhere and user code tracks which were successfully sent, e.g. to resume from task restarts/crashes.

@GLStephen
Copy link
Author

GLStephen commented Mar 16, 2021

We use PubSub and PubSubLite for tracking analytics events. As such, our messages really originate at this service from our perspective. Each message is enriched at this service with metadata, and the events originate outside our servcies, etc. If the assumption is that some other services "persists" them, I think that assumption is too broad.

We would prefer to pipe messages to another service for downtime, and quickly, like Fluentd off machine, but the expectation that they are persisted somewhere would only be in local memory and since we've already put them into PubSubLite client buffers the logical place is to turn around and "extract" them from those buffers and push into the failover service (gcs files, fluentd, etc.) then have a service get those back out when the service recovers.

I'd rather allow non fatal issues like the backend service being down have a max memory to buffer and a method to drain that buffer into a failover service. Given the rate of messages, I would like that service to be something that can go to disk, like FluentD. Then when the service returns FluentD can be drained back into the live service. If we rely on the capability to monitor and drain the buffers when in some state then all of the other capabilities in GCS become available for failure and retry.

I'm spitballing a bit in actual implementation, but I think a method to find the messages that have failed. Remove them from retry and push them somewhere else all within the context of the service makes a lot of sense. Especially in cases where the message originate at the client, which would likely be many IOT and similar types of services. In these cases message durability is important, getting the message to PubSubLite eventually is the key. Given message volumes, we generate many gigs per minute, a method that allows a service to push ultimately to disk is almost certainly needed or the Golang service will OOM pretty readily unless gobs of memory are available to account for these relatively rare events, and the typical escape valve there would be Google-FluentD or similar.

@tmdiep
Copy link
Contributor

tmdiep commented Mar 16, 2021

Thank you for explaining your use case.

We can certainly make service unavailable a defined error to make it easier to detect and handle.

As for providing access to the unsent, buffered messages, we will discuss this feature request internally and get back to you.

@tmdiep
Copy link
Contributor

tmdiep commented Mar 26, 2021

Hi Stephen,

An update on this issue - I have released v0.8 of the pubsublite library. Once pkg.go.dev/cloud.google.com/go/pubsublite updates, it will have some documentation clarifications.

To address backend unavailability, we have introduced a ErrBackendUnavailable error to detect and handle this case. We decided to increase the default connect timeouts and publish buffer sizes to very high values so that clients stay alive until the backend service recovers. If your application has low tolerance to backend unavailability, PublishSettings.Timeout should be set to a lower value and ErrBackendUnavailable handled (see this example).

We decided not to return the unsent, buffered messages and will leave it up to clients to track and handle. For example, perhaps you can wrap the pubsub.Message and pubsub.PublishResult in a struct and pass that to the channel receiving the publish results?

I will close this issue, but feel free to file new issues for additional feature requests and unclear documentation that you find. Thank you!

@tmdiep tmdiep closed this as completed Mar 26, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsublite Issues related to the Pub/Sub Lite API. type: question Request for information or clarification. Not an issue.
Projects
None yet
Development

No branches or pull requests

3 participants