Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
120410: sql: allow additional fmt flags for stmt fingerprinting r=xinhaoz a=xinhaoz

This commit introduces a way to add additional format flags when
formatting a statement AST into its statement fingerprint representation
for sql stats. This allows us to more aggressively generalize the
statement fingerprint.

`sql.stats.statement_fingerprint.format_mask` will be used to supply
these additional flags to the formatter.  It is currently 0 by default
since no new flags for fingerprints have been added.

Epic: none
Part of: #120409

Release note: None

120646: telemetryccl: use log spy in backup/restore test r=abarganier a=dhartunian

Previously this test used file logging to test the telemetry output, which can result in flakes on CI. This commit modifies the test to use a log spy which is a bit more reliable. Additionally, the deserialization now happens in the `Intercept()` method which makes the test easier to read.

Resolves: #120115
Epic: None
Release note: None

120653: server: refactor TestAdminDebugRedirect test r=abarganier a=dhartunian

Adjusts test to use more standard redirect ignoring behavior in stdlib, and removes the test tenant override since this test works with tenants now after some adjustments to URL handling.

The #120095 issue was a timeout that this change doesn't explicitly deal with here since that problem isn't reproducible. The hope is that modified redirect error handling might trigger a less error-prone branch in the HTTP-client. There's nothing else to really change in this test since it's quite simple and we haven't seen similar timeouts persist in other HTTP tests.

Resolves: #120095
Resolves: #112955
Epic: None

Release note: None

120699: sql: skip TestSqlActivityUpdateTopLimitJob r=abarganier a=dhartunian

Release note: None

120715: workflows: tag `cockroach` builds for integration tests r=rail a=rickystewart

... with the tag `integration-test-artifact-build`. We do this to track how long it takes to build these artifacts specifically.

Epic: CRDB-8308
Release note: None

Co-authored-by: Xin Hao Zhang <xzhang@cockroachlabs.com>
Co-authored-by: David Hartunian <davidh@cockroachlabs.com>
Co-authored-by: Ricky Stewart <ricky@cockroachlabs.com>
  • Loading branch information
4 people committed Mar 19, 2024
6 parents 0c6fad5 + 5c3cc49 + cffcc30 + 1385f55 + 5dd2c27 + 6008d44 commit f0116ea
Show file tree
Hide file tree
Showing 21 changed files with 155 additions and 130 deletions.
1 change: 1 addition & 0 deletions build/github/acceptance-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
set -euxo pipefail

bazel build --config crosslinux //pkg/cmd/cockroach-short \
--bes_keywords integration-test-artifact-build \
--jobs 100 $(./build/github/engflow-args.sh)

ARTIFACTSDIR=$PWD/artifacts
Expand Down
5 changes: 4 additions & 1 deletion build/github/examples-orms.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
set -euxo pipefail

pushd cockroach
bazel build //pkg/cmd/cockroach-short --config crosslinux --jobs 100 $(./build/github/engflow-args.sh)
bazel build //pkg/cmd/cockroach-short \
--config crosslinux --jobs 100 \
--bes_keywords integration-test-artifact-build \
$(./build/github/engflow-args.sh)
cp _bazel/bin/pkg/cmd/cockroach-short/cockroach-short_/cockroach-short ../examples-orms/cockroach
# We need Go in the `PATH`.
export PATH=$(dirname $(bazel run @go_sdk//:bin/go --run_under=realpath)):$PATH
Expand Down
1 change: 1 addition & 0 deletions build/github/local-roachtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export COCKROACH_DEV_LICENSE=$(gcloud secrets versions access 1 --secret=cockroa
set -x

bazel build --config=$CROSSCONFIG $(./build/github/engflow-args.sh) \
--bes_keywords integration-test-artifact-build \
--jobs 100 \
//pkg/cmd/cockroach-short \
//pkg/cmd/roachtest \
Expand Down
123 changes: 58 additions & 65 deletions pkg/ccl/telemetryccl/telemetry_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -156,17 +155,60 @@ type expectedSampleQueryEvent struct {
stmt string
}

type telemetrySpy struct {
t *testing.T

sampledQueries []eventpb.SampledQuery
sampledQueriesRaw []logpb.Entry
recoveryEvents []eventpb.RecoveryEvent
}

func (l *telemetrySpy) Intercept(entry []byte) {
var rawLog logpb.Entry
if err := json.Unmarshal(entry, &rawLog); err != nil {
l.t.Errorf("failed unmarshaling %s: %s", entry, err)
}

if rawLog.Channel != logpb.Channel_TELEMETRY {
return
}

var sq eventpb.SampledQuery
if strings.Contains(rawLog.Message, "IMPORT") ||
strings.Contains(rawLog.Message, "RESTORE") ||
strings.Contains(rawLog.Message, "BACKUP") {
if err := json.Unmarshal([]byte(rawLog.Message[rawLog.StructuredStart:rawLog.StructuredEnd]), &sq); err == nil {
l.sampledQueries = append(l.sampledQueries, sq)
l.sampledQueriesRaw = append(l.sampledQueriesRaw, rawLog)
return
} else {
l.t.Errorf("failed unmarshaling %s: %s", rawLog.Message, err)
}
}

var re eventpb.RecoveryEvent
if err := json.Unmarshal([]byte(rawLog.Message[rawLog.StructuredStart:rawLog.StructuredEnd]), &re); err == nil {
l.recoveryEvents = append(l.recoveryEvents, re)
return
} else {
l.t.Errorf("failed unmarshaling %s: %s", rawLog.Message, err)
}
}

var _ log.Interceptor = &telemetrySpy{}

// TODO(janexing): add event telemetry tests for failed or canceled bulk jobs.
func TestBulkJobTelemetryLogging(t *testing.T) {
defer leaktest.AfterTest(t)()
sc := log.ScopeWithoutShowLogs(t)
defer sc.Close(t)

skip.WithIssue(t, 120115)

ctx := context.Background()

cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
spy := &telemetrySpy{
t: t,
}
cleanup := log.InterceptWith(ctx, spy)
defer cleanup()

st := logtestutils.StubTime{}
Expand Down Expand Up @@ -325,64 +367,20 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
execTimestamp++
}

log.FlushFiles()

var filteredSampleQueries []logpb.Entry
testutils.SucceedsSoon(t, func() error {
filteredSampleQueries = []logpb.Entry{}
sampleQueryEntries, err := log.FetchEntriesFromFiles(
0,
math.MaxInt64,
10000,
regexp.MustCompile(`"EventType":"sampled_query"`),
log.WithMarkedSensitiveData,
)
require.NoError(t, err)

for _, sq := range sampleQueryEntries {
if !(strings.Contains(sq.Message, "IMPORT") || strings.Contains(sq.Message, "RESTORE") || strings.Contains(sq.Message, "BACKUP")) {
continue
}
filteredSampleQueries = append(filteredSampleQueries, sq)
}
if len(filteredSampleQueries) < len(testData) {
return errors.New("not enough sample query events fetched")
}
return nil
})

var recoveryEventEntries []logpb.Entry
testutils.SucceedsSoon(t, func() error {
recoveryEventEntries, err = log.FetchEntriesFromFiles(
0,
math.MaxInt64,
10000,
regexp.MustCompile(`"EventType":"recovery_event"`),
log.WithMarkedSensitiveData,
)
require.NoError(t, err)
if len(recoveryEventEntries) < len(testData) {
return errors.New("not enough recovery events fetched")
}
return nil
})
log.FlushAllSync()

for _, tc := range testData {
t.Run(tc.name, func(t *testing.T) {
var foundSampleQuery bool
for i := len(filteredSampleQueries) - 1; i >= 0; i-- {
e := filteredSampleQueries[i]
var sq eventpb.SampledQuery
jsonPayload := []byte(e.Message)
if err := json.Unmarshal(jsonPayload, &sq); err != nil {
t.Errorf("unmarshalling %q: %v", e.Message, err)
}
for i := len(spy.sampledQueries) - 1; i >= 0; i-- {
sq := spy.sampledQueries[i]
if sq.Statement.StripMarkers() == tc.sampleQueryEvent.stmt {
foundSampleQuery = true
if strings.Contains(e.Message, "NumRows:") {
rawEvent := spy.sampledQueriesRaw[i]
if strings.Contains(rawEvent.Message, "NumRows:") {
t.Errorf("for bulk jobs (IMPORT/BACKUP/RESTORE), "+
"there shouldn't be NumRows entry in the event message: %s",
e.Message)
rawEvent.Message)
}
require.Greater(t, sq.BulkJobId, uint64(0))
tc.recoveryEvent.bulkJobId = sq.BulkJobId
Expand All @@ -394,18 +392,13 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
}

var foundRecoveryEvent bool
for i := len(recoveryEventEntries) - 1; i >= 0; i-- {
e := recoveryEventEntries[i]
var re eventpb.RecoveryEvent
jsonPayload := []byte(e.Message)
if err := json.Unmarshal(jsonPayload, &re); err != nil {
t.Errorf("unmarshalling %q: %v", e.Message, err)
}
if string(re.RecoveryType) == tc.recoveryEvent.recoveryType &&
tc.recoveryEvent.bulkJobId == re.JobID &&
re.ResultStatus == "succeeded" {
for i := len(spy.recoveryEvents) - 1; i >= 0; i-- {
e := spy.recoveryEvents[i]
if string(e.RecoveryType) == tc.recoveryEvent.recoveryType &&
tc.recoveryEvent.bulkJobId == e.JobID &&
e.ResultStatus == "succeeded" {
foundRecoveryEvent = true
require.Equal(t, tc.recoveryEvent.numRows, re.NumRows)
require.Equal(t, tc.recoveryEvent.numRows, e.NumRows)
break
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/server/debug/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ go_test(
"//pkg/server/srvtestutils",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
61 changes: 25 additions & 36 deletions pkg/server/debug/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,19 @@ import (
"bytes"
"context"
"net/http"
"net/url"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server/debug"
"github.com/cockroachdb/cockroach/pkg/server/srvtestutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// debugURL returns the root debug URL.
func debugURL(s serverutils.ApplicationLayerInterface, path string) string {
return s.AdminURL().WithPath(debug.Endpoint).WithPath(path).String()
func debugURL(s serverutils.ApplicationLayerInterface, path string) *serverutils.TestURL {
return s.AdminURL().WithPath(debug.Endpoint).WithPath(path)
}

// TestAdminDebugExpVar verifies that cmdline and memstats variables are
Expand All @@ -46,7 +43,7 @@ func TestAdminDebugExpVar(t *testing.T) {

ts := s.ApplicationLayer()

jI, err := srvtestutils.GetJSON(ts, debugURL(ts, "vars"))
jI, err := srvtestutils.GetJSON(ts, debugURL(ts, "vars").String())
if err != nil {
t.Fatalf("failed to fetch JSON: %v", err)
}
Expand All @@ -73,7 +70,7 @@ func TestAdminDebugMetrics(t *testing.T) {

ts := s.ApplicationLayer()

jI, err := srvtestutils.GetJSON(ts, debugURL(ts, "metrics"))
jI, err := srvtestutils.GetJSON(ts, debugURL(ts, "metrics").String())
if err != nil {
t.Fatalf("failed to fetch JSON: %v", err)
}
Expand All @@ -100,7 +97,7 @@ func TestAdminDebugPprof(t *testing.T) {

ts := s.ApplicationLayer()

body, err := srvtestutils.GetText(ts, debugURL(ts, "pprof/block?debug=1"))
body, err := srvtestutils.GetText(ts, debugURL(ts, "pprof/block?debug=1").String())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -130,7 +127,7 @@ func TestAdminDebugTrace(t *testing.T) {
}

for _, c := range tc {
body, err := srvtestutils.GetText(ts, debugURL(ts, c.segment))
body, err := srvtestutils.GetText(ts, debugURL(ts, c.segment).String())
if err != nil {
t.Fatal(err)
}
Expand All @@ -148,7 +145,7 @@ func TestAdminDebugAuth(t *testing.T) {
defer s.Stopper().Stop(context.Background())
ts := s.ApplicationLayer()

url := debugURL(ts, "")
url := debugURL(ts, "").String()

// Unauthenticated.
client, err := ts.GetUnauthenticatedHTTPClient()
Expand Down Expand Up @@ -199,17 +196,14 @@ func TestAdminDebugRedirect(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 120095)

s := serverutils.StartServerOnly(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSharedProcessModeButDoesntYet(
base.TestTenantProbabilistic, 112955,
),
})
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())
ts := s.ApplicationLayer()

expURL := debugURL(ts, "/")
// Drops the `?cluster=` query param if present.
expURL.RawQuery = ""

origURL := debugURL(ts, "/incorrect")

// Must be admin to access debug endpoints
Expand All @@ -218,29 +212,24 @@ func TestAdminDebugRedirect(t *testing.T) {
t.Fatal(err)
}

// Don't follow redirects automatically.
redirectAttemptedError := errors.New("redirect")
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return redirectAttemptedError
// Don't follow redirects automatically. This error is a special
// case in the `CheckRedirect` docs that forwards the last response
// instead of following the redirect.
return http.ErrUseLastResponse
}

resp, err := client.Get(origURL)
if urlError := (*url.Error)(nil); errors.As(err, &urlError) &&
errors.Is(urlError.Err, redirectAttemptedError) {
// Ignore the redirectAttemptedError.
err = nil
}
resp, err := client.Get(origURL.String())
if err != nil {
t.Fatal(err)
} else {
resp.Body.Close()
if resp.StatusCode != http.StatusMovedPermanently {
t.Errorf("expected status code %d; got %d", http.StatusMovedPermanently, resp.StatusCode)
}
if redirectURL, err := resp.Location(); err != nil {
t.Error(err)
} else if foundURL := redirectURL.String(); foundURL != expURL {
t.Errorf("expected location %s; got %s", expURL, foundURL)
}
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusMovedPermanently {
t.Errorf("expected status code %d; got %d", http.StatusMovedPermanently, resp.StatusCode)
}
if redirectURL, err := resp.Location(); err != nil {
t.Error(err)
} else if foundURL := redirectURL.String(); foundURL != expURL.String() {
t.Errorf("expected location %s; got %s", expURL, foundURL)
}
}
14 changes: 12 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2864,7 +2864,8 @@ func (ex *connExecutor) execCopyOut(

// Log the query for sampling.
// These fields are not available in COPY, so use the empty value.
f := tree.NewFmtCtx(tree.FmtHideConstants)
flags := tree.FmtFlags(queryFormattingForFingerprintsMask.Get(&ex.server.cfg.Settings.SV))
f := tree.NewFmtCtx(flags)
f.FormatNode(cmd.Stmt)
stmtFingerprintID := appstatspb.ConstructStatementFingerprintID(
f.CloseAndGetString(),
Expand Down Expand Up @@ -3128,7 +3129,8 @@ func (ex *connExecutor) execCopyIn(
res.SetRowsAffected(ctx, numInsertedRows)
}
// These fields are not available in COPY, so use the empty value.
f := tree.NewFmtCtx(tree.FmtHideConstants)
flags := tree.FmtHideConstants | tree.FmtFlags(queryFormattingForFingerprintsMask.Get(&ex.server.cfg.Settings.SV))
f := tree.NewFmtCtx(flags)
f.FormatNode(cmd.Stmt)
stmtFingerprintID := appstatspb.ConstructStatementFingerprintID(
f.CloseAndGetString(),
Expand Down Expand Up @@ -3522,6 +3524,14 @@ var allowSnapshotIsolation = settings.RegisterBoolSetting(

var logIsolationLevelLimiter = log.Every(10 * time.Second)

// Bitmask for enabling various query fingerprint formatting styles.
var queryFormattingForFingerprintsMask = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.stats.statement_fingerprint.format_mask",
"enables setting additional fmt flags for statement fingerprint formatting",
0,
)

func (ex *connExecutor) txnIsolationLevelToKV(
ctx context.Context, level tree.IsolationLevel,
) isolation.Level {
Expand Down

0 comments on commit f0116ea

Please sign in to comment.