Skip to content

Commit

Permalink
Add comments to document main structure field names a bit better and …
Browse files Browse the repository at this point in the history
…improve the code accessibility.

Rename setSourceConsumer to setupSourceConsumer (better and matches setupMirrorConsumer).

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
  • Loading branch information
jnmoyne committed Apr 26, 2024
1 parent 8478886 commit 675e1cb
Showing 1 changed file with 103 additions and 90 deletions.
193 changes: 103 additions & 90 deletions server/stream.go
Expand Up @@ -213,37 +213,42 @@ type ExternalStream struct {
// Stream is a jetstream stream of messages. When we receive a message internally destined
// for a Stream we will direct link from the client to this structure.
type stream struct {
mu sync.RWMutex
js *jetStream
jsa *jsAccount
acc *Account
srv *Server
client *client
sysc *client
sid atomic.Uint64
pubAck []byte
outq *jsOutQ
msgs *ipQueue[*inMsg]
gets *ipQueue[*directGetReq]
store StreamStore
ackq *ipQueue[uint64]
lseq uint64
lmsgId string
consumers map[string]*consumer
numFilter int // number of filtered consumers
cfg StreamConfig
created time.Time
stype StorageType
tier string
ddmap map[string]*ddentry
ddarr []*ddentry
ddindex int
ddtmr *time.Timer
qch chan struct{}
mqch chan struct{}
active bool
ddloaded bool
closed atomic.Bool
mu sync.RWMutex // Read/write lock for the stream.
js *jetStream // The internal *jetStream for the account.
jsa *jsAccount // The JetStream account-level information.
acc *Account // The account this stream is defined in.
srv *Server // The server we are running in.
client *client // The internal JetStream client.
sysc *client // The internal JetStream system client.

// The current last subscription ID for the subscriptions through `client`.
// Those subscriptions are for the subjects filters being listened to and captured by the stream.
sid atomic.Uint64

pubAck []byte // The template (prefix) to generate the pubAck responses for this stream quickly.
outq *jsOutQ // Queue of *jsPubMsg for sending messages.
msgs *ipQueue[*inMsg] // Intra-process queue for the ingress of messages.
gets *ipQueue[*directGetReq] // Intra-process queue for the direct get requests.
store StreamStore // The storage for this stream.
ackq *ipQueue[uint64] // Intra-process queue for acks.
lseq uint64 // The sequence number of the last message stored in the stream.
lmsgId string // The de-duplication message ID of the last message stored in the stream.
consumers map[string]*consumer // The consumers for this stream.
numFilter int // The number of filtered consumers.
cfg StreamConfig // The stream's config.
created time.Time // Time the stream was created.
stype StorageType // The storage type.
tier string // The tier is the number of replicas for the stream (e.g. "R1" or "R3").
ddmap map[string]*ddentry // The dedupe map.
ddarr []*ddentry // The dedupe array.
ddindex int // The dedupe index.
ddtmr *time.Timer // The dedupe timer.
qch chan struct{} // The quit channel.
mqch chan struct{} // The monitor's quit channel.
active bool // Indicates that there are active internal subscriptions (for the subject filters)
// and/or mirror/sources consumers are scheduled to be established or already started.
ddloaded bool // set to true when the deduplication structures are been built.
closed atomic.Bool // Set to true when stop() is called on the stream.

// Mirror
mirror *sourceInfo
Expand All @@ -256,18 +261,18 @@ type stream struct {
// Indicates we have direct consumers.
directs int

// For input subject transform
// For input subject transform.
itr *subjectTransform

// For republishing.
tr *subjectTransform

// For processing consumers without main stream lock.
clsMu sync.RWMutex
cList []*consumer
sch chan struct{}
sigq *ipQueue[*cMsg]
csl *Sublist // Consumer Sublist
cList []*consumer // Consumer list.
sch chan struct{} // Channel to signal consumers.
sigq *ipQueue[*cMsg] // Intra-process queue for the messages to signal to the consumers.
csl *Sublist // Consumer subscription list.

// Leader will store seq/msgTrace in clustering mode. Used in applyStreamEntries
// to know if trace event should be sent after processing.
Expand All @@ -279,51 +284,58 @@ type stream struct {

// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
sa *streamAssignment
node RaftNode
catchup atomic.Bool
syncSub *subscription
infoSub *subscription
clMu sync.Mutex
clseq uint64
clfs uint64
inflight map[uint64]uint64
leader string
lqsent time.Time
catchups map[string]uint64
uch chan struct{}
compressOK bool
inMonitor bool
sa *streamAssignment // What the meta controller uses to assign streams to peers.
node RaftNode // Our RAFT node for the stream's group.
catchup atomic.Bool // Used to signal we are in catchup mode.
catchups map[string]uint64 // The number of messages that need to be caught per peer.
syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC").
infoSub *subscription // Internal subscription for stream info requests.
clMu sync.Mutex // The mutex for clseq and clfs.
clseq uint64 // The commit log current last sequence number.
clfs uint64 // The count of the commit log failed sequence numbers.
inflight map[uint64]uint64 // Inflight message sizes per clseq.
leader string // The current leader for the RAFT group.
lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit.
uch chan struct{} // The channel to signal updates to the monitor routine.
compressOK bool // True if we can do message compression in RAFT and catchup logic
inMonitor bool // True if the monitor routine has been started.

// Direct get subscription.
directSub *subscription
lastBySub *subscription

monitorWg sync.WaitGroup
monitorWg sync.WaitGroup // Wait group for the monitor routine.
}

type sourceInfo struct {
name string
iname string
cname string
sub *subscription
dsub *subscription
name string // The name of the stream being sourced.
iname string // The unique index name of this particular source.
cname string // The name of the current consumer for this source.
sub *subscription // The subscription to the consumer.

// (mirrors only) The subscription to the direct get request subject for
// the source stream's name on the `_sys_` queue group.
dsub *subscription

// (mirrors only) The subscription to the direct get last per subject request subject for
// the source stream's name on the `_sys_` queue group.
lbsub *subscription
msgs *ipQueue[*inMsg]
sseq uint64
dseq uint64
start time.Time
lag uint64
err *ApiError
fails int
last time.Time
lreq time.Time
qch chan struct{}
sip bool // setup in progress
wg sync.WaitGroup
sf string // subject filter
sfs []string // subject filters
trs []*subjectTransform // subject transforms

msgs *ipQueue[*inMsg] // Intra-process queue for incoming messages.
sseq uint64 // Last stream message sequence number seen from the source.
dseq uint64 // Last delivery (i.e. consumer's) sequence number.
start time.Time // The time of the last message recorded in the stream doing the sourcing.
lag uint64 // 0 or number of messages pending (as last reported by the consumer) - 1.
err *ApiError // The API error that caused the last consumer setup to fail.
fails int // The number of times trying to setup the consumer failed.
last time.Time // Time the consumer was created or of last message it received.
lreq time.Time // The last time setupMirrorConsumer/setupSourceConsumer was called.
qch chan struct{} // Quit channel.
sip bool // Setup in progress.
wg sync.WaitGroup // WaitGroup for the consumer's go routine.
sf string // The subject filter.
sfs []string // The subject filters.
trs []*subjectTransform // The subject transforms.
}

// For mirrors and direct get
Expand Down Expand Up @@ -371,9 +383,9 @@ const (

// Dedupe entry
type ddentry struct {
id string
seq uint64
ts int64
id string // The unique message ID provided by the client.
seq uint64 // The sequence number of the message.
ts int64 // The timestamp of the message.
}

// Replicas Range
Expand Down Expand Up @@ -698,7 +710,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}

// Composes the index name. Contains the stream name, subject filter, and transform destination
// when the stream is external we will use additional information in case the (external) stream names are the same.
// when the stream is external we will use the api prefix as part of the index name
// (as the same stream name could be used in multiple JS domains)
func (ssi *StreamSource) composeIName() string {
var iName = ssi.Name

Expand Down Expand Up @@ -1834,7 +1847,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
}
mset.setStartingSequenceForSources(needsStartingSeqNum)
for iName := range neededCopy {
mset.setSourceConsumer(iName, mset.sources[iName].sseq+1, time.Time{})
mset.setupSourceConsumer(iName, mset.sources[iName].sseq+1, time.Time{})
}
}
}
Expand Down Expand Up @@ -2750,7 +2763,7 @@ func (mset *stream) retrySourceConsumer(iName string) {
}
}

// Same than setSourceConsumer but simply issue a debug statement indicating
// Same than setupSourceConsumer but simply issue a debug statement indicating
// that there is a retry.
//
// Lock should be held.
Expand All @@ -2759,8 +2772,8 @@ func (mset *stream) retrySourceConsumerAtSeq(iname string, seq uint64) {

s.Debugf("Retrying source consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)

// setSourceConsumer will check that the source is still configured.
mset.setSourceConsumer(iname, seq, time.Time{})
// setupSourceConsumer will check that the source is still configured.
mset.setupSourceConsumer(iname, seq, time.Time{})
}

// Lock should be held.
Expand Down Expand Up @@ -2800,13 +2813,13 @@ func (mset *stream) cancelSourceInfo(si *sourceInfo) {

const sourceConsumerRetryThreshold = 2 * time.Second

// This will schedule a call to setSourceConsumer, taking into account the last
// time it was retried and determine the soonest setSourceConsumer can be called
// This will schedule a call to setupSourceConsumer, taking into account the last
// time it was retried and determine the soonest setupSourceConsumer can be called
// without tripping the sourceConsumerRetryThreshold.
//
// Lock held on entry
func (mset *stream) scheduleSetSourceConsumerRetry(si *sourceInfo, seq uint64, startTime time.Time) {
// We are trying to figure out how soon we can retry. setSourceConsumer will reject
// We are trying to figure out how soon we can retry. setupSourceConsumer will reject
// a retry if last was done less than "sourceConsumerRetryThreshold" ago.
next := sourceConsumerRetryThreshold - time.Since(si.lreq)
if next < 0 {
Expand All @@ -2822,7 +2835,7 @@ func (mset *stream) scheduleSetSourceConsumerRetry(si *sourceInfo, seq uint64, s
mset.scheduleSetSourceConsumer(si.iname, seq, next, startTime)
}

// Simply schedules setSourceConsumer at the given delay.
// Simply schedules setupSourceConsumer at the given delay.
//
// Lock held on entry
func (mset *stream) scheduleSetSourceConsumer(iname string, seq uint64, delay time.Duration, startTime time.Time) {
Expand All @@ -2841,12 +2854,12 @@ func (mset *stream) scheduleSetSourceConsumer(iname string, seq uint64, delay ti
defer mset.mu.Unlock()

delete(mset.sourceRetries, iname)
mset.setSourceConsumer(iname, seq, startTime)
mset.setupSourceConsumer(iname, seq, startTime)
})
}

// Lock should be held.
func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.Time) {
func (mset *stream) setupSourceConsumer(iname string, seq uint64, startTime time.Time) {
// Ignore if closed.
if mset.closed.Load() {
return
Expand Down Expand Up @@ -3130,8 +3143,8 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) {
if stalled {
mset.mu.Lock()
// We don't need to schedule here, we are going to simply
// call setSourceConsumer with the current state+1.
mset.setSourceConsumer(iname, si.sseq+1, time.Time{})
// call setupSourceConsumer with the current state+1.
mset.setupSourceConsumer(iname, si.sseq+1, time.Time{})
mset.mu.Unlock()
}
}
Expand Down Expand Up @@ -3287,7 +3300,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
}
// Retry in all type of errors.
// This will make sure the source is still in mset.sources map,
// find the last sequence and then call setSourceConsumer.
// find the last sequence and then call setupSourceConsumer.
mset.retrySourceConsumer(iname)
}
return false
Expand Down Expand Up @@ -3552,7 +3565,7 @@ func (mset *stream) setupSourceConsumers() error {
// Setup our consumers at the proper starting position.
for _, ssi := range mset.cfg.Sources {
if si := mset.sources[ssi.iname]; si != nil {
mset.setSourceConsumer(ssi.iname, si.sseq+1, time.Time{})
mset.setupSourceConsumer(ssi.iname, si.sseq+1, time.Time{})
}
}

Expand Down

0 comments on commit 675e1cb

Please sign in to comment.