Cherry-pick PRs for 2.10.15-RC.3 #5354

Merged: 17 commits, Apr 28, 2024
server/accounts.go (2 additions, 2 deletions)
@@ -3072,9 +3072,9 @@ func (s *Server) SetAccountResolver(ar AccountResolver) {

// AccountResolver returns the registered account resolver.
func (s *Server) AccountResolver() AccountResolver {
- s.mu.Lock()
+ s.mu.RLock()
ar := s.accResolver
- s.mu.Unlock()
+ s.mu.RUnlock()
return ar
}

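The accounts.go change above only swaps an exclusive lock for a read lock on the getter, so concurrent lookups of the account resolver no longer serialize behind each other. A minimal standalone sketch of that read-lock getter pattern, with a hypothetical resolverHolder type that is not part of the PR:

// Standalone sketch: read-lock getter, exclusive-lock setter (hypothetical type).
package main

import (
	"fmt"
	"sync"
)

type resolverHolder struct {
	mu       sync.RWMutex
	resolver string
}

// Get takes only the read lock, so concurrent readers do not block each other.
func (h *resolverHolder) Get() string {
	h.mu.RLock()
	r := h.resolver
	h.mu.RUnlock()
	return r
}

// Set keeps the exclusive write lock.
func (h *resolverHolder) Set(r string) {
	h.mu.Lock()
	h.resolver = r
	h.mu.Unlock()
}

func main() {
	h := &resolverHolder{}
	h.Set("memory-resolver")
	fmt.Println(h.Get())
}
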
server/client.go (17 additions, 0 deletions)
@@ -4909,6 +4909,23 @@ func adjustPingInterval(kind int, d time.Duration) time.Duration {
return d
}

// This is used when a connection cannot yet start to send PINGs because
// the remote would not be able to handle them (case of compression,
// or outbound gateway, etc...), but we still want to close the connection
// if the timer has not been reset by the time we reach the time equivalent
// to have sent the max number of pings.
//
// Lock should be held
func (c *client) watchForStaleConnection(pingInterval time.Duration, pingMax int) {
c.ping.tmr = time.AfterFunc(pingInterval*time.Duration(pingMax+1), func() {
c.mu.Lock()
c.Debugf("Stale Client Connection - Closing")
c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection")))
c.mu.Unlock()
c.closeConnection(StaleConnection)
})
}

// Lock should be held
func (c *client) setPingTimer() {
if c.srv == nil {
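
The new watchForStaleConnection helper arms a one-shot timer for pingInterval*(maxPingsOut+1): if nothing resets it within that window, the connection is reported stale and closed. Below is a minimal standalone sketch of that watchdog idea built on time.AfterFunc; the conn type and method names are hypothetical, and the real helper is a client method that expects the lock to be held and stores the timer in c.ping.tmr.

// Standalone sketch of the stale-connection watchdog (hypothetical names).
package main

import (
	"fmt"
	"time"
)

type conn struct {
	stale *time.Timer
}

// watch flags the connection if nothing resets the timer within the time it
// would have taken to send maxPingsOut PINGs plus one more interval.
func (c *conn) watch(pingInterval time.Duration, maxPingsOut int) {
	window := pingInterval * time.Duration(maxPingsOut+1)
	c.stale = time.AfterFunc(window, func() {
		fmt.Println("stale connection - closing")
	})
}

// onFirstTraffic sketches what happens once the remote finally speaks:
// stop (or reset) the watchdog and let the normal ping timer take over.
func (c *conn) onFirstTraffic() {
	if c.stale != nil {
		c.stale.Stop()
	}
}

func main() {
	c := &conn{}
	c.watch(50*time.Millisecond, 3)    // window = 200ms
	time.Sleep(300 * time.Millisecond) // no traffic: watchdog fires
	c.onFirstTraffic()
}

In the PR, the gateway change below arms this watchdog only for solicited (outbound) connections.
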
server/consumer.go (12 additions, 4 deletions)
@@ -2739,9 +2739,10 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
}
}
}
- // If nothing left set to current delivered.
+ // If nothing left set consumer to current delivered.
+ // Do not update stream.
if len(o.pending) == 0 {
- o.adflr, o.asflr = o.dseq-1, o.sseq-1
+ o.adflr = o.dseq - 1
}
}
// We do these regardless.
@@ -3701,9 +3702,12 @@ func (o *consumer) checkAckFloor() {
}

// If we are here, and this should be rare, we still are off with our ack floor.
+ // We will make sure we are not doing un-necessary work here if only off by a bit
+ // since this could be normal for a high activity wq or stream.
// We will set it explicitly to 1 behind our current lowest in pending, or if
// pending is empty, to our current delivered -1.
- if o.asflr < ss.FirstSeq-1 {
+ const minOffThreshold = 50
+ if o.asflr < ss.FirstSeq-minOffThreshold {
var psseq, pdseq uint64
for seq, p := range o.pending {
if psseq == 0 || seq < psseq {
@@ -3715,7 +3719,7 @@
psseq, pdseq = o.sseq-1, o.dseq-1
// If still not adjusted.
if psseq < ss.FirstSeq-1 {
- psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1
+ psseq = ss.FirstSeq - 1
}
} else {
// Since this was set via the pending, we should not include
@@ -4532,6 +4536,10 @@ func (o *consumer) checkPending() {
if len(o.cfg.BackOff) > 0 {
// This is ok even if o.rdc is nil, we would get dc == 0, which is what we want.
dc := int(o.rdc[seq])
if dc < 0 {
// Prevent consumer backoff from going backwards.
dc = 0
}
// This will be the index for the next backoff, will set to last element if needed.
nbi := dc + 1
if dc+1 >= len(o.cfg.BackOff) {
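
The checkPending change above guards the redelivery count so that a negative value cannot push the backoff index out of range. A small standalone sketch of that index selection, with dc clamped at zero and the index capped at the last configured delay (the nextBackoff helper is hypothetical, not the server's actual code):

// Standalone sketch of the backoff index math used above.
package main

import (
	"fmt"
	"time"
)

// nextBackoff mirrors the logic in the diff: nbi = dc+1, with dc clamped at
// zero and the index capped at the last configured delay.
func nextBackoff(backoff []time.Duration, dc int) time.Duration {
	if dc < 0 {
		dc = 0
	}
	nbi := dc + 1
	if nbi >= len(backoff) {
		nbi = len(backoff) - 1
	}
	return backoff[nbi]
}

func main() {
	bo := []time.Duration{time.Second, 5 * time.Second, 30 * time.Second}
	for _, dc := range []int{-1, 0, 1, 2, 10} {
		fmt.Printf("dc=%d -> %v\n", dc, nextBackoff(bo, dc))
	}
}
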
server/filestore.go (4 additions, 0 deletions)
@@ -6908,6 +6908,10 @@ SKIP:
purged = fs.state.Msgs
}
fs.state.Msgs -= purged
if fs.state.Msgs == 0 {
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
}

if bytes > fs.state.Bytes {
bytes = fs.state.Bytes
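
The filestore change keeps the store state self-consistent when a purge empties it: with no messages left, FirstSeq is moved just past LastSeq and the first timestamp is cleared, so the next stored message continues the sequence. A tiny standalone sketch of that invariant, using a hypothetical msgState struct:

// Standalone sketch of the empty-store invariant (hypothetical state struct).
package main

import (
	"fmt"
	"time"
)

type msgState struct {
	Msgs      uint64
	FirstSeq  uint64
	LastSeq   uint64
	FirstTime time.Time
}

// purge removes up to n messages; once the store is empty, the next append
// should land at LastSeq+1, so FirstSeq is moved there and FirstTime cleared.
func (s *msgState) purge(n uint64) {
	if n > s.Msgs {
		n = s.Msgs
	}
	s.Msgs -= n
	if s.Msgs == 0 {
		s.FirstSeq = s.LastSeq + 1
		s.FirstTime = time.Time{}
	}
}

func main() {
	s := &msgState{Msgs: 3, FirstSeq: 10, LastSeq: 12, FirstTime: time.Now()}
	s.purge(3)
	fmt.Println(s.Msgs, s.FirstSeq, s.LastSeq) // 0 13 12
}
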
server/gateway.go (9 additions, 0 deletions)
@@ -907,6 +907,15 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite))
}

// For outbound, we can't set the normal ping timer yet since the other
// side would fail with a parse error should it receive anything but the
// CONNECT protocol as the first protocol. We still want to make sure
// that the connection is not stale until the first INFO from the remote
// is received.
if solicit {
c.watchForStaleConnection(adjustPingInterval(GATEWAY, opts.PingInterval), opts.MaxPingsOut)
}

c.mu.Unlock()

// Announce ourselves again to new connections.
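
Since the remote gateway would treat anything before the CONNECT as a parse error, the solicited side arms only the stale-connection watchdog from client.go instead of the regular ping timer. For a rough sense of when such a connection is closed if the remote's INFO never arrives, here is a standalone calculation of the watchdog window; the 2-minute / 2 pair is assumed to be the server defaults, while 50ms / 3 are the values the test below configures.

// Standalone calculation of the watchdog window used above.
package main

import (
	"fmt"
	"time"
)

func staleWindow(pingInterval time.Duration, maxPingsOut int) time.Duration {
	return pingInterval * time.Duration(maxPingsOut+1)
}

func main() {
	fmt.Println(staleWindow(2*time.Minute, 2))       // assumed server defaults: 6m0s
	fmt.Println(staleWindow(50*time.Millisecond, 3)) // test settings below: 200ms
}
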
server/gateway_test.go (66 additions, 6 deletions)
@@ -6490,15 +6490,15 @@ func TestGatewayAuthDiscovered(t *testing.T) {
waitForOutboundGateways(t, srvB, 1, time.Second)
}

- func TestTLSGatewaysCertificateImplicitAllowPass(t *testing.T) {
- testTLSGatewaysCertificateImplicitAllow(t, true)
+ func TestGatewayTLSCertificateImplicitAllowPass(t *testing.T) {
+ testGatewayTLSCertificateImplicitAllow(t, true)
}

- func TestTLSGatewaysCertificateImplicitAllowFail(t *testing.T) {
- testTLSGatewaysCertificateImplicitAllow(t, false)
+ func TestGatewayTLSCertificateImplicitAllowFail(t *testing.T) {
+ testGatewayTLSCertificateImplicitAllow(t, false)
}

- func testTLSGatewaysCertificateImplicitAllow(t *testing.T, pass bool) {
+ func testGatewayTLSCertificateImplicitAllow(t *testing.T, pass bool) {
// Base config for the servers
cfg := createTempFile(t, "cfg")
cfg.WriteString(fmt.Sprintf(`
@@ -6988,7 +6988,7 @@ func (l *testMissingOCSPStapleLogger) Errorf(format string, v ...any) {
}
}

- func TestOCSPGatewayMissingPeerStapleIssue(t *testing.T) {
+ func TestGatewayOCSPMissingPeerStapleIssue(t *testing.T) {
const (
caCert = "../test/configs/certs/ocsp/ca-cert.pem"
caKey = "../test/configs/certs/ocsp/ca-key.pem"
@@ -7349,3 +7349,63 @@ func TestOCSPGatewayMissingPeerStapleIssue(t *testing.T) {
waitForOutboundGateways(t, srvC, 2, 5*time.Second)
wg.Wait()
}

func TestGatewayOutboundDetectsStaleConnectionIfNoInfo(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
require_NoError(t, err)
defer l.Close()

ch := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c, err := l.Accept()
if err != nil {
return
}
defer c.Close()
<-ch
}()

url := fmt.Sprintf("nats://%s", l.Addr())
o := testGatewayOptionsFromToWithURLs(t, "A", "B", []string{url})
o.gatewaysSolicitDelay = time.Millisecond
o.DisableShortFirstPing = false
o.PingInterval = 50 * time.Millisecond
o.MaxPingsOut = 3
o.NoLog = false
s, err := NewServer(o)
require_NoError(t, err)
defer s.Shutdown()

log := &captureDebugLogger{dbgCh: make(chan string, 100)}
s.SetLogger(log, true, false)
s.Start()

timeout := time.NewTimer(time.Second)
defer timeout.Stop()
for done := false; !done; {
select {
case dbg := <-log.dbgCh:
// The server should not send PING because the accept side expects
// the CONNECT as the first protocol (otherwise it would be a parse
// error if that were to happen).
if strings.Contains(dbg, "Ping Timer") {
t.Fatalf("The server should not have sent a ping, got %q", dbg)
}
// However, it should detect at one point that the connection is
// stale and close it.
if strings.Contains(dbg, "Stale") {
done = true
}
case <-timeout.C:
t.Fatalf("Did not capture the stale connection condition")
}
}

s.Shutdown()
close(ch)
wg.Wait()
s.WaitForShutdown()
}