Skip to content

Commit

Permalink
WIP: Pedantic mode
Browse files Browse the repository at this point in the history
Pedantic mode intention has been added to all paths.
Now it need to be properly handled and tested

Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Apr 23, 2024
1 parent f1c4a6a commit b9165ad
Show file tree
Hide file tree
Showing 11 changed files with 1,120 additions and 60 deletions.
35 changes: 26 additions & 9 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ type SequenceInfo struct {
}

type CreateConsumerRequest struct {
Stream string `json:"stream_name"`
Config ConsumerConfig `json:"config"`
Action ConsumerAction `json:"action"`
Stream string `json:"stream_name"`
Config ConsumerConfig `json:"config"`
Action ConsumerAction `json:"action"`
Pedantic bool `json:"pedantic"`
}

type ConsumerAction int
Expand Down Expand Up @@ -437,7 +438,7 @@ const (
)

// Helper function to set consumer config defaults from above.
func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits) {
func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits, pedantic bool) *ApiError {
// Set to default if not specified.
if config.DeliverSubject == _EMPTY_ && config.MaxWaiting == 0 {
config.MaxWaiting = JSWaitQueueDefaultMax
Expand All @@ -452,12 +453,21 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
}
// If BackOff was specified that will override the AckWait and the MaxDeliver.
if len(config.BackOff) > 0 {
if pedantic && config.AckWait != config.BackOff[0] {
return NewJSPedanticError(errors.New("first backoff value has to equal batch AckWait"))
}
config.AckWait = config.BackOff[0]
}
if config.MaxAckPending == 0 {
if pedantic && streamCfg.ConsumerLimits.MaxAckPending > 0 {
return NewJSPedanticError(errors.New("max_ack_pending must be set if it's configured in stream limits"))
}
config.MaxAckPending = streamCfg.ConsumerLimits.MaxAckPending
}
if config.InactiveThreshold == 0 {
if pedantic && streamCfg.ConsumerLimits.InactiveThreshold > 0 {
return NewJSPedanticError(errors.New("inactive_threshold must be set if it's configured in stream limits"))
}
config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
Expand All @@ -473,8 +483,12 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
}
// if applicable set max request batch size
if config.DeliverSubject == _EMPTY_ && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 {
if pedantic {
return NewJSPedanticError(errors.New("max_request_batch must be set if it's JetStream limits are set"))
}
config.MaxRequestBatch = lim.MaxRequestBatch
}
return nil
}

// Check the consumer config. If we are recovering don't check filter subjects.
Expand Down Expand Up @@ -702,15 +716,15 @@ func checkConsumerCfg(
return nil
}

func (mset *stream) addConsumerWithAction(config *ConsumerConfig, action ConsumerAction) (*consumer, error) {
return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false, action)
func (mset *stream) addConsumerWithAction(config *ConsumerConfig, action ConsumerAction, pedantic bool) (*consumer, error) {
return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false, action, pedantic)
}

func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
return mset.addConsumerWithAction(config, ActionCreateOrUpdate)
return mset.addConsumerWithAction(config, ActionCreateOrUpdate, false)
}

func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) {
func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction, pedantic bool) (*consumer, error) {
// Check if this stream has closed.
if mset.closed.Load() {
return nil, NewJSStreamInvalidError()
Expand Down Expand Up @@ -743,8 +757,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Make sure we have sane defaults. Do so with the JS lock, otherwise a
// badly timed meta snapshot can result in a race condition.
mset.js.mu.Lock()
setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits)
err := setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits, pedantic)
mset.js.mu.Unlock()
if err != nil {
return nil, err
}

if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil {
return nil, err
Expand Down
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1518,5 +1518,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSPedanticErrF",
"code": 400,
"error_code": 10154,
"description": "pedantic mode: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
4 changes: 2 additions & 2 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
// the consumer can reconnect. We will create it as a durable and switch it.
cfg.ConsumerConfig.Durable = ofi.Name()
}
obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate)
obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false)
if err != nil {
s.Warnf(" Error adding consumer %q: %v", cfg.Name, err)
continue
Expand Down Expand Up @@ -2588,7 +2588,7 @@ func (a *Account) addStreamTemplate(tc *StreamTemplateConfig) (*streamTemplate,
// FIXME(dlc) - Hacky
tcopy := tc.deepCopy()
tcopy.Config.Name = "_"
cfg, apiErr := s.checkStreamCfg(tcopy.Config, a)
cfg, apiErr := s.checkStreamCfg(tcopy.Config, a, false)
if apiErr != nil {
return nil, apiErr
}
Expand Down
28 changes: 15 additions & 13 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1414,7 +1414,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}

var cfg StreamConfig
var cfg StreamRequest
if err := json.Unmarshal(msg, &cfg); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -1455,13 +1455,13 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}

if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
resp.Error = err
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

mset, err := acc.addStream(&cfg)
mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
if err != nil {
if IsNatsErr(err, JSStreamStoreFailedF) {
s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
Expand Down Expand Up @@ -1521,14 +1521,14 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}
return
}
var ncfg StreamConfig
var ncfg StreamRequest
if err := json.Unmarshal(msg, &ncfg); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

cfg, apiErr := s.checkStreamCfg(&ncfg, acc)
cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand All @@ -1545,7 +1545,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
// Handle clustered version here.
if s.JetStreamIsClustered() {
// Always do in separate Go routine.
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil)
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
return
}

Expand All @@ -1556,7 +1556,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
return
}

if err := mset.update(&cfg); err != nil {
if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
resp.Error = NewJSStreamUpdateError(err, Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
Expand Down Expand Up @@ -2581,7 +2581,8 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
streamName, accName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))

// We will always have peers and therefore never do a callout, therefore it is safe to call inline
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers)
// We should be fine ignoring pedantic mode here. as we do not touch configuration.
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
}

// Request to have the metaleader move a stream on a peer to another
Expand Down Expand Up @@ -2687,7 +2688,8 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli
cfg.Replicas, streamName, accName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))

// We will always have peers and therefore never do a callout, therefore it is safe to call inline
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers)
// TODO(jrm): We should be fine ignoring pedantic mode here.
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
}

// Request to have an account purged
Expand Down Expand Up @@ -3372,7 +3374,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account
}

// check stream config at the start of the restore process, not at the end
cfg, apiErr := s.checkStreamCfg(&req.Config, acc)
cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -4005,9 +4007,9 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
// during this call, so place in Go routine to not block client.
// Router and Gateway API calls already in separate context.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action)
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
} else {
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action)
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
}
return
}
Expand All @@ -4032,7 +4034,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
req.Config.PauseUntil = o.cfg.PauseUntil
}

o, err := stream.addConsumerWithAction(&req.Config, req.Action)
o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)

if err != nil {
if IsNatsErr(err, JSConsumerStoreFailedErrF) {
Expand Down
26 changes: 16 additions & 10 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3585,7 +3585,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
js.mu.Unlock()
}
// Call update.
if err = mset.updateWithAdvisory(cfg, !recovering); err != nil {
if err = mset.updateWithAdvisory(cfg, !recovering, false); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err)
}
// Set the new stream assignment.
Expand Down Expand Up @@ -3736,7 +3736,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
mset.setStreamAssignment(sa)
// Check if our config has really been updated.
if !reflect.DeepEqual(mset.config(), sa.Config) {
if err = mset.updateWithAdvisory(sa.Config, false); err != nil {
if err = mset.updateWithAdvisory(sa.Config, false, false); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
if osa != nil {
// Process the raft group and make sure it's running if needed.
Expand All @@ -3755,7 +3755,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
}
} else if err == NewJSStreamNotFoundError() {
// Add in the stream here.
mset, err = acc.addStreamWithAssignment(sa.Config, nil, sa)
mset, err = acc.addStreamWithAssignment(sa.Config, nil, sa, false)
}
if mset != nil {
mset.setCreatedTime(sa.Created)
Expand Down Expand Up @@ -4283,7 +4283,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
var didCreate, isConfigUpdate, needsLocalResponse bool
if o == nil {
// Add in the consumer if needed.
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, js.isMetaRecovering(), ActionCreateOrUpdate); err == nil {
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, js.isMetaRecovering(), ActionCreateOrUpdate, false); err == nil {
didCreate = true
}
} else {
Expand Down Expand Up @@ -5977,15 +5977,15 @@ func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfi
return nil
}

func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamConfig) {
func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamRequest) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}

var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}

ccfg, apiErr := s.checkStreamCfg(config, acc)
ccfg, apiErr := s.checkStreamCfg(&config.StreamConfig, acc, config.Pedantic)
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -6122,7 +6122,7 @@ func sysRequest[T any](s *Server, subjFormat string, args ...any) (*T, error) {
}
}

func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string, pedantic bool) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
Expand All @@ -6148,7 +6148,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
var newCfg *StreamConfig
if jsa := js.accounts[acc.Name]; jsa != nil {
js.mu.Unlock()
ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, s)
ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, s, pedantic)
js.mu.Lock()
if err != nil {
resp.Error = NewJSStreamUpdateError(err, Unless(err))
Expand Down Expand Up @@ -6249,6 +6249,8 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
// Check that we have the allocation available.
// TODO(jrm): Make sure we do not need pedantic mode info here.
// We should be fine, as this either errors or not, and we are not changing the stream.
if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil {
resp.Error = err
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -7072,7 +7074,7 @@ func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *stre
}

// jsClusteredConsumerRequest is first point of entry to create a consumer in clustered mode.
func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig, action ConsumerAction) {
func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig, action ConsumerAction, pedantic bool) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
Expand All @@ -7094,7 +7096,11 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}
srvLim := &s.getOpts().JetStreamLimits
// Make sure we have sane defaults
setConsumerConfigDefaults(cfg, &streamCfg, srvLim, selectedLimits)
if err := setConsumerConfigDefaults(cfg, &streamCfg, srvLim, selectedLimits, pedantic); err != nil {
resp.Error = err
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}

if err := checkConsumerCfg(cfg, srvLim, &streamCfg, acc, selectedLimits, false); err != nil {
resp.Error = err
Expand Down

0 comments on commit b9165ad

Please sign in to comment.