Skip to content

Commit

Permalink
CBG-3247 turn channel send into blocking call (#6349) (#6372)
Browse files Browse the repository at this point in the history
* CBG-3247 turn channel send into blocking call

* Add comments and reorder structs for memory optimization (#6353)
  • Loading branch information
torcolvin committed Aug 21, 2023
1 parent d26ad31 commit 4c0d321
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 24 deletions.
8 changes: 1 addition & 7 deletions base/dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,7 @@ func (dc *DCPClient) openStreamRequest(vbID uint16) error {
if err == nil {
err = dc.verifyFailoverLog(vbID, f)
if err == nil {
e := streamOpenEvent{
streamEventCommon: streamEventCommon{
vbID: vbID,
},
failoverLogs: f,
}
dc.workerForVbno(vbID).Send(e)
dc.metadata.SetFailoverEntries(vbID, f)
}
}
openStreamError <- err
Expand Down
35 changes: 20 additions & 15 deletions base/dcp_client_stream_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,44 @@ import (
sgbucket "github.com/couchbase/sg-bucket"
)

// streamEvent is an interface for events that can be sent on a DCP stream.
type streamEvent interface {
VbID() uint16
}

// streamEventCommon is a struct that contains common fields for all stream events.
type streamEventCommon struct {
vbID uint16
streamID uint16
}

// VbID return the vBucket ID for the event.
func (sec streamEventCommon) VbID() uint16 {
return sec.vbID
}

// snapshotEvent represents a DCP snapshot event (opcode 0x56).
type snapshotEvent struct {
streamEventCommon
startSeq uint64
endSeq uint64
snapshotType gocbcore.SnapshotState
streamEventCommon
}

// mutationEvent represents a DCP mutation event (opcode 0x57).
type mutationEvent struct {
streamEventCommon
key []byte
value []byte
seq uint64
cas uint64
flags uint32
expiry uint32
cas uint64
datatype uint8
collection uint32
key []byte
value []byte
}

type streamOpenEvent struct {
streamEventCommon
failoverLogs []gocbcore.FailoverEntry
datatype uint8
}

// asFeedEvent converts a mutationEvent to a sgbucket.FeedEvent.
func (e mutationEvent) asFeedEvent() sgbucket.FeedEvent {
return sgbucket.FeedEvent{
Opcode: sgbucket.FeedOpMutation,
Expand All @@ -67,16 +68,18 @@ func (e mutationEvent) asFeedEvent() sgbucket.FeedEvent {
}
}

// deletionEvent represents a DCP deletion event (opcode 0x58).
type deletionEvent struct {
streamEventCommon
key []byte
value []byte
seq uint64
cas uint64
datatype uint8
collection uint32
key []byte
value []byte
datatype uint8
streamEventCommon
}

// asFeedEvent converts a deletionEvent to a sgbucket.FeedEvent.
func (e deletionEvent) asFeedEvent() sgbucket.FeedEvent {
return sgbucket.FeedEvent{
Opcode: sgbucket.FeedOpDeletion,
Expand All @@ -90,11 +93,13 @@ func (e deletionEvent) asFeedEvent() sgbucket.FeedEvent {
}
}

// endStreamEvent represents a DCP end stream event, and the error associated with the stream end (opcode 0x55).
type endStreamEvent struct {
streamEventCommon
err error
streamEventCommon
}

// seqnoAdvancedEvent represents a DCP Seqno advanced event (opcode 0x64).
type seqnoAdvancedEvent struct {
streamEventCommon
seq uint64
Expand Down
2 changes: 0 additions & 2 deletions base/dcp_client_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ func (w *DCPWorker) Start(wg *sync.WaitGroup) {
case event := <-w.eventFeed:
vbID := event.VbID()
switch e := event.(type) {
case streamOpenEvent:
w.metadata.SetFailoverEntries(e.vbID, e.failoverLogs)
case snapshotEvent:
// Set pending snapshot - don't persist to meta until we receive first sequence in the snapshot,
// to avoid attempting to restart with a new snapshot and old sequence value
Expand Down

0 comments on commit 4c0d321

Please sign in to comment.