Skip to content

Commit

Permalink
feat: add health checks to proxy (#859)
Browse files Browse the repository at this point in the history
## Change Description

Introduce health checks to the Cloud SQL proxy client, allowing for the proactive and automatic mitigation of health-related issues. The health checks consist of **startup**, **liveness**, and **readiness** probing, with requests against the proxy container issued via HTTP.

## Checklist

- [x] Make sure to open an issue as a [bug/issue](https://github.com/GoogleCloudPlatform/cloudsql-proxy/issues/new/choose) 
  before writing your code!  That way we can discuss the change, evaluate 
  designs, and agree on the general idea.
- [x] Ensure the tests and linter pass
- [x] Appropriate documentation is updated (if necessary)

## Relevant issues:

- Fixes #137
  • Loading branch information
monazhn committed Aug 3, 2021
1 parent 2c2bc8a commit ea62bdd
Show file tree
Hide file tree
Showing 7 changed files with 528 additions and 0 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,15 @@ message and optionally stacktrace. For example, the startup message looks like:
{"severity":"INFO","timestamp":"2020-10-12T07:20:50.52Z","caller":"cloud_sql_proxy/cloud_sql_proxy.go:510","message":"Using gcloud's active project: [my-project-id]"}
```

#### `-use_http_health_check`

Enables HTTP health checks for the proxy, including startup, liveness, and readiness probing.
Requires that you configure the Kubernetes container with HTTP probes ([instructions][health-check-example]).

#### `-health_check_port=8090`

Specifies the port on which the health check server listens. Defaults to 8090.

## Running as a Kubernetes Sidecar

See the [example here][sidecar-example] as well as [Connecting from Google
Expand Down Expand Up @@ -334,6 +343,7 @@ Install via Nuget, follow these
[connect-to-k8s]: https://cloud.google.com/sql/docs/mysql/connect-kubernetes-engine
[connection-overview]: https://cloud.google.com/sql/docs/mysql/connect-overview
[contributing]: CONTRIBUTING.md
[health-check-example]: https://github.com/GoogleCloudPlatform/cloudsql-proxy/tree/main/examples/k8s-health-check#cloud-sql-proxy-health-checks
[iam-auth]: https://cloud.google.com/sql/docs/postgres/authentication
[pkg-badge]: https://pkg.go.dev/badge/github.com/GoogleCloudPlatform/cloudsql-proxy.svg
[pkg-docs]: https://pkg.go.dev/github.com/GoogleCloudPlatform/cloudsql-proxy
Expand Down
19 changes: 19 additions & 0 deletions cmd/cloud_sql_proxy/cloud_sql_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"syscall"
"time"

"github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/internal/healthcheck"
"github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/certs"
"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/fuse"
Expand Down Expand Up @@ -131,6 +132,10 @@ unavailable.`,
`When set, the proxy uses this host as the base API path. Example:
https://sqladmin.googleapis.com`,
)

// Settings for healthcheck
useHTTPHealthCheck = flag.Bool("use_http_health_check", false, "When set, creates an HTTP server that checks and communicates the health of the proxy client.")
healthCheckPort = flag.String("health_check_port", "8090", "When applicable, health checks take place on this port number. Defaults to 8090.")
)

const (
Expand Down Expand Up @@ -580,6 +585,16 @@ func main() {
RefreshCfgBuffer: refreshCfgBuffer,
}

var hc *healthcheck.Server
if *useHTTPHealthCheck {
hc, err = healthcheck.NewServer(proxyClient, *healthCheckPort)
if err != nil {
logging.Errorf("Could not initialize health check server: %v", err)
os.Exit(1)
}
defer hc.Close(ctx)
}

// Initialize a source of new connections to Cloud SQL instances.
var connSrc <-chan proxy.Conn
if *useFuse {
Expand Down Expand Up @@ -619,6 +634,10 @@ func main() {

logging.Infof("Ready for new connections")

if hc != nil {
hc.NotifyStarted()
}

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)

Expand Down
153 changes: 153 additions & 0 deletions cmd/cloud_sql_proxy/internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2021 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package healthcheck tests and communicates the health of the Cloud SQL Auth proxy.
package healthcheck

import (
	"context"
	"errors"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
	"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
)

// URL paths on which NewServer registers the health check handlers.
const (
// startupPath is served by the startup probe handler.
startupPath = "/startup"
// livenessPath is served by the liveness probe handler.
livenessPath = "/liveness"
// readinessPath is served by the readiness probe handler.
readinessPath = "/readiness"
)

// Server is a type used to implement health checks for the proxy.
//
// A Server is created with NewServer. NotifyStarted marks the proxy as
// started, and Close shuts down the underlying HTTP server.
type Server struct {
// started is used to indicate whether the proxy has finished starting up.
// If started is open, startup has not finished. If started is closed,
// startup is complete.
started chan struct{}
// once ensures that started can only be closed once, since closing an
// already-closed channel panics.
once *sync.Once
// port designates the port number on which Server listens and serves.
port string
// srv is a pointer to the HTTP server used to communicate proxy health.
srv *http.Server
}

// NewServer initializes a Server and exposes HTTP endpoints used to
// communicate proxy health.
//
// It registers the startup, readiness, and liveness handlers, binds a TCP
// listener on the given port, and serves requests on a background goroutine
// until Close is called. The client c is consulted by the readiness handler.
//
// An error is returned if the listener cannot be created (for example, when
// the port is already in use); in that case no goroutine is started.
func NewServer(c *proxy.Client, port string) (*Server, error) {
	// readHeaderTimeout bounds how long a client may take to send its request
	// headers, so a stalled connection cannot tie up the server indefinitely.
	const readHeaderTimeout = 5 * time.Second

	mux := http.NewServeMux()

	srv := &http.Server{
		Addr:              ":" + port,
		Handler:           mux,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	hcServer := &Server{
		started: make(chan struct{}),
		once:    &sync.Once{},
		port:    port,
		srv:     srv,
	}

	mux.HandleFunc(startupPath, probeHandler(hcServer.proxyStarted))
	mux.HandleFunc(readinessPath, probeHandler(func() bool { return isReady(c, hcServer) }))
	// Because isLive() always returns true, the failure branch of this
	// handler should not be reached.
	mux.HandleFunc(livenessPath, probeHandler(isLive))

	// Listen synchronously so that a bind failure is reported to the caller
	// rather than being lost inside the serving goroutine.
	ln, err := net.Listen("tcp", srv.Addr)
	if err != nil {
		return nil, err
	}

	go func() {
		// http.ErrServerClosed is the expected result of Close, not a failure.
		if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
			logging.Errorf("Failed to start health check HTTP server: %v", err)
		}
	}()

	return hcServer, nil
}

// probeHandler builds the HTTP handler shared by all probe endpoints: it
// writes 200 "ok" when healthy reports true and 503 "error" otherwise.
func probeHandler(healthy func() bool) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		status, body := http.StatusOK, "ok"
		if !healthy() {
			status, body = http.StatusServiceUnavailable, "error"
		}
		w.WriteHeader(status)
		// A failed write means the prober has gone away; there is nothing
		// useful left to do for this request, so the error is discarded.
		_, _ = w.Write([]byte(body))
	}
}

// Close gracefully shuts down the HTTP server belonging to the Server.
// It passes ctx to http.Server.Shutdown and returns that call's error.
func (s *Server) Close(ctx context.Context) error {
return s.srv.Shutdown(ctx)
}

// NotifyStarted tells the Server that the proxy has finished startup.
// It does so by closing the started channel; the close is guarded by once,
// so calling NotifyStarted repeatedly does not close the channel twice.
func (s *Server) NotifyStarted() {
s.once.Do(func() { close(s.started) })
}

// proxyStarted reports whether the started channel has been closed, i.e.
// whether startup has been signalled. It never blocks.
func (s *Server) proxyStarted() bool {
	done := false
	select {
	case <-s.started:
		// The receive only succeeds once the channel is closed.
		done = true
	default:
		// Channel still open: startup has not been signalled yet.
	}
	return done
}

// isLive returns true as long as the proxy is running.
// It performs no checks of its own and unconditionally reports true.
func isLive() bool {
return true
}

// isReady reports whether the proxy is ready for new connections. Both of
// the following must hold, and they are evaluated in this order:
//  1. The proxy has finished starting up (the 'Ready for Connections' log
//     has been reached).
//  2. The proxy has not hit the MaxConnections limit, if applicable.
//
// The reason for a failed check is logged before false is returned.
func isReady(c *proxy.Client, s *Server) bool {
	switch {
	case !s.proxyStarted():
		// Startup has not been signalled yet.
		logging.Errorf("Readiness failed because proxy has not finished starting up.")
		return false
	case !c.AvailableConn():
		// The optional MaxConnections limit has been reached.
		logging.Errorf("Readiness failed because proxy has reached the maximum connections limit (%d).", c.MaxConnections)
		return false
	}
	return true
}
155 changes: 155 additions & 0 deletions cmd/cloud_sql_proxy/internal/healthcheck/healthcheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2021 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package healthcheck_test

import (
"context"
"net/http"
"testing"

"github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/internal/healthcheck"
"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
)

// Endpoint paths and port used by the tests in this file when issuing HTTP
// requests against the health check server.
const (
startupPath = "/startup"
livenessPath = "/liveness"
readinessPath = "/readiness"
// testPort is the port passed to healthcheck.NewServer by every test.
testPort = "8090"
)

// TestLiveness verifies that when the proxy client is up, the liveness
// endpoint writes http.StatusOK.
func TestLiveness(t *testing.T) {
	s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
	if err != nil {
		t.Fatalf("Could not initialize health check: %v", err)
	}
	defer s.Close(context.Background())

	resp, err := http.Get("http://localhost:" + testPort + livenessPath)
	if err != nil {
		t.Fatalf("HTTP GET failed: %v", err)
	}
	// Close the body so the HTTP transport can release the connection.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusOK)
	}
}

// TestStartupFail verifies that when startup has NOT finished, the startup
// and readiness endpoints write http.StatusServiceUnavailable.
func TestStartupFail(t *testing.T) {
	s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
	if err != nil {
		t.Fatalf("Could not initialize health check: %v\n", err)
	}
	defer s.Close(context.Background())

	startupResp, err := http.Get("http://localhost:" + testPort + startupPath)
	if err != nil {
		t.Fatalf("HTTP GET failed: %v\n", err)
	}
	// Close each body so the HTTP transport can release its connection.
	defer startupResp.Body.Close()
	if startupResp.StatusCode != http.StatusServiceUnavailable {
		t.Errorf("%v returned status code %v instead of %v", startupPath, startupResp.StatusCode, http.StatusServiceUnavailable)
	}

	readinessResp, err := http.Get("http://localhost:" + testPort + readinessPath)
	if err != nil {
		t.Fatalf("HTTP GET failed: %v\n", err)
	}
	defer readinessResp.Body.Close()
	if readinessResp.StatusCode != http.StatusServiceUnavailable {
		t.Errorf("%v returned status code %v instead of %v", readinessPath, readinessResp.StatusCode, http.StatusServiceUnavailable)
	}
}

// TestStartupPass verifies that when startup HAS finished (and the
// MaxConnections limit is not specified), the startup and readiness
// endpoints write http.StatusOK.
func TestStartupPass(t *testing.T) {
	s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
	if err != nil {
		t.Fatalf("Could not initialize health check: %v\n", err)
	}
	defer s.Close(context.Background())

	// Simulate the proxy client completing startup.
	s.NotifyStarted()

	startupResp, err := http.Get("http://localhost:" + testPort + startupPath)
	if err != nil {
		t.Fatalf("HTTP GET failed: %v\n", err)
	}
	// Close each body so the HTTP transport can release its connection.
	defer startupResp.Body.Close()
	if startupResp.StatusCode != http.StatusOK {
		t.Errorf("%v returned status code %v instead of %v", startupPath, startupResp.StatusCode, http.StatusOK)
	}

	readinessResp, err := http.Get("http://localhost:" + testPort + readinessPath)
	if err != nil {
		t.Fatalf("HTTP GET failed: %v\n", err)
	}
	defer readinessResp.Body.Close()
	if readinessResp.StatusCode != http.StatusOK {
		t.Errorf("%v returned status code %v instead of %v", readinessPath, readinessResp.StatusCode, http.StatusOK)
	}
}

// TestMaxConnectionsReached verifies that when startup has finished, but
// MaxConnections has been reached, the readiness endpoint writes
// http.StatusServiceUnavailable.
func TestMaxConnectionsReached(t *testing.T) {
	c := &proxy.Client{
		MaxConnections: 1,
	}
	s, err := healthcheck.NewServer(c, testPort)
	if err != nil {
		t.Fatalf("Could not initialize health check: %v", err)
	}
	defer s.Close(context.Background())

	s.NotifyStarted()
	c.ConnectionsCounter = c.MaxConnections // Simulate reaching the limit for maximum number of connections

	resp, err := http.Get("http://localhost:" + testPort + readinessPath)
	if err != nil {
		t.Fatalf("HTTP GET failed: %v", err)
	}
	// Close the body so the HTTP transport can release the connection.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusServiceUnavailable {
		t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusServiceUnavailable)
	}
}

// TestCloseHealthCheck verifies that after closing a healthcheck, requests
// to its liveness endpoint fail.
func TestCloseHealthCheck(t *testing.T) {
	s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
	if err != nil {
		t.Fatalf("Could not initialize health check: %v", err)
	}
	// Safety net in case the test fails before the explicit Close below.
	defer s.Close(context.Background())

	resp, err := http.Get("http://localhost:" + testPort + livenessPath)
	if err != nil {
		t.Fatalf("HTTP GET failed: %v", err)
	}
	// Close the body so the HTTP transport can release the connection.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusOK)
	}

	err = s.Close(context.Background())
	if err != nil {
		t.Fatalf("Failed to close health check: %v", err)
	}

	// With the server shut down, the request itself is expected to fail.
	closedResp, err := http.Get("http://localhost:" + testPort + livenessPath)
	if err == nil {
		// Unexpected success: release the body before failing the test.
		closedResp.Body.Close()
		t.Fatalf("HTTP GET did not return error after closing health check server.")
	}
}

0 comments on commit ea62bdd

Please sign in to comment.