Skip to content

Commit

Permalink
Cherry-pick PRs for 2.10.15-RC.3 (#5354)
Browse files Browse the repository at this point in the history
Includes the following:

* #5351
* #5353
* #5337
* #5356
* #5361
* #5362

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
wallyqs committed Apr 28, 2024
2 parents 45b9416 + ed64b8b commit 5cca3f1
Show file tree
Hide file tree
Showing 16 changed files with 497 additions and 111 deletions.
4 changes: 2 additions & 2 deletions server/accounts.go
Expand Up @@ -3072,9 +3072,9 @@ func (s *Server) SetAccountResolver(ar AccountResolver) {

// AccountResolver returns the registered account resolver.
// A read lock is sufficient here since we only read s.accResolver;
// the diff residue that showed both Lock and RLock would deadlock.
func (s *Server) AccountResolver() AccountResolver {
	s.mu.RLock()
	ar := s.accResolver
	s.mu.RUnlock()
	return ar
}

Expand Down
17 changes: 17 additions & 0 deletions server/client.go
Expand Up @@ -4909,6 +4909,23 @@ func adjustPingInterval(kind int, d time.Duration) time.Duration {
return d
}

// watchForStaleConnection arms a one-shot timer that closes the connection
// if it is not reset before the equivalent of pingMax+1 ping intervals have
// elapsed. It is used for connections that must not yet send PINGs (for
// instance an outbound gateway, or while compression is being negotiated,
// since the remote could not parse them yet) but that we still want to
// reap if they go stale.
//
// Lock should be held
func (c *client) watchForStaleConnection(pingInterval time.Duration, pingMax int) {
	// Time it would have taken to exhaust the maximum number of pings.
	delay := pingInterval * time.Duration(pingMax+1)
	c.ping.tmr = time.AfterFunc(delay, func() {
		c.mu.Lock()
		c.Debugf("Stale Client Connection - Closing")
		c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection")))
		c.mu.Unlock()
		c.closeConnection(StaleConnection)
	})
}

// Lock should be held
func (c *client) setPingTimer() {
if c.srv == nil {
Expand Down
16 changes: 12 additions & 4 deletions server/consumer.go
Expand Up @@ -2739,9 +2739,10 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
}
}
}
// If nothing left set to current delivered.
// If nothing left set consumer to current delivered.
// Do not update stream.
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
o.adflr = o.dseq - 1
}
}
// We do these regardless.
Expand Down Expand Up @@ -3701,9 +3702,12 @@ func (o *consumer) checkAckFloor() {
}

// If we are here, and this should be rare, we still are off with our ack floor.
// We will make sure we are not doing un-necessary work here if only off by a bit
// since this could be normal for a high activity wq or stream.
// We will set it explicitly to 1 behind our current lowest in pending, or if
// pending is empty, to our current delivered -1.
if o.asflr < ss.FirstSeq-1 {
const minOffThreshold = 50
if o.asflr < ss.FirstSeq-minOffThreshold {
var psseq, pdseq uint64
for seq, p := range o.pending {
if psseq == 0 || seq < psseq {
Expand All @@ -3715,7 +3719,7 @@ func (o *consumer) checkAckFloor() {
psseq, pdseq = o.sseq-1, o.dseq-1
// If still not adjusted.
if psseq < ss.FirstSeq-1 {
psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1
psseq = ss.FirstSeq - 1
}
} else {
// Since this was set via the pending, we should not include
Expand Down Expand Up @@ -4532,6 +4536,10 @@ func (o *consumer) checkPending() {
if len(o.cfg.BackOff) > 0 {
// This is ok even if o.rdc is nil, we would get dc == 0, which is what we want.
dc := int(o.rdc[seq])
if dc < 0 {
// Prevent consumer backoff from going backwards.
dc = 0
}
// This will be the index for the next backoff, will set to last element if needed.
nbi := dc + 1
if dc+1 >= len(o.cfg.BackOff) {
Expand Down
4 changes: 4 additions & 0 deletions server/filestore.go
Expand Up @@ -6908,6 +6908,10 @@ SKIP:
purged = fs.state.Msgs
}
fs.state.Msgs -= purged
if fs.state.Msgs == 0 {
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
}

if bytes > fs.state.Bytes {
bytes = fs.state.Bytes
Expand Down
9 changes: 9 additions & 0 deletions server/gateway.go
Expand Up @@ -907,6 +907,15 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite))
}

// For outbound, we can't set the normal ping timer yet since the other
// side would fail with a parse error should it receive anything but the
// CONNECT protocol as the first protocol. We still want to make sure
// that the connection is not stale until the first INFO from the remote
// is received.
if solicit {
c.watchForStaleConnection(adjustPingInterval(GATEWAY, opts.PingInterval), opts.MaxPingsOut)
}

c.mu.Unlock()

// Announce ourselves again to new connections.
Expand Down
72 changes: 66 additions & 6 deletions server/gateway_test.go
Expand Up @@ -6490,15 +6490,15 @@ func TestGatewayAuthDiscovered(t *testing.T) {
waitForOutboundGateways(t, srvB, 1, time.Second)
}

func TestTLSGatewaysCertificateImplicitAllowPass(t *testing.T) {
testTLSGatewaysCertificateImplicitAllow(t, true)
func TestGatewayTLSCertificateImplicitAllowPass(t *testing.T) {
testGatewayTLSCertificateImplicitAllow(t, true)
}

func TestTLSGatewaysCertificateImplicitAllowFail(t *testing.T) {
testTLSGatewaysCertificateImplicitAllow(t, false)
func TestGatewayTLSCertificateImplicitAllowFail(t *testing.T) {
testGatewayTLSCertificateImplicitAllow(t, false)
}

func testTLSGatewaysCertificateImplicitAllow(t *testing.T, pass bool) {
func testGatewayTLSCertificateImplicitAllow(t *testing.T, pass bool) {
// Base config for the servers
cfg := createTempFile(t, "cfg")
cfg.WriteString(fmt.Sprintf(`
Expand Down Expand Up @@ -6988,7 +6988,7 @@ func (l *testMissingOCSPStapleLogger) Errorf(format string, v ...any) {
}
}

func TestOCSPGatewayMissingPeerStapleIssue(t *testing.T) {
func TestGatewayOCSPMissingPeerStapleIssue(t *testing.T) {
const (
caCert = "../test/configs/certs/ocsp/ca-cert.pem"
caKey = "../test/configs/certs/ocsp/ca-key.pem"
Expand Down Expand Up @@ -7349,3 +7349,63 @@ func TestOCSPGatewayMissingPeerStapleIssue(t *testing.T) {
waitForOutboundGateways(t, srvC, 2, 5*time.Second)
wg.Wait()
}

// TestGatewayOutboundDetectsStaleConnectionIfNoInfo checks that a solicited
// outbound gateway connection that never receives the initial INFO from the
// remote is detected as stale and closed, without the server ever sending a
// PING (the accept side expects CONNECT as the first protocol).
func TestGatewayOutboundDetectsStaleConnectionIfNoInfo(t *testing.T) {
	// Bare TCP listener that accepts one connection and then stays silent,
	// so the outbound gateway never gets an INFO protocol back.
	l, err := net.Listen("tcp", "127.0.0.1:0")
	require_NoError(t, err)
	defer l.Close()

	release := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		conn, aerr := l.Accept()
		if aerr != nil {
			return
		}
		defer conn.Close()
		<-release
	}()

	opts := testGatewayOptionsFromToWithURLs(t, "A", "B",
		[]string{fmt.Sprintf("nats://%s", l.Addr())})
	opts.gatewaysSolicitDelay = time.Millisecond
	opts.DisableShortFirstPing = false
	opts.PingInterval = 50 * time.Millisecond
	opts.MaxPingsOut = 3
	opts.NoLog = false
	srv, err := NewServer(opts)
	require_NoError(t, err)
	defer srv.Shutdown()

	log := &captureDebugLogger{dbgCh: make(chan string, 100)}
	srv.SetLogger(log, true, false)
	srv.Start()

	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()
waitLoop:
	for {
		select {
		case dbg := <-log.dbgCh:
			// The server must not send a PING: the accept side expects the
			// CONNECT as the first protocol and would fail with a parse
			// error otherwise.
			if strings.Contains(dbg, "Ping Timer") {
				t.Fatalf("The server should not have sent a ping, got %q", dbg)
			}
			// The stale-connection watcher should eventually fire and
			// close the connection.
			if strings.Contains(dbg, "Stale") {
				break waitLoop
			}
		case <-deadline.C:
			t.Fatalf("Did not capture the stale connection condition")
		}
	}

	srv.Shutdown()
	close(release)
	wg.Wait()
	srv.WaitForShutdown()
}

0 comments on commit 5cca3f1

Please sign in to comment.