Skip to content

Commit

Permalink
feat: add instance dialing to health check (#871)
Browse files Browse the repository at this point in the history
* Add health check flags to README
* Dial all specified instances in non-fuse modes
* Add sample yaml config w/ probes
* Validate instances in fuse mode
* Add end to end tests for successful dialing

Co-authored-by: Mona Zhang <monazhang@google.com>
  • Loading branch information
tifftoff and monazhn committed Aug 12, 2021
1 parent 758d9a0 commit eca3793
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 49 deletions.
9 changes: 7 additions & 2 deletions cmd/cloud_sql_proxy/cloud_sql_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,14 @@ func runProxy() int {

var hc *healthcheck.Server
if *useHTTPHealthCheck {
hc, err = healthcheck.NewServer(proxyClient, *healthCheckPort)
// Extract a list of all instances specified statically. List is empty when in fuse mode.
var insts []string
for _, cfg := range cfgs {
insts = append(insts, cfg.Instance)
}
hc, err = healthcheck.NewServer(proxyClient, *healthCheckPort, insts)
if err != nil {
logging.Errorf("Could not initialize health check server: %v", err)
logging.Errorf("[Health Check] Could not initialize health check server: %v", err)
return 1
}
defer hc.Close(ctx)
Expand Down
64 changes: 49 additions & 15 deletions cmd/cloud_sql_proxy/internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ type Server struct {
port string
// srv is a pointer to the HTTP server used to communicate proxy health.
srv *http.Server
// instances is a list of all instances specified statically (e.g. as flags to the binary)
instances []string
}

// NewServer initializes a Server and exposes HTTP endpoints used to
// communicate proxy health.
func NewServer(c *proxy.Client, port string) (*Server, error) {
func NewServer(c *proxy.Client, port string, staticInst []string) (*Server, error) {
mux := http.NewServeMux()

srv := &http.Server{
Expand All @@ -57,10 +59,11 @@ func NewServer(c *proxy.Client, port string) (*Server, error) {
}

hcServer := &Server{
started: make(chan struct{}),
once: &sync.Once{},
port: port,
srv: srv,
started: make(chan struct{}),
once: &sync.Once{},
port: port,
srv: srv,
instances: staticInst,
}

mux.HandleFunc(startupPath, func(w http.ResponseWriter, _ *http.Request) {
Expand All @@ -74,7 +77,9 @@ func NewServer(c *proxy.Client, port string) (*Server, error) {
})

mux.HandleFunc(readinessPath, func(w http.ResponseWriter, _ *http.Request) {
if !isReady(c, hcServer) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if !isReady(ctx, c, hcServer) {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("error"))
return
Expand All @@ -100,7 +105,7 @@ func NewServer(c *proxy.Client, port string) (*Server, error) {

go func() {
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
logging.Errorf("Failed to start health check HTTP server: %v", err)
logging.Errorf("[Health Check] Failed to serve: %v", err)
}
}()

Expand Down Expand Up @@ -132,22 +137,51 @@ func isLive() bool {
return true
}

// isReady will check the following criteria before determining whether the
// proxy is ready for new connections.
// isReady will check the following criteria:
// 1. Finished starting up / been sent the 'Ready for Connections' log.
// 2. Not yet hit the MaxConnections limit, if applicable.
func isReady(c *proxy.Client, s *Server) bool {
// Not ready until we reach the 'Ready for Connections' log
// 2. Not yet hit the MaxConnections limit, if set.
// 3. Able to dial all specified instances without error.
func isReady(ctx context.Context, c *proxy.Client, s *Server) bool {
// Not ready until we reach the 'Ready for Connections' log.
if !s.proxyStarted() {
logging.Errorf("Readiness failed because proxy has not finished starting up.")
logging.Errorf("[Health Check] Readiness failed because proxy has not finished starting up.")
return false
}

// Not ready if the proxy is at the optional MaxConnections limit.
if !c.AvailableConn() {
logging.Errorf("Readiness failed because proxy has reached the maximum connections limit (%d).", c.MaxConnections)
logging.Errorf("[Health Check] Readiness failed because proxy has reached the maximum connections limit (%v).", c.MaxConnections)
return false
}

return true
// Not ready if one or more instances cannot be dialed.
instances := s.instances
if s.instances == nil { // Proxy is in fuse mode.
instances = c.GetInstances()
}

canDial := true
var once sync.Once
var wg sync.WaitGroup

for _, inst := range instances {
wg.Add(1)
go func(inst string) {
defer wg.Done()
conn, err := c.DialContext(ctx, inst)
if err != nil {
logging.Errorf("[Health Check] Readiness failed because proxy couldn't connect to %q: %v", inst, err)
once.Do(func() { canDial = false })
return
}

err = conn.Close()
if err != nil {
logging.Errorf("[Health Check] Readiness: error while closing connection: %v", err)
}
}(inst)
}
wg.Wait()

return canDial
}
118 changes: 87 additions & 31 deletions cmd/cloud_sql_proxy/internal/healthcheck/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ package healthcheck_test

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"net"
"net/http"
"testing"
"time"

"github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/internal/healthcheck"
"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
Expand All @@ -30,9 +35,23 @@ const (
testPort = "8090"
)

// fakeCertSource is a stand-in certificate source for tests. Both of its
// methods ignore the instance argument, return fixed values, and never
// return an error.
type fakeCertSource struct{}

// Local returns a certificate whose leaf carries nothing but an expiry date
// far in the future.
func (f *fakeCertSource) Local(instance string) (tls.Certificate, error) {
	// A month and day of 0 are normalized by time.Date; the resulting date
	// is still thousands of years away.
	expiry := time.Date(9999, 0, 0, 0, 0, 0, 0, time.UTC)
	leaf := &x509.Certificate{NotAfter: expiry}
	return tls.Certificate{Leaf: leaf}, nil
}

// Remote returns an empty certificate together with placeholder address,
// name, and version strings.
func (f *fakeCertSource) Remote(instance string) (cert *x509.Certificate, addr, name, version string, err error) {
	cert = &x509.Certificate{}
	addr, name, version = "fake address", "fake name", "fake version"
	return cert, addr, name, version, nil
}

// Test to verify that when the proxy client is up, the liveness endpoint writes http.StatusOK.
func TestLiveness(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v", err)
}
Expand All @@ -43,62 +62,62 @@ func TestLiveness(t *testing.T) {
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusOK)
t.Errorf("want %v, got %v", http.StatusOK, resp.StatusCode)
}
}

// Test to verify that when startup has NOT finished, the startup and readiness endpoints write
// http.StatusServiceUnavailable.
func TestStartupFail(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
// Test to verify that when startup HAS finished (and MaxConnections limit not specified),
// the startup and readiness endpoints write http.StatusOK.
func TestStartupPass(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v\n", err)
t.Fatalf("Could not initialize health check: %v", err)
}
defer s.Close(context.Background())

// Simulate the proxy client completing startup.
s.NotifyStarted()

resp, err := http.Get("http://localhost:" + testPort + startupPath)
if err != nil {
t.Fatalf("HTTP GET failed: %v\n", err)
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("%v returned status code %v instead of %v", startupPath, resp.StatusCode, http.StatusServiceUnavailable)
if resp.StatusCode != http.StatusOK {
t.Errorf("%v: want %v, got %v", startupPath, http.StatusOK, resp.StatusCode)
}

resp, err = http.Get("http://localhost:" + testPort + readinessPath)
if err != nil {
t.Fatalf("HTTP GET failed: %v\n", err)
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("%v returned status code %v instead of %v", readinessPath, resp.StatusCode, http.StatusServiceUnavailable)
if resp.StatusCode != http.StatusOK {
t.Errorf("%v: want %v, got %v", readinessPath, http.StatusOK, resp.StatusCode)
}
}

// Test to verify that when startup HAS finished (and MaxConnections limit not specified),
// the startup and readiness endpoints write http.StatusOK.
func TestStartupPass(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
// Test to verify that when startup has NOT finished, the startup and readiness endpoints write
// http.StatusServiceUnavailable.
func TestStartupFail(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v\n", err)
t.Fatalf("Could not initialize health check: %v", err)
}
defer s.Close(context.Background())

// Simulate the proxy client completing startup.
s.NotifyStarted()

resp, err := http.Get("http://localhost:" + testPort + startupPath)
if err != nil {
t.Fatalf("HTTP GET failed: %v\n", err)
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("%v returned status code %v instead of %v", startupPath, resp.StatusCode, http.StatusOK)
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("%v: want %v, got %v", startupPath, http.StatusOK, resp.StatusCode)
}

resp, err = http.Get("http://localhost:" + testPort + readinessPath)
if err != nil {
t.Fatalf("HTTP GET failed: %v\n", err)
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("%v returned status code %v instead of %v", readinessPath, resp.StatusCode, http.StatusOK)
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("%v: want %v, got %v", readinessPath, http.StatusOK, resp.StatusCode)
}
}

Expand All @@ -108,7 +127,7 @@ func TestMaxConnectionsReached(t *testing.T) {
c := &proxy.Client{
MaxConnections: 1,
}
s, err := healthcheck.NewServer(c, testPort)
s, err := healthcheck.NewServer(c, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v", err)
}
Expand All @@ -122,14 +141,51 @@ func TestMaxConnectionsReached(t *testing.T) {
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusServiceUnavailable)
t.Errorf("want %v, got %v", http.StatusServiceUnavailable, resp.StatusCode)
}
}

// TestDialFail verifies that when dialing the configured instance(s) returns
// an error, the readiness endpoint writes http.StatusServiceUnavailable.
//
// Each case runs as its own subtest so that a fatal failure in one case does
// not prevent the others from running, and so that failures are attributed to
// the case that produced them.
func TestDialFail(t *testing.T) {
	tests := map[string]struct {
		insts []string
	}{
		"Single instance":    {insts: []string{"project:region:instance"}},
		"Multiple instances": {insts: []string{"project:region:instance-1", "project:region:instance-2", "project:region:instance-3"}},
	}

	// The client's dialer fails unconditionally, so every dial attempt made
	// through it returns an error.
	c := &proxy.Client{
		Certs: &fakeCertSource{},
		Dialer: func(string, string) (net.Conn, error) {
			return nil, errors.New("error")
		},
	}

	for name, test := range tests {
		// The subtest body runs synchronously inside t.Run, so using the
		// loop variables directly is safe.
		t.Run(name, func(t *testing.T) {
			s, err := healthcheck.NewServer(c, testPort, test.insts)
			if err != nil {
				t.Fatalf("%v: Could not initialize health check: %v", name, err)
			}
			// Deferred so the server is closed before the next case starts
			// a new one on the same port.
			defer s.Close(context.Background())
			s.NotifyStarted()

			resp, err := http.Get("http://localhost:" + testPort + readinessPath)
			if err != nil {
				t.Fatalf("%v: HTTP GET failed: %v", name, err)
			}
			// Close the body so the underlying connection is released; this
			// runs before the deferred server close above.
			defer resp.Body.Close()
			if resp.StatusCode != http.StatusServiceUnavailable {
				t.Errorf("want %v, got %v", http.StatusServiceUnavailable, resp.StatusCode)
			}
		})
	}
}

// Test to verify that after closing a healthcheck, its liveness endpoint serves
// an error.
func TestCloseHealthCheck(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v", err)
}
Expand All @@ -140,7 +196,7 @@ func TestCloseHealthCheck(t *testing.T) {
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusOK)
t.Errorf("want %v, got %v", http.StatusOK, resp.StatusCode)
}

err = s.Close(context.Background())
Expand Down
13 changes: 13 additions & 0 deletions proxy/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,19 @@ func ParseInstanceConnectionName(instance string) (string, string, string, []str
return proj, region, name, args, nil
}

// GetInstances returns the keys currently present in the client's config
// cache, i.e. the instances the cache has an entry for. The order of the
// returned slice is unspecified because it follows map iteration order. The
// result is nil when the cache is empty.
func (c *Client) GetInstances() []string {
	// Hold the lock for the whole iteration. Assigning the map to a local
	// variable copies only the map reference, not its contents, so ranging
	// over it after unlocking would race with concurrent cache updates.
	c.cacheL.Lock()
	defer c.cacheL.Unlock()

	var insts []string
	for inst := range c.cfgCache {
		insts = append(insts, inst)
	}
	return insts
}

// AvailableConn returns false if MaxConnections has been reached, true otherwise.
// When MaxConnections is 0, there is no limit.
func (c *Client) AvailableConn() bool {
Expand Down
2 changes: 1 addition & 1 deletion proxy/proxy/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"unsafe"
)

const instance = "instance-name"
const instance = "project:region:instance"

var (
sentinelError = errors.New("sentinel error")
Expand Down

0 comments on commit eca3793

Please sign in to comment.