Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/services: add Ticker #311

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/services/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package services

import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// DefaultJitter is +/-10%
const DefaultJitter timeutil.JitterPct = 0.1

// NewTicker returns a new timeutil.Ticker configured to:
// - fire the first tick immediately
// - apply DefaultJitter to each period
func NewTicker(period time.Duration) *timeutil.Ticker {
return TickerConfig{JitterPct: DefaultJitter}.NewTicker(period)
}

type TickerConfig struct {
// Initial delay before the first tick.
Initial time.Duration
// JitterPct to apply to each period.
JitterPct timeutil.JitterPct
}

func (c TickerConfig) NewTicker(period time.Duration) *timeutil.Ticker {
first := true
return timeutil.NewTicker(func() time.Duration {
if first {
first = false
return c.Initial
}
p := period
if c.JitterPct != 0.0 {
p = c.JitterPct.Apply(p)
}
return p
})
}
22 changes: 22 additions & 0 deletions pkg/timeutil/jitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package timeutil

import (
mrand "math/rand"
"time"
)

// JitterPct is a percent by which to scale a duration up or down.
// For example, 0.1 will result in +/- 10%.
type JitterPct float64

func (p JitterPct) Apply(d time.Duration) time.Duration {
// #nosec
if d == 0 {
return 0
}
// ensure non-zero arg to Intn to avoid panic
ub := max(1, int(float64(d.Abs())*float64(p)))
// #nosec - non critical randomness
jitter := mrand.Intn(2*ub) - ub
return time.Duration(int(d) + jitter)
}
29 changes: 29 additions & 0 deletions pkg/timeutil/jitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package timeutil

import (
"testing"
"time"
)

func TestJitterPct(t *testing.T) {
for _, tt := range []struct {
pct JitterPct
dur time.Duration
from, to time.Duration
}{
{0.1, 0, 0, 0},
{0.1, time.Second, 900 * time.Millisecond, 1100 * time.Millisecond},
{0.1, time.Minute, 54 * time.Second, 66 * time.Second},
{0.1, 24 * time.Hour, 21*time.Hour + 36*time.Minute, 26*time.Hour + 24*time.Minute},
} {
t.Run(tt.dur.String(), func(t *testing.T) {
for i := 0; i < 100; i++ {
got := tt.pct.Apply(tt.dur)
t.Logf("%d: %s", i, got)
if got < tt.from || got > tt.to {
t.Errorf("expected duration %s with jitter to be between (%s, %s) but got: %s", tt.dur, tt.from, tt.to, got)
}
}
})
}
}
58 changes: 58 additions & 0 deletions pkg/timeutil/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package timeutil

import (
"time"
)

// Ticker is like time.Ticker, but with a variable period.
type Ticker struct {
C <-chan time.Time
stop chan struct{}
reset chan struct{}
}

// NewTicker returns a started Ticker which calls nextDur for each period.
// Ticker.Stop should be called to prevent goroutine leaks.
func NewTicker(nextDur func() time.Duration) *Ticker {
c := make(chan time.Time) // unbuffered so we block and delay if not being handled
t := Ticker{C: c, stop: make(chan struct{}), reset: make(chan struct{})}
go t.run(c, nextDur)
return &t
}

// Stop permanently stops the Ticker. It cannot be Reset.
func (t *Ticker) Stop() { close(t.stop) }

func (t *Ticker) run(c chan<- time.Time, nextDur func() time.Duration) {
for {
timer := time.NewTimer(nextDur())
select {
case <-t.stop:
timer.Stop()
return

case <-t.reset:
timer.Stop()

case <-timer.C:
timer.Stop()
select {
case <-t.stop:
return
case c <- time.Now():
case <-t.reset:
}
}
}
}

// Reset starts a new period.
func (t *Ticker) Reset() {
select {
case <-t.stop:
case t.reset <- struct{}{}:
default:
// unnecessary
return
}
}
24 changes: 9 additions & 15 deletions pkg/utils/mailbox/mailbox_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
)

var mailboxLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Expand All @@ -30,33 +29,26 @@ type Monitor struct {
lggr logger.Logger

mailboxes sync.Map
stop func()
stopCh services.StopChan
done chan struct{}
}

func NewMonitor(appID string, lggr logger.Logger) *Monitor {
return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor")}
return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor"), stopCh: make(services.StopChan), done: make(chan struct{})}
}

func (m *Monitor) Name() string { return m.lggr.Name() }

func (m *Monitor) Start(context.Context) error {
return m.StartOnce("Monitor", func() error {
t := time.NewTicker(utils.WithJitter(mailboxPromInterval))
ctx, cancel := context.WithCancel(context.Background())
m.stop = func() {
t.Stop()
cancel()
}
m.done = make(chan struct{})
go m.monitorLoop(ctx, t.C)
go m.monitorLoop()
return nil
})
}

func (m *Monitor) Close() error {
return m.StopOnce("Monitor", func() error {
m.stop()
close(m.stopCh)
<-m.done
return nil
})
Expand All @@ -66,13 +58,15 @@ func (m *Monitor) HealthReport() map[string]error {
return map[string]error{m.Name(): m.Healthy()}
}

func (m *Monitor) monitorLoop(ctx context.Context, c <-chan time.Time) {
func (m *Monitor) monitorLoop() {
defer close(m.done)
t := services.NewTicker(mailboxPromInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
case <-m.stopCh:
return
case <-c:
case <-t.C:
m.mailboxes.Range(func(k, v any) bool {
name, mb := k.(string), v.(mailbox)
c, p := mb.load()
Expand Down
19 changes: 4 additions & 15 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,16 @@ package utils
import (
"context"
"fmt"
"math"
mrand "math/rand"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// WithJitter adds +/- 10% to a duration
func WithJitter(d time.Duration) time.Duration {
// #nosec
if d == 0 {
return 0
}
// ensure non-zero arg to Intn to avoid panic
max := math.Max(float64(d.Abs())/5.0, 1.)
// #nosec - non critical randomness
jitter := mrand.Intn(int(max))
jitter = jitter - (jitter / 2)
return time.Duration(int(d) + jitter)
}
// WithJitter adds +/- 10% to a duration.
// Deprecated: use timeutil.WithJitter
func WithJitter(d time.Duration) time.Duration { return timeutil.JitterPct(0.1).Apply(d) }

// ContextFromChan creates a context that finishes when the provided channel
// receives or is closed.
Expand Down