Skip to content

Commit

Permalink
[CORE-2098] Use minikubetestenv to deploy Prometheus (#9645)
Browse files Browse the repository at this point in the history
This adds a test ensuring that sidecar (storage) container metrics are properly made available.
  • Loading branch information
robert-uhl committed Jan 19, 2024
1 parent 7b5f342 commit c9c292e
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 27 deletions.
Binary file not shown.
1 change: 1 addition & 0 deletions src/internal/minikubetestenv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"@io_k8s_api//core/v1:core",
"@io_k8s_apimachinery//pkg/api/errors",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_apimachinery//pkg/util/intstr",
"@io_k8s_apimachinery//pkg/util/net",
"@io_k8s_client_go//kubernetes",
"@org_golang_google_protobuf//types/known/durationpb",
Expand Down
42 changes: 24 additions & 18 deletions src/internal/minikubetestenv/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,41 +34,46 @@ var (
)

// acquireSettings holds the per-acquisition settings that Option functions
// mutate and that deployOpts translates into a DeployOpts value.
//
// NOTE(review): every setting except installPrometheus is listed twice below,
// once exported and once unexported. This looks like the before and after
// lines of a field rename shown together; confirm which set the file keeps.
type acquireSettings struct {
SkipLoki bool
TLS bool
EnterpriseMember bool
CertPool *x509.CertPool
ValueOverrides map[string]string
UseNewCluster bool
skipLoki bool
tls bool
enterpriseMember bool
certPool *x509.CertPool
valueOverrides map[string]string
useNewCluster bool
// installPrometheus is set by WithPrometheus and copied into
// DeployOpts.InstallPrometheus by deployOpts.
installPrometheus bool
}

// Option mutates an acquireSettings value. AcquireCluster accepts a variadic
// list of Options.
type Option func(*acquireSettings)

// SkipLokiOption is an Option that turns on the skip-Loki setting, which
// deployOpts copies into DeployOpts.DisableLoki.
//
// NOTE(review): both SkipLoki and skipLoki are assigned; this looks like the
// old and new line of a field rename shown together. Confirm which one stays.
var SkipLokiOption Option = func(as *acquireSettings) {
as.SkipLoki = true
as.skipLoki = true
}

// WithTLS is an Option that turns on the TLS setting, which deployOpts copies
// into DeployOpts.TLS.
//
// NOTE(review): both TLS and tls are assigned; this looks like the old and new
// line of a field rename shown together. Confirm which one stays.
var WithTLS Option = func(as *acquireSettings) {
as.TLS = true
as.tls = true
}

// WithCertPool returns an Option that stores pool in the acquire settings;
// deployOpts copies it into DeployOpts.CertPool.
//
// NOTE(review): both CertPool and certPool are assigned; this looks like the
// old and new line of a field rename shown together. Confirm which one stays.
func WithCertPool(pool *x509.CertPool) Option {
return func(as *acquireSettings) {
as.CertPool = pool
as.certPool = pool
}
}

// WithValueOverrides returns an Option that stores v in the acquire settings;
// deployOpts copies it into DeployOpts.ValueOverrides. The map is stored as
// given, not copied.
//
// NOTE(review): both ValueOverrides and valueOverrides are assigned; this looks
// like the old and new line of a field rename shown together. Confirm which
// one stays.
func WithValueOverrides(v map[string]string) Option {
return func(as *acquireSettings) {
as.ValueOverrides = v
as.valueOverrides = v
}
}

// EnterpriseMemberOption is an Option that turns on the enterprise-member
// setting in the acquire settings.
//
// NOTE(review): both EnterpriseMember and enterpriseMember are assigned; this
// looks like the old and new line of a field rename shown together. Confirm
// which one stays.
var EnterpriseMemberOption Option = func(as *acquireSettings) {
as.EnterpriseMember = true
as.enterpriseMember = true
}
// UseNewClusterOption is an Option that turns on the use-new-cluster setting.
// When it is set, deployOpts leaves DeployOpts.UseLeftoverCluster false.
//
// NOTE(review): both UseNewCluster and useNewCluster are assigned; this looks
// like the old and new line of a field rename shown together. Confirm which
// one stays.
var UseNewClusterOption Option = func(as *acquireSettings) {
as.UseNewCluster = true
as.useNewCluster = true
}

// WithPrometheus is an Option that sets installPrometheus on the acquire
// settings, so that the resulting DeployOpts has InstallPrometheus set.
var WithPrometheus Option = func(settings *acquireSettings) {
	settings.installPrometheus = true
}

type managedCluster struct {
Expand All @@ -91,11 +96,12 @@ func (cf *ClusterFactory) assignClient(assigned string, mc *managedCluster) {
// deployOpts builds the DeployOpts for the cluster at index clusterIdx from the
// given acquire settings.
//
// PortOffset is clusterIdx*150 converted to uint16. UseLeftoverCluster is true
// only when *useLeftoverClusters is true and the caller did not ask for a new
// cluster. The remaining fields are copied straight from as.
//
// NOTE(review): several keys in the literal below appear twice, once reading
// the exported field and once the unexported one. This looks like the before
// and after lines of a field rename shown together; confirm which set the
// file keeps, since duplicate keys in a struct literal do not compile.
func deployOpts(clusterIdx int, as *acquireSettings) *DeployOpts {
return &DeployOpts{
PortOffset: uint16(clusterIdx * 150),
UseLeftoverCluster: *useLeftoverClusters && !as.UseNewCluster,
DisableLoki: as.SkipLoki,
TLS: as.TLS,
CertPool: as.CertPool,
ValueOverrides: as.ValueOverrides,
UseLeftoverCluster: *useLeftoverClusters && !as.useNewCluster,
DisableLoki: as.skipLoki,
TLS: as.tls,
CertPool: as.certPool,
ValueOverrides: as.valueOverrides,
InstallPrometheus: as.installPrometheus,
}
}

Expand Down Expand Up @@ -185,7 +191,7 @@ func AcquireCluster(t testing.TB, opts ...Option) (*client.APIClient, string) {
t.Cleanup(func() { // must come after assignment to run cleanup with code coverage before the lease is removed.
clusterFactory.mu.Lock()
if mc := clusterFactory.managedClusters[assigned]; mc != nil {
collectMinikubeCodeCoverage(t, mc.client, mc.settings.ValueOverrides)
collectMinikubeCodeCoverage(t, mc.client, mc.settings.valueOverrides)
if *cleanupDataAfter {
deleteAll(t, mc.client)
}
Expand Down
60 changes: 51 additions & 9 deletions src/internal/minikubetestenv/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/net"
kube "k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -83,15 +84,16 @@ type DeployOpts struct {
// Because NodePorts are cluster-wide, we use a PortOffset to
// assign separate ports per deployment.
// NOTE: it might make more sense to declare port instead of offset
PortOffset uint16
DisableLoki bool
EnterpriseMember bool
EnterpriseServer bool
Determined bool
ValueOverrides map[string]string
TLS bool
CertPool *x509.CertPool
ValuesFiles []string
PortOffset uint16
DisableLoki bool
EnterpriseMember bool
EnterpriseServer bool
Determined bool
ValueOverrides map[string]string
TLS bool
CertPool *x509.CertPool
ValuesFiles []string
InstallPrometheus bool
}

func getLocalImage() string {
Expand Down Expand Up @@ -902,6 +904,46 @@ func putRelease(t testing.TB, ctx context.Context, namespace string, kubeClient
} else { // same config, no need to change anything
t.Logf("Previous helmOpts matched the previous cluster config, no changes made to cluster in %v", namespace)
}
if opts.InstallPrometheus {
// get latest version with: helm repo update && helm pull prometheus-community/kube-prometheus-stack -d etc/helm/charts/
require.NoErrorWithinTRetry(t, time.Minute, func() error {
chartPath := localPath(t, "etc", "helm", "charts", "kube-prometheus-stack-55.11.0.tgz")
return errors.Wrap(helm.UpgradeE(t, &helm.Options{
KubectlOptions: &k8s.KubectlOptions{Namespace: namespace}, SetStrValues: map[string]string{"namespaceOverride": namespace}}, chartPath, namespace+"-prometheus"), "could not upgrade or install Prometheus")
})
require.NoErrorWithinTRetry(t, time.Minute, func() error {
// Try to delete service; not a big deal if it fails because it may not exist, and we will retry shortly anyway.
kubeClient.CoreV1().Services(namespace).Delete(ctx, "pachyderm-prometheus-server", metav1.DeleteOptions{}) //nolint:errcheck
if _, err := kubeClient.CoreV1().Services(namespace).Create(ctx, &v1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pachyderm-prometheus-server",
},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeNodePort,
Selector: map[string]string{
"app.kubernetes.io/name": "prometheus",
"operator.prometheus.io/name": namespace + "-prometheus-prometheus",
},
Ports: []v1.ServicePort{
{
Port: int32(pachAddress.Port + 10),
NodePort: int32(pachAddress.Port + 10),
TargetPort: intstr.FromInt(9090),
Name: "http-web",
},
},
},
}, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "could not create Prometheus server service")
}
waitForLabeledPod(t, ctx, kubeClient, namespace, "app.kubernetes.io/name=prometheus")
return nil
})
}
pClient := pachClient(t, pachAddress, opts.AuthUser, namespace, opts.CertPool)
t.Cleanup(func() {
collectMinikubeCodeCoverage(t, pClient, opts.ValueOverrides)
Expand Down
112 changes: 112 additions & 0 deletions src/server/pps/server/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//go:build k8s

package server_test

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"testing"
"time"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/minikubetestenv"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/internal/require"
"github.com/pachyderm/pachyderm/v2/src/internal/testutil"
)

// TestContainerMetrics checks that a Prometheus instance installed next to
// Pachyderm ends up with data for a Pachyderm metric once worker pods are
// being scraped.
//
// The test acquires a cluster with the WithPrometheus option, runs a script
// that creates a pipeline (so that a worker pod exists) and applies a
// PodMonitor selecting the worker pods' metrics-storage and metrics-user
// ports, and then polls the Prometheus HTTP query API for up to two minutes
// until the query returns status "success" with at least one result.
func TestContainerMetrics(t *testing.T) {
	ctx, cancel := pctx.WithCancel(pctx.TestContext(t))
	defer cancel()
	t.Parallel()
	c, namespace := minikubetestenv.AcquireCluster(t, minikubetestenv.WithPrometheus)

	// NOTE(review): the YAML documents embedded in the script below carry no
	// nesting indentation as shown here; confirm the checked-in file indents
	// them, otherwise pachctl and kubectl will not parse them as intended.
	cmd := testutil.PachctlBashCmdCtx(ctx, t, c, `
# run a pipeline in order to have a container to scrape metrics from
pachctl create project video-to-frame-traces
pachctl create repo raw_videos_and_images
pachctl put file raw_videos_and_images@master:robot.png -f https://raw.githubusercontent.com/pachyderm/docs-content/main/images/opencv/robot.jpg
pachctl create pipeline -f - <<EOF
pipeline:
name: video_mp4_converter
input:
pfs:
repo: raw_videos_and_images
glob: "/*"
transform:
image: lbliii/video_mp4_converter:1.0.14
cmd:
- python3
- /video_mp4_converter.py
- --input
- /pfs/raw_videos_and_images/
- --output
- /pfs/out/
EOF
kubectl -n {{.namespace}} apply -f - <<EOF
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: worker-scraper
labels:
release: {{.namespace}}-prometheus
spec:
selector:
matchLabels:
app: pipeline
component: worker
namespaceSelector:
matchNames:
- {{.namespace}}
podMetricsEndpoints:
- port: metrics-storage
path: /metrics
- port: metrics-user
path: /metrics
EOF
`,
		"namespace", namespace)
	cmd.Stdout = os.Stdout
	require.NoError(t, cmd.Run())

	// queryResponse mirrors the fields of the Prometheus query API reply that
	// this test inspects.
	type queryResponse struct {
		Status string `json:"status"`
		Data   struct {
			ResultType string `json:"resultType"`
			Result     []any  `json:"result"`
		} `json:"data"`
	}

	// The Prometheus service is reached on the pachd port plus ten.
	addr := c.GetAddress()
	uri := fmt.Sprintf("http://%s:%d/api/v1/query?query=pachyderm_auth_dex_approval_errors_total", addr.Host, addr.Port+10)

	// A per-request timeout keeps a single hung connection from stalling the
	// retry loop; the request is also bound to the test context.
	httpClient := &http.Client{Timeout: 30 * time.Second}

	require.NoErrorWithinTRetry(t, 2*time.Minute, func() error {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, uri, nil)
		if err != nil {
			return errors.Wrapf(err, "could not build request for %s", uri)
		}
		resp, err := httpClient.Do(req)
		if err != nil {
			return errors.Wrapf(err, "could not fetch %s", uri)
		}
		// Close on every path, including a failed read of the body.
		defer resp.Body.Close()

		b, err := io.ReadAll(resp.Body)
		if err != nil {
			return errors.Wrap(err, "could not read response")
		}
		if resp.StatusCode != http.StatusOK {
			return errors.Errorf("got HTTP status %q from %s: %s", resp.Status, uri, b)
		}

		var pResp queryResponse
		if err := json.Unmarshal(b, &pResp); err != nil {
			return errors.Wrap(err, "could not parse response")
		}
		if pResp.Status != "success" {
			return errors.Errorf("got %q; expected \"success\"", pResp.Status)
		}
		if len(pResp.Data.Result) == 0 {
			return errors.New("no result")
		}
		return nil
	})
}

0 comments on commit c9c292e

Please sign in to comment.