Add max connection limit #50413

Closed
wants to merge 19 commits
7 changes: 7 additions & 0 deletions pilot/pkg/bootstrap/monitoring.go
@@ -49,10 +49,17 @@ var (
"pilot_info",
"Pilot version and build information.",
)

connectionTotal = monitoring.NewGauge(
Member
This duplicates the existing gauge:

xdsClients = monitoring.NewGauge(

Author

I think their meanings are different: xdsClients represents the number of XDS streams, while connectionTotal represents the number of gRPC sessions.

What do you think?

"pilot_connection_total",
"Total number of grpc connections established.",
)
)

func init() {
pilotVersion.With(versionTag.Value(version.Info.String())).Record(1)

connectionTotal.RecordInt(0)
}

func addMonitor(mux *http.ServeMux) error {
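A note on the thread above: the author's distinction rests on the fact that, in gRPC, one client connection can multiplex several ADS streams. Below is a minimal standalone sketch of that fact, not part of this PR; the address localhost:15010 is only a placeholder for a plaintext XDS endpoint.

package main

import (
	"context"
	"fmt"
	"time"

	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// A single gRPC client connection.
	conn, err := grpc.DialContext(ctx, "localhost:15010",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Two ADS streams multiplexed over that same connection.
	// Errors from a missing server only surface on the first Send/Recv.
	client := discovery.NewAggregatedDiscoveryServiceClient(conn)
	stream1, err1 := client.StreamAggregatedResources(ctx)
	stream2, err2 := client.StreamAggregatedResources(ctx)
	fmt.Println(stream1 != nil, err1, stream2 != nil, err2)
}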
20 changes: 20 additions & 0 deletions pilot/pkg/bootstrap/server.go
@@ -29,11 +29,14 @@ import (
"time"

"github.com/fsnotify/fsnotify"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
grpcstatus "google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"

@@ -171,6 +174,8 @@ type Server struct {
statusManager *status.Manager
// RWConfigStore is the configstore which allows updates, particularly for status.
RWConfigStore model.ConfigStoreController

connectionCounter atomic.Int64
}

type readinessFlags struct {
@@ -731,6 +736,7 @@ func (s *Server) initGrpcServer(options *istiokeepalive.Options) {
grpcprom.UnaryServerInterceptor,
}
grpcOptions := istiogrpc.ServerOptions(options, interceptors...)
grpcOptions = append(grpcOptions, grpc.StreamInterceptor(middleware.ChainStreamServer(s.StreamServerOverloadInterceptor)))
s.grpcServer = grpc.NewServer(grpcOptions...)
s.XDSServer.Register(s.grpcServer)
reflection.Register(s.grpcServer)
@@ -780,6 +786,7 @@ func (s *Server) initSecureDiscoveryService(args *PilotArgs) error {
}
opts := istiogrpc.ServerOptions(args.KeepaliveOptions, interceptors...)
opts = append(opts, grpc.Creds(tlsCreds))
opts = append(opts, grpc.StreamInterceptor(middleware.ChainStreamServer(s.StreamServerOverloadInterceptor)))

s.secureGrpcServer = grpc.NewServer(opts...)
s.XDSServer.Register(s.secureGrpcServer)
@@ -1348,3 +1355,16 @@ func (s *Server) initReadinessProbes() {
s.addReadinessProbe(name, probe)
}
}

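// StreamServerOverloadInterceptor counts in-flight gRPC streams and, when
// PILOT_MAX_CONNECTION (features.ConnectionLimit) is set and exceeded, rejects
// new streams with codes.ResourceExhausted. A limit of 0 disables the check.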
func (s *Server) StreamServerOverloadInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
current := s.connectionCounter.Add(1)
connectionTotal.RecordInt(current)
defer func() {
connectionTotal.RecordInt(s.connectionCounter.Add(-1))
}()
if features.ConnectionLimit > 0 && int64(features.ConnectionLimit) < current {
Member

Use DiscoveryServer.adsClientCount instead.

Author

DiscoveryServer.adsClients is updated in the stream handler (initConnection), so it is not synchronized with gRPC's dial. I think that would cause some problems.

Member

Do we need such a precise limit? gRPC's dial time should be very short, and PILOT_MAX_REQUESTS_PER_SECOND can limit burst connections. This PR is just to limit the continued increase in connections, right?

Author

This PR is just to limit the continued increase in connections, right?

Yes.
There is a gap between initConnection and grpc.Dial. If we used DiscoveryServer.adsClients, this feature would be ineffective in extreme cases. I feel it's better to be more precise. What do you think? Thanks.

return grpcstatus.Errorf(codes.ResourceExhausted, "connection limit exceeded")
Member

This is incredibly unsafe. Unless you have a fixed number of istiod pods and clients, putting a hard limit on connections without tying autoscaling to that limit is a recipe for an outage where all connections are denied. It also makes a DoS attack trivial.

Contributor

+1. Curious how much memory you have allocated for Istiod, how many total clients you have, and how many clients connected to a single Istiod caused it to get OOMKilled?

Author

In our test cluster, each instance is limited to 16 CPU / 40 GB (10+ replicas, a fixed number). Each instance usually maintains more than 700 connections, and memory usage is 22-26 GB. When the cluster is thrashing (during some failures), a single instance can easily hit its memory limit. Our production cluster has many more clients.

So I think that when an instance's resources are fixed, the upper limit of clients it can serve is also fixed.
This feature (PR) is disabled by default.

}
return handler(srv, ss)
}
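Regarding the outage/DoS concern above: when the limit trips, callers see codes.ResourceExhausted and are expected to retry later or against another instance. Below is a minimal client-side sketch of such handling, not part of this PR; connectWithBackoff and dialAndStream are hypothetical names, and Istio agents have their own reconnect logic.

package example

import (
	"context"
	"math/rand"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// connectWithBackoff retries dialAndStream with jittered exponential backoff,
// but only while the server reports ResourceExhausted (i.e. it is over its
// connection limit); any other error is returned to the caller.
func connectWithBackoff(ctx context.Context, dialAndStream func(context.Context) error) error {
	backoff := 500 * time.Millisecond
	for {
		err := dialAndStream(ctx)
		if err == nil {
			return nil
		}
		if status.Code(err) != codes.ResourceExhausted {
			return err
		}
		// Sleep for a random duration in [backoff/2, 1.5*backoff), doubling up to a 30s cap.
		sleep := backoff/2 + time.Duration(rand.Int63n(int64(backoff)))
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(sleep):
		}
		if backoff < 30*time.Second {
			backoff *= 2
		}
	}
}

This only illustrates the client-side contract implied by the ResourceExhausted status; whether a given proxy already backs off this way is not addressed by this PR.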
235 changes: 235 additions & 0 deletions pilot/pkg/bootstrap/server_test.go
@@ -18,22 +18,31 @@ import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"testing"
"time"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
. "github.com/onsi/gomega"
"golang.org/x/net/http2"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
cert "k8s.io/api/certificates/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pilot/pkg/keycertbundle"
"istio.io/istio/pilot/pkg/server"
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
"istio.io/istio/pilot/pkg/xds"
v3 "istio.io/istio/pilot/pkg/xds/v3"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/filewatcher"
"istio.io/istio/pkg/kube"
@@ -767,3 +776,229 @@ func TestGetDNSNames(t *testing.T) {
})
}
}

func TestMaxConnection(t *testing.T) {
tests := []struct {
name string
limit int
}{
{
name: "small limit",
limit: 4,
},
{
name: "medium limit",
limit: 100,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
testMaxConnection(t, tc.limit)
})
}
}

func testMaxConnection(t *testing.T, limit int) {

features.ConnectionLimit = limit
features.RequestLimit = 100

t.Logf("test max connection use limit: %d, qps: %d\n", limit, int(features.RequestLimit))

g := NewGomegaWithT(t)
configDir := t.TempDir()
args := NewPilotArgs(func(p *PilotArgs) {
p.Namespace = "istio-system"
p.ServerOptions = DiscoveryServerOptions{
HTTPAddr: ":8080",
HTTPSAddr: ":15017",
GRPCAddr: ":15010",
SecureGRPCAddr: ":15012",
MonitoringAddr: ":15014",
}
p.RegistryOptions = RegistryOptions{
FileDir: configDir,
}

p.ShutdownDuration = 1 * time.Millisecond

})

s, err := NewServer(args, func(s *Server) {
s.kubeClient = kube.NewFakeClient()
})
g.Expect(err).To(Succeed())
stop := make(chan struct{})
g.Expect(s.Start(stop)).To(Succeed())
defer func() {
close(stop)
s.WaitUntilCompletion()
}()

limiter := rate.NewLimiter(rate.Limit(50), int(10))
dataPlane := NewFakeDataPlane()
defer dataPlane.Clear()

// establish an initial batch of connections
g.Expect(dataPlane.AddAgentN(t, "localhost:15010", features.ConnectionLimit/2+1, limiter)).To(Succeed())
t.Logf("Initialize %d connections", dataPlane.Size())

// burst connection
var wg sync.WaitGroup
for i := 0; i < features.ConnectionLimit; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Second * time.Duration(rand.Intn(4)))
_, err := dataPlane.AddAgent(t, "localhost:15010", v3.EndpointType)
if err != nil {
return
}
}()
}
wg.Wait()

t.Logf("after burst: %d connections", dataPlane.Size())

if dataPlane.Size() > features.ConnectionLimit {
t.Fatalf("ConnectionLimit %d exceeded", limit)
}

dataPlane.KeepAlive(t)
t.Logf("%d connections keep alive", dataPlane.Size())

if dataPlane.Size() < features.ConnectionLimit/2 || dataPlane.Size() > features.ConnectionLimit {
t.Fatalf("connection is not as expected %d:%d", dataPlane.Size(), features.ConnectionLimit)
} else if dataPlane.Size() != len(s.XDSServer.AllClients()) {
t.Fatalf("inconsistent connection")
} else {
t.Logf("establish %d connections", dataPlane.Size())
}

}

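// FakeDataPlane simulates a fleet of XDS agents connecting to the discovery
// server; the connection-limit test uses it to open, track, and tear down connections.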
type FakeDataPlane struct {
locker sync.RWMutex
agents map[string]*xds.AdsTest
counter int

ctx context.Context
cancel context.CancelFunc
}

func NewFakeDataPlane() *FakeDataPlane {
datePlane := &FakeDataPlane{
agents: make(map[string]*xds.AdsTest),
counter: 0,
}
dataPlane.ctx, dataPlane.cancel = context.WithTimeout(context.TODO(), time.Minute*5)
return dataPlane
}

func (p *FakeDataPlane) Size() int {
p.locker.RLock()
defer p.locker.RUnlock()
return len(p.agents)
}

func (p *FakeDataPlane) Clear() {
p.cancel()
p.locker.Lock()
defer p.locker.Unlock()
for _, a := range p.agents {
a.Cleanup()
}
p.agents = nil
}

func (p *FakeDataPlane) KeepAlive(t test.Failer) int {
p.locker.RLock()
var agents []*xds.AdsTest
for _, a := range p.agents {
agents = append(agents, a)
}
p.locker.RUnlock()

var wg sync.WaitGroup
for _, a := range agents {
wg.Add(1)
go func(agent *xds.AdsTest) {
defer wg.Done()
if err := DoRequest(t, agent, NewDiscoveryRequest("fake-cluster")); err != nil {
p.locker.Lock()
defer p.locker.Unlock()
delete(p.agents, agent.ID)
}
}(a)
}
wg.Wait()

return p.Size()
}

func (p *FakeDataPlane) AddAgentN(t test.Failer, discoveryAddress string, n int, limiter *rate.Limiter) error {
for i := 0; i < n; i++ {
if err := limiter.Wait(p.ctx); err != nil {
return err
}
_, _ = p.AddAgent(t, discoveryAddress, v3.EndpointType)
}
return nil
}

func (p *FakeDataPlane) AddAgent(t test.Failer, discoveryAddress string, typeURL string) (*xds.AdsTest, error) {
conn, err := grpc.DialContext(p.ctx, discoveryAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
t.Fatalf("failed to connect: %v", err)
}

id := p.nextId()
ads := xds.NewAdsTest(t, conn)
ads.WithType(typeURL).WithID(id).WithTimeout(time.Second * 2)

if err := DoRequest(t, ads, NewDiscoveryRequest("fake-cluster")); err != nil {
return nil, err
}

p.locker.Lock()
defer p.locker.Unlock()
p.agents[id] = ads

return ads, nil
}

func (p *FakeDataPlane) nextId() string {
defer func() {
p.counter++
}()
return "sidecar~1.1.1.1~test.default." + strconv.Itoa(p.counter) + "~default.svc.cluster.local"
}

func NewDiscoveryRequest(resource string) *discovery.DiscoveryRequest {
req := &discovery.DiscoveryRequest{}

if len(resource) > 0 {
req.ResourceNames = []string{resource}
}

return req
}

func DoRequest(t test.Failer, ads *xds.AdsTest, req *discovery.DiscoveryRequest) error {
if err := ads.HasError(t); err != nil {
return err
}
ads.Request(t, req)
resp, err := ads.HasResponse(t)
if err != nil {
return err
}
req.ResponseNonce = resp.Nonce
req.VersionInfo = resp.VersionInfo
ads.Request(t, req)
return nil
}
7 changes: 7 additions & 0 deletions pilot/pkg/features/tuning.go
@@ -23,6 +23,13 @@ import (

// Define performance tuning related features here.
var (
ConnectionLimit = env.RegisterIntVar(
"PILOT_MAX_CONNECTION",
0,
"Limits the number of incoming ADS connection. "+
"If set to 0 or unset, unlimited connections",
).Get()

MaxConcurrentStreams = env.Register(
"ISTIO_GPRC_MAXSTREAMS",
100000,
28 changes: 28 additions & 0 deletions pilot/pkg/xds/adstest.go
@@ -16,6 +16,7 @@ package xds

import (
"context"
"fmt"
"sync"
"time"

@@ -147,6 +148,22 @@ func (a *AdsTest) ExpectResponse(t test.Failer) *discovery.DiscoveryResponse {
return nil
}

// HasResponse waits until a response is received and returns it; on timeout or stream error it returns the error instead of failing the test
func (a *AdsTest) HasResponse(t test.Failer) (*discovery.DiscoveryResponse, error) {
t.Helper()
select {
case <-time.After(a.timeout):
return nil, fmt.Errorf("did not get response in time")
case resp := <-a.responses:
if resp == nil || len(resp.Resources) == 0 {
t.Fatalf("got empty response")
}
return resp, nil
case err := <-a.error:
return nil, err
}
}

// ExpectError waits until an error is received and returns it
func (a *AdsTest) ExpectError(t test.Failer) error {
t.Helper()
@@ -159,6 +176,17 @@ func (a *AdsTest) ExpectError(t test.Failer) error {
return nil
}

// HasError returns a stream error if one is received before the timeout; it never fails the test
func (a *AdsTest) HasError(t test.Failer) error {
t.Helper()
select {
case <-time.After(a.timeout):
return nil
case err := <-a.error:
return err
}
}

// ExpectNoResponse waits a short period of time and ensures no response is received
func (a *AdsTest) ExpectNoResponse(t test.Failer) {
t.Helper()