Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pedantic mode #5245

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 26 additions & 9 deletions server/consumer.go
Expand Up @@ -119,9 +119,10 @@ type SequenceInfo struct {
}

type CreateConsumerRequest struct {
Stream string `json:"stream_name"`
Config ConsumerConfig `json:"config"`
Action ConsumerAction `json:"action"`
Stream string `json:"stream_name"`
Config ConsumerConfig `json:"config"`
Action ConsumerAction `json:"action"`
Pedantic bool `json:"pedantic"`
ripienaar marked this conversation as resolved.
Show resolved Hide resolved
}

type ConsumerAction int
Expand Down Expand Up @@ -436,7 +437,7 @@ const (
)

// Helper function to set consumer config defaults from above.
func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits) {
func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits, pedantic bool) *ApiError {
// Set to default if not specified.
if config.DeliverSubject == _EMPTY_ && config.MaxWaiting == 0 {
config.MaxWaiting = JSWaitQueueDefaultMax
Expand All @@ -451,12 +452,21 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
}
// If BackOff was specified that will override the AckWait and the MaxDeliver.
if len(config.BackOff) > 0 {
if pedantic && config.AckWait != config.BackOff[0] {
return NewJSPedanticError(errors.New("first backoff value has to equal batch AckWait"))
}
config.AckWait = config.BackOff[0]
}
if config.MaxAckPending == 0 {
if pedantic && streamCfg.ConsumerLimits.MaxAckPending > 0 {
return NewJSPedanticError(errors.New("max_ack_pending must be set if it's configured in stream limits"))
}
config.MaxAckPending = streamCfg.ConsumerLimits.MaxAckPending
}
if config.InactiveThreshold == 0 {
if pedantic && streamCfg.ConsumerLimits.InactiveThreshold > 0 {
return NewJSPedanticError(errors.New("inactive_threshold must be set if it's configured in stream limits"))
}
config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
Expand All @@ -472,8 +482,12 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
}
// if applicable set max request batch size
if config.DeliverSubject == _EMPTY_ && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 {
if pedantic {
return NewJSPedanticError(errors.New("max_request_batch must be set if it's JetStream limits are set"))
}
config.MaxRequestBatch = lim.MaxRequestBatch
}
return nil
}

// Check the consumer config. If we are recovering don't check filter subjects.
Expand Down Expand Up @@ -701,15 +715,15 @@ func checkConsumerCfg(
return nil
}

func (mset *stream) addConsumerWithAction(config *ConsumerConfig, action ConsumerAction) (*consumer, error) {
return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false, action)
func (mset *stream) addConsumerWithAction(config *ConsumerConfig, action ConsumerAction, pedantic bool) (*consumer, error) {
return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false, action, pedantic)
}

func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
return mset.addConsumerWithAction(config, ActionCreateOrUpdate)
return mset.addConsumerWithAction(config, ActionCreateOrUpdate, false)
}

func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) {
func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction, pedantic bool) (*consumer, error) {
// Check if this stream has closed.
if mset.closed.Load() {
return nil, NewJSStreamInvalidError()
Expand Down Expand Up @@ -742,8 +756,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Make sure we have sane defaults. Do so with the JS lock, otherwise a
// badly timed meta snapshot can result in a race condition.
mset.js.mu.Lock()
setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits)
err := setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits, pedantic)
mset.js.mu.Unlock()
if err != nil {
return nil, err
}

if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil {
return nil, err
Expand Down
10 changes: 10 additions & 0 deletions server/errors.json
Expand Up @@ -1518,5 +1518,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSPedanticErrF",
"code": 400,
"error_code": 10154,
"description": "pedantic mode: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
4 changes: 2 additions & 2 deletions server/jetstream.go
Expand Up @@ -1442,7 +1442,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
// the consumer can reconnect. We will create it as a durable and switch it.
cfg.ConsumerConfig.Durable = ofi.Name()
}
obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate)
obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false)
if err != nil {
s.Warnf(" Error adding consumer %q: %v", cfg.Name, err)
continue
Expand Down Expand Up @@ -2588,7 +2588,7 @@ func (a *Account) addStreamTemplate(tc *StreamTemplateConfig) (*streamTemplate,
// FIXME(dlc) - Hacky
tcopy := tc.deepCopy()
tcopy.Config.Name = "_"
cfg, apiErr := s.checkStreamCfg(tcopy.Config, a)
cfg, apiErr := s.checkStreamCfg(tcopy.Config, a, false)
if apiErr != nil {
return nil, apiErr
}
Expand Down
28 changes: 15 additions & 13 deletions server/jetstream_api.go
Expand Up @@ -1414,7 +1414,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}

var cfg StreamConfig
var cfg StreamRequest
if err := json.Unmarshal(msg, &cfg); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -1455,13 +1455,13 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}

if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
resp.Error = err
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

mset, err := acc.addStream(&cfg)
mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
if err != nil {
if IsNatsErr(err, JSStreamStoreFailedF) {
s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
Expand Down Expand Up @@ -1521,14 +1521,14 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}
return
}
var ncfg StreamConfig
var ncfg StreamRequest
if err := json.Unmarshal(msg, &ncfg); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

cfg, apiErr := s.checkStreamCfg(&ncfg, acc)
cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand All @@ -1545,7 +1545,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
// Handle clustered version here.
if s.JetStreamIsClustered() {
// Always do in separate Go routine.
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil)
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
return
}

Expand All @@ -1556,7 +1556,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
return
}

if err := mset.update(&cfg); err != nil {
if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
resp.Error = NewJSStreamUpdateError(err, Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
Expand Down Expand Up @@ -2581,7 +2581,8 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
streamName, accName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))

// We will always have peers and therefore never do a callout, therefore it is safe to call inline
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers)
// We should be fine ignoring pedantic mode here. as we do not touch configuration.
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
}

// Request to have the metaleader move a stream on a peer to another
Expand Down Expand Up @@ -2687,7 +2688,8 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli
cfg.Replicas, streamName, accName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))

// We will always have peers and therefore never do a callout, therefore it is safe to call inline
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers)
// TODO(jrm): We should be fine ignoring pedantic mode here.
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
}

// Request to have an account purged
Expand Down Expand Up @@ -3372,7 +3374,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account
}

// check stream config at the start of the restore process, not at the end
cfg, apiErr := s.checkStreamCfg(&req.Config, acc)
cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -4005,9 +4007,9 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
// during this call, so place in Go routine to not block client.
// Router and Gateway API calls already in separate context.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action)
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
} else {
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action)
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
}
return
}
Expand All @@ -4032,7 +4034,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
req.Config.PauseUntil = o.cfg.PauseUntil
}

o, err := stream.addConsumerWithAction(&req.Config, req.Action)
o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)

if err != nil {
if IsNatsErr(err, JSConsumerStoreFailedErrF) {
Expand Down
26 changes: 16 additions & 10 deletions server/jetstream_cluster.go
Expand Up @@ -3607,7 +3607,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
js.mu.Unlock()
}
// Call update.
if err = mset.updateWithAdvisory(cfg, !recovering); err != nil {
if err = mset.updateWithAdvisory(cfg, !recovering, false); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err)
}
// Set the new stream assignment.
Expand Down Expand Up @@ -3758,7 +3758,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
mset.setStreamAssignment(sa)
// Check if our config has really been updated.
if !reflect.DeepEqual(mset.config(), sa.Config) {
if err = mset.updateWithAdvisory(sa.Config, false); err != nil {
if err = mset.updateWithAdvisory(sa.Config, false, false); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
if osa != nil {
// Process the raft group and make sure it's running if needed.
Expand All @@ -3777,7 +3777,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
}
} else if err == NewJSStreamNotFoundError() {
// Add in the stream here.
mset, err = acc.addStreamWithAssignment(sa.Config, nil, sa)
mset, err = acc.addStreamWithAssignment(sa.Config, nil, sa, false)
}
if mset != nil {
mset.setCreatedTime(sa.Created)
Expand Down Expand Up @@ -4305,7 +4305,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
var didCreate, isConfigUpdate, needsLocalResponse bool
if o == nil {
// Add in the consumer if needed.
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, js.isMetaRecovering(), ActionCreateOrUpdate); err == nil {
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, js.isMetaRecovering(), ActionCreateOrUpdate, false); err == nil {
didCreate = true
}
} else {
Expand Down Expand Up @@ -5999,15 +5999,15 @@ func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfi
return nil
}

func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamConfig) {
func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamRequest) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}

var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}

ccfg, apiErr := s.checkStreamCfg(config, acc)
ccfg, apiErr := s.checkStreamCfg(&config.StreamConfig, acc, config.Pedantic)
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -6151,7 +6151,7 @@ func sysRequest[T any](s *Server, subjFormat string, args ...any) (*T, error) {
}
}

func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string, pedantic bool) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
Expand All @@ -6177,7 +6177,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
var newCfg *StreamConfig
if jsa := js.accounts[acc.Name]; jsa != nil {
js.mu.Unlock()
ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, s)
ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, s, pedantic)
js.mu.Lock()
if err != nil {
resp.Error = NewJSStreamUpdateError(err, Unless(err))
Expand Down Expand Up @@ -6278,6 +6278,8 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
// Check that we have the allocation available.
// TODO(jrm): Make sure we do not need pedantic mode info here.
// We should be fine, as this either errors or not, and we are not changing the stream.
if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil {
resp.Error = err
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -7101,7 +7103,7 @@ func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *stre
}

// jsClusteredConsumerRequest is first point of entry to create a consumer in clustered mode.
func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig, action ConsumerAction) {
func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig, action ConsumerAction, pedantic bool) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
Expand All @@ -7123,7 +7125,11 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}
srvLim := &s.getOpts().JetStreamLimits
// Make sure we have sane defaults
setConsumerConfigDefaults(cfg, &streamCfg, srvLim, selectedLimits)
if err := setConsumerConfigDefaults(cfg, &streamCfg, srvLim, selectedLimits, pedantic); err != nil {
resp.Error = err
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}

if err := checkConsumerCfg(cfg, srvLim, &streamCfg, acc, selectedLimits, false); err != nil {
resp.Error = err
Expand Down