Skip to content

Commit

Permalink
MN-716: process rate limiter error (#437)
Browse files Browse the repository at this point in the history
  • Loading branch information
uhzxl committed Sep 10, 2020
1 parent 93c3c56 commit d482cee
Show file tree
Hide file tree
Showing 19 changed files with 297 additions and 605 deletions.
60 changes: 59 additions & 1 deletion connectivity/connectivity.go
Expand Up @@ -6,17 +6,25 @@
package connectivity

import (
"context"
"crypto/tls"
"crypto/x509"
"net/http"
"strings"
"time"

"github.com/go-pg/pg"
"github.com/insolar/insolar/insolar"
"github.com/insolar/insolar/ledger/heavy/exporter"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"

"github.com/insolar/observer/configuration"
"github.com/insolar/observer/internal/dbconn"
"github.com/insolar/observer/internal/pkg/cycle"
"github.com/insolar/observer/observability"
)

Expand Down Expand Up @@ -62,7 +70,20 @@ func Make(cfg *configuration.Observer, obs *observability.Observability) *Connec
if cfg.Replicator.Auth.InsecureTLS {
tlsOption = grpc.WithInsecure()
}
options = []grpc.DialOption{limits, tlsOption, perRPCCred}
unaryRepeater := grpc.WithUnaryInterceptor(func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption) (err error) {
RetryWhileLimited(ctx, func() error {
err = invoker(ctx, method, req, reply, cc)
return err
}, cfg.DB.AttemptInterval, cfg.DB.Attempts, log)
return err
})
options = []grpc.DialOption{limits, tlsOption, perRPCCred, unaryRepeater}
}

conn, err := grpc.Dial(cfg.Replicator.Addr, options...)
Expand All @@ -74,6 +95,43 @@ func Make(cfg *configuration.Observer, obs *observability.Observability) *Connec
}
}

// RetryWhileLimited calls do repeatedly for as long as it fails with the
// exporter rate-limit error (as reported by ExporterLimited), waiting for
// interval between consecutive calls.
//
// The loop ends when any of the following happens:
//   - do returns nil;
//   - do returns an error that is not the exporter rate-limit error;
//   - do has already been called attempts times;
//   - ctx is cancelled, either before a call or while waiting for the next one.
//
// do is invoked at least once unless ctx is already cancelled on entry.
// RetryWhileLimited itself returns nothing: a caller that needs the outcome
// of the last call has to capture it inside do.
func RetryWhileLimited(
	ctx context.Context,
	do func() error,
	interval time.Duration,
	attempts cycle.Limit,
	log insolar.Logger,
) {
	for counter := cycle.Limit(1); ; counter++ {
		// Do not start another call once the caller has given up.
		select {
		case <-ctx.Done():
			return
		default:
		}

		err := do()
		if err == nil || !ExporterLimited(err) || counter >= attempts {
			return
		}

		log.WithFields(map[string]interface{}{
			"attempt":       counter,
			"attempt_limit": attempts,
		}).Errorf("Exporter rate limit exceeded. Will try again in %s", interval.String())

		// Wait out the interval, but wake up immediately on cancellation
		// instead of blocking the caller for the whole back-off period.
		timer := time.NewTimer(interval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// ExporterLimited reports whether err is the exporter's rate-limit error:
// its gRPC status code is ResourceExhausted and its status message contains
// exporter.RateLimitExceededMsg.
func ExporterLimited(err error) bool {
	st := status.Convert(err)
	if st.Code() != codes.ResourceExhausted {
		return false
	}
	return strings.Contains(st.Message(), exporter.RateLimitExceededMsg)
}

type Connectivity struct {
pg *pg.DB
grpc *grpc.ClientConn
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Expand Up @@ -15,18 +15,17 @@ require (
github.com/hashicorp/golang-lru v0.5.3
github.com/insolar/insconfig v0.0.0-20200227134411-011eca6dc866
github.com/insolar/insolar v1.8.1
github.com/insolar/mainnet v1.11.1
github.com/insolar/mainnet v1.11.2
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/labstack/echo/v4 v4.1.11
github.com/mitchellh/mapstructure v1.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/ory/dockertest/v3 v3.5.2
github.com/pelletier/go-toml v1.5.0 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/stretchr/testify v1.6.1
go.opencensus.io v0.22.1 // indirect
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d // indirect
gonum.org/v1/gonum v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03 // indirect
google.golang.org/grpc v1.21.0
Expand Down
62 changes: 11 additions & 51 deletions go.sum

Large diffs are not rendered by default.

53 changes: 34 additions & 19 deletions internal/app/observer/grpc/pulse.go
Expand Up @@ -8,8 +8,10 @@ package grpc
import (
"context"
"io"
"time"

"github.com/insolar/observer/configuration"
"github.com/insolar/observer/connectivity"
"github.com/insolar/observer/internal/app/observer"
"github.com/insolar/observer/internal/pkg/cycle"
"github.com/insolar/observer/observability"
Expand Down Expand Up @@ -45,33 +47,46 @@ func (f *PulseFetcher) Fetch(ctx context.Context, last insolar.PulseNumber) (*ob
request := &exporter.GetPulses{Count: 1, PulseNumber: last}
f.log.Infof("Fetching %d pulses from %s", request.Count, last)
var (
resp *exporter.Pulse
resp *exporter.Pulse
attempts = cycle.Limit(0)
)

requestCtx, cancel := context.WithCancel(ctx)
defer cancel()
cycle.UntilConnectionError(func() error {
stream, err := client.Export(getCtxWithClientVersion(requestCtx), request)
if err != nil {
f.log.WithField("request", request).
Error(errors.Wrapf(err, "failed to get gRPC stream from exporter.Export method"))
return err
}

resp, err = stream.Recv()
if err != nil {
// stream is closed, no point of retrying
if err == io.EOF {
f.log.Debug("EOF received, quit")
return nil
for {
stream, err := client.Export(getCtxWithClientVersion(requestCtx), request)
if err != nil {
f.log.WithField("request", request).
Error(errors.Wrapf(err, "failed to get gRPC stream from exporter.Export method"))
return err
}
detectedDeprecatedVersion(err, f.log)

f.log.WithField("request", request).
Error(errors.Wrapf(err, "received error value from pulses gRPC stream"))
resp, err = stream.Recv()
if err != nil {
// stream is closed, no point of retrying
if err == io.EOF {
f.log.Debug("EOF received, quit")
return nil
}
detectedDeprecatedVersion(err, f.log)

f.log.WithField("request", request).
Error(errors.Wrapf(err, "received error value from pulses gRPC stream"))

if connectivity.ExporterLimited(err) && attempts < f.cfg.Replicator.Attempts {
f.log.WithFields(map[string]interface{}{
"attempt": attempts,
"attempt_limit": f.cfg.Replicator.Attempts,
}).Errorf("Exporter rate limit exceeded. Will try again in %s",
f.cfg.Replicator.AttemptInterval.String())
time.Sleep(f.cfg.Replicator.AttemptInterval)
attempts++
continue
}
}
return err
}

return err
}, f.cfg.Replicator.AttemptInterval, f.cfg.Replicator.Attempts, f.log)

if resp == nil {
Expand Down
27 changes: 25 additions & 2 deletions internal/app/observer/grpc/record.go
Expand Up @@ -8,20 +8,24 @@ package grpc
import (
"context"
"io"
"time"

"github.com/insolar/insolar/insolar"
"github.com/insolar/insolar/ledger/heavy/exporter"
"github.com/pkg/errors"

"github.com/insolar/observer/configuration"
"github.com/insolar/observer/connectivity"
"github.com/insolar/observer/internal/app/observer"
"github.com/insolar/observer/internal/pkg/cycle"
"github.com/insolar/observer/observability"
)

//go:generate minimock -g -i github.com/insolar/insolar/ledger/heavy/exporter.RecordExporterClient -s "_mock.go"

type RecordFetcher struct {
log insolar.Logger
cfg *configuration.Observer
client exporter.RecordExporterClient
records observer.RecordStorage //nolint: unused,structcheck
request *exporter.GetRecords
Expand All @@ -35,6 +39,7 @@ func NewRecordFetcher(
request := &exporter.GetRecords{Count: cfg.Replicator.BatchSize}
return &RecordFetcher{
log: obs.Log(),
cfg: cfg,
client: client,
request: request,
}
Expand All @@ -56,11 +61,15 @@ func (f *RecordFetcher) Fetch(
f.request.RecordNumber = 0

batch := make(map[uint32]*exporter.Record)
var counter uint32
shouldIterateFrom := insolar.PulseNumber(0)
var (
counter uint32
shouldIterateFrom = insolar.PulseNumber(0)
attempts = cycle.Limit(0)
)
// Get pulse batches
for {
counter = 0
limitExceeded := false
f.log.Debug("Data request: ", f.request)
stream, err := client.Export(getCtxWithClientVersion(ctx), f.request)

Expand All @@ -79,6 +88,17 @@ func (f *RecordFetcher) Fetch(
if err != nil {
detectedDeprecatedVersion(err, f.log)
f.log.Debugf("received error value from records gRPC stream %v", f.request)
if connectivity.ExporterLimited(err) && attempts < f.cfg.Replicator.Attempts {
limitExceeded = true
f.log.WithFields(map[string]interface{}{
"attempt": attempts,
"attempt_limit": f.cfg.Replicator.Attempts,
}).Errorf("Exporter rate limit exceeded. Will try again in %s",
f.cfg.Replicator.AttemptInterval.String())
time.Sleep(f.cfg.Replicator.AttemptInterval)
attempts++
break
}
return batch, shouldIterateFrom, errors.Wrapf(err, "received error value from records gRPC stream %v", f.request)
}

Expand Down Expand Up @@ -107,6 +127,9 @@ func (f *RecordFetcher) Fetch(
}

f.log.Debug("go to next round, fetched: ", len(batch))
if limitExceeded {
continue
}
// If we get less than batch size, then stop
if counter < f.request.Count {
f.log.Debugf("Exiting: counter %+v", uint32(len(batch)))
Expand Down
5 changes: 3 additions & 2 deletions internal/app/observer/postgres/supply_stats_test.go
Expand Up @@ -9,11 +9,12 @@ package postgres_test

import (
"context"
"testing"
"time"

"github.com/insolar/observer/internal/app/observer"
"github.com/insolar/observer/internal/testutils"
"github.com/insolar/observer/observability"
"testing"
"time"

"github.com/insolar/insolar/insolar/gen"
"github.com/stretchr/testify/require"
Expand Down

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d482cee

Please sign in to comment.