I often get the error nats: no heartbeat received #1622

Open
VuongUranus opened this issue Apr 22, 2024 · 8 comments · May be fixed by #1631
Labels
defect Suspected defect such as a bug or regression

Comments

@VuongUranus

Observed behavior

I often get the error "nats: no heartbeat received". Why do I get this error, and what have I misconfigured?
When I get this error, it seems that my pull consumer has been deleted. I don't know the reason, but it appears to happen because the consumer was inactive for longer than the InactiveThreshold set in the consumer config.

func ContinuousPolling(cons jetstream.Consumer, f func([]byte)) (jetstream.MessagesContext, error) {
	iter, err := cons.Messages()
	if err != nil {
		log.Error().Msgf("Error when get message from NATS: %v", err)
		return nil, err
	}
	go func() {
		for {
			msg, err := iter.Next()
			if err != nil {
				if errors.Is(err, jetstream.ErrMsgIteratorClosed) || errors.Is(err, jetstream.ErrConsumerDeleted) {
					return
				}

				log.Error().Msgf("Error when consuming message. Reason: %v", err)
				continue
			}
			go f(msg.Data())
			msg.Ack()
		}
	}()

	return iter, nil
}

func setDefaultConsumerConfig(subjects []string, consumerName ...string) jetstream.ConsumerConfig {
	name := ""
	if len(consumerName) > 0 {
		name = consumerName[0]
	}

	return jetstream.ConsumerConfig{
		Name:              name,
		DeliverPolicy:     jetstream.DeliverNewPolicy,
		FilterSubjects:    subjects,
		AckPolicy:         jetstream.AckAllPolicy,
		Replicas:          1,
		InactiveThreshold: 1 * time.Minute,
	}
}

Expected behavior

Please explain why this happens and how I can avoid it.

Server and client version

nats-server version: 2.10.12

Host environment

No response

Steps to reproduce

No response

VuongUranus added the defect label on Apr 22, 2024
@Jarema
Member

Jarema commented Apr 22, 2024

Hey!

What we don't see here is the context in which the two provided functions are called. Could you please share it?

The inactive threshold kicks in when no one is listening for messages from a consumer (that is, no Fetch or Messages call is active).

This means it can happen, for example, if there is a long pause between creating a consumer and consuming its messages, or if the consuming app is down for longer than a minute.

Moving the issue to the nats.go repo, as it's probably a client-side discussion unless we find an issue.

Jarema transferred this issue from nats-io/nats-server on Apr 22, 2024
@withinboredom

I believe this happens if you spend too long processing a message in the message loop. You need to take the message off the "queue" and return processing to the library asap (at least that is what I got when trying to debug this issue in my code).

@Jarema
Member

Jarema commented May 13, 2024

That should not be the case, especially with such a long inactive threshold.
We will however try to replicate it by running a slow workload.

@withinboredom

AFAICT, the issue appears to be that heartbeats are only processed when calling iter.Next() so you need to call it again asap.

@piotrpio
Collaborator

This is not correct. The heartbeats are indeed processed in Next(), but the timer which triggers the error only runs in the context of each Next() execution. So each time you call iter.Next(), the timer is reset, and it is stopped when the method returns.

@withinboredom

Indeed! Looking at the code, it looks like this error might be coming from networking issues as the heartbeat isn't paused/reset during reconnection:

nats.go/jetstream/pull.go

Lines 608 to 630 in 8894a27

err = retryWithBackoff(func(attempt int) (bool, error) {
	isClosed := atomic.LoadUint32(&s.closed) == 1
	if isClosed {
		return false, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err := s.consumer.Info(ctx)
	if err != nil {
		if errors.Is(err, ErrConsumerNotFound) {
			return false, err
		}
		if attempt == backoffOpts.attempts-1 {
			return true, fmt.Errorf("could not get consumer info after server reconnect: %w", err)
		}
		return true, err
	}
	return false, nil
}, backoffOpts)
if err != nil {
	s.Stop()
	return nil, err
}

withinboredom linked a pull request on May 17, 2024 that will close this issue
@withinboredom

I can confirm that the above PR appears to fix the issue; at least, I haven't seen this message in a while now.

@withinboredom

withinboredom commented May 19, 2024

@VuongUranus try adding this to your go.mod file and see whether the issue goes away for you too:

replace github.com/nats-io/nats.go => github.com/withinboredom/nats.go patch-1
