Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document struct field names in stream.go #5359

Merged
merged 2 commits into from Apr 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
193 changes: 103 additions & 90 deletions server/stream.go
Expand Up @@ -213,37 +213,42 @@ type ExternalStream struct {
// Stream is a jetstream stream of messages. When we receive a message internally destined
// for a Stream we will direct link from the client to this structure.
type stream struct {
mu sync.RWMutex
js *jetStream
jsa *jsAccount
acc *Account
srv *Server
client *client
sysc *client
sid atomic.Uint64
pubAck []byte
outq *jsOutQ
msgs *ipQueue[*inMsg]
gets *ipQueue[*directGetReq]
store StreamStore
ackq *ipQueue[uint64]
lseq uint64
lmsgId string
consumers map[string]*consumer
numFilter int // number of filtered consumers
cfg StreamConfig
created time.Time
stype StorageType
tier string
ddmap map[string]*ddentry
ddarr []*ddentry
ddindex int
ddtmr *time.Timer
qch chan struct{}
mqch chan struct{}
active bool
ddloaded bool
closed atomic.Bool
mu sync.RWMutex // Read/write lock for the stream.
js *jetStream // The internal *jetStream for the account.
jsa *jsAccount // The JetStream account-level information.
acc *Account // The account this stream is defined in.
srv *Server // The server we are running in.
client *client // The internal JetStream client.
sysc *client // The internal JetStream system client.

// The current last subscription ID for the subscriptions through `client`.
// Those subscriptions are for the subjects filters being listened to and captured by the stream.
sid atomic.Uint64

pubAck []byte // The template (prefix) to generate the pubAck responses for this stream quickly.
outq *jsOutQ // Queue of *jsPubMsg for sending messages.
msgs *ipQueue[*inMsg] // Intra-process queue for the ingress of messages.
gets *ipQueue[*directGetReq] // Intra-process queue for the direct get requests.
store StreamStore // The storage for this stream.
ackq *ipQueue[uint64] // Intra-process queue for acks.
lseq uint64 // The sequence number of the last message stored in the stream.
lmsgId string // The de-duplication message ID of the last message stored in the stream.
consumers map[string]*consumer // The consumers for this stream.
numFilter int // The number of filtered consumers.
cfg StreamConfig // The stream's config.
created time.Time // Time the stream was created.
stype StorageType // The storage type.
tier string // The tier is the number of replicas for the stream (e.g. "R1" or "R3").
ddmap map[string]*ddentry // The dedupe map.
ddarr []*ddentry // The dedupe array.
ddindex int // The dedupe index.
ddtmr *time.Timer // The dedupe timer.
qch chan struct{} // The quit channel.
mqch chan struct{} // The monitor's quit channel.
active bool // Indicates that there are active internal subscriptions (for the subject filters)
// and/or mirror/sources consumers are scheduled to be established or already started.
ddloaded bool // set to true when the deduplication structures are been built.
closed atomic.Bool // Set to true when stop() is called on the stream.

// Mirror
mirror *sourceInfo
Expand All @@ -256,18 +261,18 @@ type stream struct {
// Indicates we have direct consumers.
directs int

// For input subject transform
// For input subject transform.
itr *subjectTransform

// For republishing.
tr *subjectTransform

// For processing consumers without main stream lock.
clsMu sync.RWMutex
cList []*consumer
sch chan struct{}
sigq *ipQueue[*cMsg]
csl *Sublist // Consumer Sublist
cList []*consumer // Consumer list.
sch chan struct{} // Channel to signal consumers.
sigq *ipQueue[*cMsg] // Intra-process queue for the messages to signal to the consumers.
csl *Sublist // Consumer subscription list.

// Leader will store seq/msgTrace in clustering mode. Used in applyStreamEntries
// to know if trace event should be sent after processing.
Expand All @@ -279,51 +284,58 @@ type stream struct {

// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
sa *streamAssignment
node RaftNode
catchup atomic.Bool
syncSub *subscription
infoSub *subscription
clMu sync.Mutex
clseq uint64
clfs uint64
inflight map[uint64]uint64
leader string
lqsent time.Time
catchups map[string]uint64
uch chan struct{}
compressOK bool
inMonitor bool
sa *streamAssignment // What the meta controller uses to assign streams to peers.
node RaftNode // Our RAFT node for the stream's group.
catchup atomic.Bool // Used to signal we are in catchup mode.
catchups map[string]uint64 // The number of messages that need to be caught per peer.
syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC").
infoSub *subscription // Internal subscription for stream info requests.
clMu sync.Mutex // The mutex for clseq and clfs.
clseq uint64 // The current last seq being proposed to the NRG layer.
clfs uint64 // The count (offset) of the number of failed NRG sequences used to compute clseq.
inflight map[uint64]uint64 // Inflight message sizes per clseq.
leader string // The current leader for the RAFT group.
lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit.
uch chan struct{} // The channel to signal updates to the monitor routine.
compressOK bool // True if we can do message compression in RAFT and catchup logic
inMonitor bool // True if the monitor routine has been started.

// Direct get subscription.
directSub *subscription
lastBySub *subscription

monitorWg sync.WaitGroup
monitorWg sync.WaitGroup // Wait group for the monitor routine.
}

type sourceInfo struct {
name string
iname string
cname string
sub *subscription
dsub *subscription
name string // The name of the stream being sourced.
iname string // The unique index name of this particular source.
cname string // The name of the current consumer for this source.
sub *subscription // The subscription to the consumer.

// (mirrors only) The subscription to the direct get request subject for
// the source stream's name on the `_sys_` queue group.
dsub *subscription

// (mirrors only) The subscription to the direct get last per subject request subject for
// the source stream's name on the `_sys_` queue group.
lbsub *subscription
msgs *ipQueue[*inMsg]
sseq uint64
dseq uint64
start time.Time
lag uint64
err *ApiError
fails int
last time.Time
lreq time.Time
qch chan struct{}
sip bool // setup in progress
wg sync.WaitGroup
sf string // subject filter
sfs []string // subject filters
trs []*subjectTransform // subject transforms

msgs *ipQueue[*inMsg] // Intra-process queue for incoming messages.
sseq uint64 // Last stream message sequence number seen from the source.
dseq uint64 // Last delivery (i.e. consumer's) sequence number.
start time.Time // The time of the last message recorded in the stream doing the sourcing.
lag uint64 // 0 or number of messages pending (as last reported by the consumer) - 1.
err *ApiError // The API error that caused the last consumer setup to fail.
fails int // The number of times trying to setup the consumer failed.
last time.Time // Time the consumer was created or of last message it received.
lreq time.Time // The last time setupMirrorConsumer/setupSourceConsumer was called.
qch chan struct{} // Quit channel.
sip bool // Setup in progress.
wg sync.WaitGroup // WaitGroup for the consumer's go routine.
sf string // The subject filter.
sfs []string // The subject filters.
trs []*subjectTransform // The subject transforms.
}

// For mirrors and direct get
Expand Down Expand Up @@ -371,9 +383,9 @@ const (

// Dedupe entry
type ddentry struct {
id string
seq uint64
ts int64
id string // The unique message ID provided by the client.
seq uint64 // The sequence number of the message.
ts int64 // The timestamp of the message.
}

// Replicas Range
Expand Down Expand Up @@ -698,7 +710,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}

// Composes the index name. Contains the stream name, subject filter, and transform destination
// when the stream is external we will use additional information in case the (external) stream names are the same.
// when the stream is external we will use the api prefix as part of the index name
// (as the same stream name could be used in multiple JS domains)
func (ssi *StreamSource) composeIName() string {
var iName = ssi.Name

Expand Down Expand Up @@ -1834,7 +1847,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
}
mset.setStartingSequenceForSources(needsStartingSeqNum)
for iName := range neededCopy {
mset.setSourceConsumer(iName, mset.sources[iName].sseq+1, time.Time{})
mset.setupSourceConsumer(iName, mset.sources[iName].sseq+1, time.Time{})
}
}
}
Expand Down Expand Up @@ -2751,7 +2764,7 @@ func (mset *stream) retrySourceConsumer(iName string) {
}
}

// Same than setSourceConsumer but simply issue a debug statement indicating
// Same than setupSourceConsumer but simply issue a debug statement indicating
// that there is a retry.
//
// Lock should be held.
Expand All @@ -2760,8 +2773,8 @@ func (mset *stream) retrySourceConsumerAtSeq(iname string, seq uint64) {

s.Debugf("Retrying source consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)

// setSourceConsumer will check that the source is still configured.
mset.setSourceConsumer(iname, seq, time.Time{})
// setupSourceConsumer will check that the source is still configured.
mset.setupSourceConsumer(iname, seq, time.Time{})
}

// Lock should be held.
Expand Down Expand Up @@ -2801,13 +2814,13 @@ func (mset *stream) cancelSourceInfo(si *sourceInfo) {

const sourceConsumerRetryThreshold = 2 * time.Second

// This will schedule a call to setSourceConsumer, taking into account the last
// time it was retried and determine the soonest setSourceConsumer can be called
// This will schedule a call to setupSourceConsumer, taking into account the last
// time it was retried and determine the soonest setupSourceConsumer can be called
// without tripping the sourceConsumerRetryThreshold.
//
// Lock held on entry
func (mset *stream) scheduleSetSourceConsumerRetry(si *sourceInfo, seq uint64, startTime time.Time) {
// We are trying to figure out how soon we can retry. setSourceConsumer will reject
// We are trying to figure out how soon we can retry. setupSourceConsumer will reject
// a retry if last was done less than "sourceConsumerRetryThreshold" ago.
next := sourceConsumerRetryThreshold - time.Since(si.lreq)
if next < 0 {
Expand All @@ -2823,7 +2836,7 @@ func (mset *stream) scheduleSetSourceConsumerRetry(si *sourceInfo, seq uint64, s
mset.scheduleSetSourceConsumer(si.iname, seq, next, startTime)
}

// Simply schedules setSourceConsumer at the given delay.
// Simply schedules setupSourceConsumer at the given delay.
//
// Lock held on entry
func (mset *stream) scheduleSetSourceConsumer(iname string, seq uint64, delay time.Duration, startTime time.Time) {
Expand All @@ -2842,12 +2855,12 @@ func (mset *stream) scheduleSetSourceConsumer(iname string, seq uint64, delay ti
defer mset.mu.Unlock()

delete(mset.sourceRetries, iname)
mset.setSourceConsumer(iname, seq, startTime)
mset.setupSourceConsumer(iname, seq, startTime)
})
}

// Lock should be held.
func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.Time) {
func (mset *stream) setupSourceConsumer(iname string, seq uint64, startTime time.Time) {
// Ignore if closed.
if mset.closed.Load() {
return
Expand Down Expand Up @@ -3133,8 +3146,8 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) {
if stalled {
mset.mu.Lock()
// We don't need to schedule here, we are going to simply
// call setSourceConsumer with the current state+1.
mset.setSourceConsumer(iname, si.sseq+1, time.Time{})
// call setupSourceConsumer with the current state+1.
mset.setupSourceConsumer(iname, si.sseq+1, time.Time{})
si.last = time.Now()
mset.mu.Unlock()
}
Expand Down Expand Up @@ -3291,7 +3304,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
}
// Retry in all type of errors.
// This will make sure the source is still in mset.sources map,
// find the last sequence and then call setSourceConsumer.
// find the last sequence and then call setupSourceConsumer.
mset.retrySourceConsumer(iname)
}
return false
Expand Down Expand Up @@ -3556,7 +3569,7 @@ func (mset *stream) setupSourceConsumers() error {
// Setup our consumers at the proper starting position.
for _, ssi := range mset.cfg.Sources {
if si := mset.sources[ssi.iname]; si != nil {
mset.setSourceConsumer(ssi.iname, si.sseq+1, time.Time{})
mset.setupSourceConsumer(ssi.iname, si.sseq+1, time.Time{})
}
}

Expand Down