Skip to content

Commit

Permalink
awslogs: Prevent close from being blocked on log
Browse files Browse the repository at this point in the history
Before this change, a call to `Close` could be blocked if the channel
used to buffer logs is full.
When this happens the container state will end up wedged causing a
deadlock on anything that needs to lock the container state.

This replaces the use of a channel, whose semantics are difficult to
manage, with something more suitable for the situation.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
  • Loading branch information
cpuguy83 committed Apr 23, 2024
1 parent faf84d7 commit 9f0a44a
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 48 deletions.
59 changes: 37 additions & 22 deletions daemon/logger/awslogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"regexp"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -76,10 +76,11 @@ type logStream struct {
forceFlushInterval time.Duration
multilinePattern *regexp.Regexp
client api
messages chan *logger.Message
lock sync.RWMutex
closed bool
sequenceToken *string

messages loggerutils.MessageQueue
closed atomic.Bool

sequenceToken *string
}

type logStreamConfig struct {
Expand Down Expand Up @@ -158,22 +159,21 @@ func New(info logger.Info) (logger.Logger, error) {
forceFlushInterval: containerStreamConfig.forceFlushInterval,
multilinePattern: containerStreamConfig.multilinePattern,
client: client,
messages: make(chan *logger.Message, containerStreamConfig.maxBufferedEvents),
}

containerStream.messages.Grow(containerStreamConfig.maxBufferedEvents)

creationDone := make(chan bool)
if logNonBlocking {
go func() {
backoff := 1
maxBackoff := 32
for {
// If logger is closed we are done
containerStream.lock.RLock()
if containerStream.closed {
containerStream.lock.RUnlock()
if containerStream.closed.Load() {
break
}
containerStream.lock.RUnlock()

err := containerStream.create()
if err == nil {
break
Expand Down Expand Up @@ -426,25 +426,22 @@ func (l *logStream) BufSize() int {
return maximumBytesPerEvent
}

var errClosed = errors.New("awslogs is closed")

// Log submits messages for logging by an instance of the awslogs logging driver
func (l *logStream) Log(msg *logger.Message) error {
l.lock.RLock()
defer l.lock.RUnlock()
if l.closed {
return errors.New("awslogs is closed")
// No need to check if we are closed here since the queue will be closed
// (i.e. returns false) in this case.
if !l.messages.Enqueue(msg) {
return errClosed
}
l.messages <- msg
return nil
}

// Close closes the instance of the awslogs logging driver
func (l *logStream) Close() error {
l.lock.Lock()
defer l.lock.Unlock()
if !l.closed {
close(l.messages)
}
l.closed = true
l.closed.Store(true)
l.messages.Close()
return nil
}

Expand Down Expand Up @@ -561,6 +558,23 @@ func (l *logStream) collectBatch(created chan bool) {
var eventBuffer []byte
var eventBufferTimestamp int64
batch := newEventBatch()

type dequeueVal struct {
msg *logger.Message
more bool
}

chLogs := make(chan *dequeueVal, 1)
go func() {
for {
msg, more := l.messages.Dequeue()
chLogs <- &dequeueVal{msg, more}
if !more {
return
}
}
}()

for {
select {
case t := <-ticker.C:
Expand All @@ -576,7 +590,8 @@ func (l *logStream) collectBatch(created chan bool) {
}
l.publishBatch(batch)
batch.reset()
case msg, more := <-l.messages:
case v := <-chLogs:
msg, more := v.msg, v.more
if !more {
// Flush event buffer and release resources
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
Expand Down
32 changes: 6 additions & 26 deletions daemon/logger/awslogs/cloudwatchlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ func TestLogClosed(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{
client: mockClient,
closed: true,
}
stream.closed.Store(true)
err := stream.Log(&logger.Message{})
assert.Check(t, err != nil)
}
Expand All @@ -369,8 +369,7 @@ func TestLogClosed(t *testing.T) {
func TestLogBlocking(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message),
client: mockClient,
}

errorCh := make(chan error, 1)
Expand All @@ -387,14 +386,11 @@ func TestLogBlocking(t *testing.T) {
t.Fatal("Expected stream.Log to block: ", err)
default:
}

// assuming it is blocked, we can now try to drain the internal channel and
// unblock it
select {
case <-time.After(10 * time.Millisecond):
// if we're unable to drain the channel within 10ms, something seems broken
t.Fatal("Expected to be able to read from stream.messages but was unable to")
case <-stream.messages:
}
stream.messages.Dequeue()

select {
case err := <-errorCh:
assert.NilError(t, err)
Expand All @@ -407,8 +403,7 @@ func TestLogBlocking(t *testing.T) {
func TestLogBufferEmpty(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
client: mockClient,
}
err := stream.Log(&logger.Message{})
assert.NilError(t, err)
Expand Down Expand Up @@ -556,7 +551,6 @@ func TestCollectBatchSimple(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
Expand Down Expand Up @@ -598,7 +592,6 @@ func TestCollectBatchTicker(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -666,7 +659,6 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
logStreamName: streamName,
multilinePattern: multilinePattern,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -732,7 +724,6 @@ func BenchmarkCollectBatch(b *testing.B) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
return &cloudwatchlogs.PutLogEventsOutput{
Expand Down Expand Up @@ -765,7 +756,6 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
logStreamName: streamName,
multilinePattern: multilinePattern,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
return &cloudwatchlogs.PutLogEventsOutput{
Expand Down Expand Up @@ -796,7 +786,6 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
logStreamName: streamName,
multilinePattern: multilinePattern,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -870,7 +859,6 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
logStreamName: streamName,
multilinePattern: multilinePattern,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -927,7 +915,6 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
logStreamName: streamName,
multilinePattern: multilinePattern,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -987,7 +974,6 @@ func TestCollectBatchClose(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -1089,7 +1075,6 @@ func TestCollectBatchLineSplit(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -1137,7 +1122,6 @@ func TestCollectBatchLineSplitWithBinary(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -1185,7 +1169,6 @@ func TestCollectBatchMaxEvents(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -1239,7 +1222,6 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -1320,7 +1302,6 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down Expand Up @@ -1394,7 +1375,6 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
called := make(chan struct{}, 50)
Expand Down

0 comments on commit 9f0a44a

Please sign in to comment.