Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add timeout for trying apply_downsamplg_config lock.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Jan 11, 2023
1 parent 9e121c1 commit a2eba1d
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 64 deletions.
2 changes: 1 addition & 1 deletion EXTENSION_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
downsampling_new_config
feature_metric_rollup
5 changes: 0 additions & 5 deletions pkg/dataset/deepcopy_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 24 additions & 18 deletions pkg/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/jackc/pgx/v4"
"github.com/pkg/errors"

"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/util"
)

Expand All @@ -28,15 +30,12 @@ type Config struct {
}

func (c Config) Name() string {
return downsamplePrefix + day.String(c.Interval)
return downsamplePrefix + c.Interval.String()
}

func SetState(ctx context.Context, conn *pgx.Conn, state bool) error {
_, err := conn.Exec(ctx, setGlobalDownsamplingStateSQL, state)
if err != nil {
return fmt.Errorf("error setting downsampling state: %w", err)
}
return nil
return errors.WithMessage(err, "error setting downsampling state")
}

type cfgWithName struct {
Expand All @@ -52,31 +51,38 @@ func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error {
return fmt.Errorf("error getting lock for syncing downsampling config")
}
defer pgLock.Close()
got, err := pgLock.GetAdvisoryLock() // To prevent failure when multiple Promscale start at the same time.

try := func() (bool, error) {
got, err := pgLock.GetAdvisoryLock() // To prevent failure when multiple Promscale start at the same time.
return got, errors.WithMessage(err, "error trying pg advisory_lock")
}

got, err := try()
if err != nil {
return fmt.Errorf("error trying pg advisory_lock")
return err
}
if !got {
// Some other Promscale instance is already working on the downsampling.Sync()
// Hence, we should skip.
return nil
}
defer func() {
if _, err = pgLock.Unlock(); err != nil {
log.Error("msg", "error unlocking downsampling.Sync advisory_lock", "err", err.Error())
// Wait for sometime and try again. If we still did not get the lock, throw an error.
time.Sleep(time.Second * 5)
got, err = try()
if err != nil {
return err
}
}()
if !got {
return fmt.Errorf("timeout: unable to take the advisory lock for syncing downsampling state")
}
}

var applyCfgs []cfgWithName
for i := range cfgs {
c := cfgs[i]
applyCfgs = append(applyCfgs, cfgWithName{Name: c.Name(), Interval: day.String(c.Interval), Retention: day.String(c.Retention)})
applyCfgs = append(applyCfgs, cfgWithName{Name: c.Name(), Interval: c.Interval.String(), Retention: c.Retention.String()})
}
if len(applyCfgs) > 0 {
str, err := json.Marshal(applyCfgs)
if err != nil {
return fmt.Errorf("error marshalling configs: %w", err)
}
fmt.Println("str", string(str))
if _, err = conn.Exec(ctx, applyDownsampleConfigSQL, str); err != nil {
return fmt.Errorf("error applying downsample config: %w", err)
}
Expand Down
54 changes: 24 additions & 30 deletions pkg/internal/day/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
const (
dayUnit = 'd'
unknownUnitDErrorPrefix = `time: unknown unit "d"`
day = int64(time.Hour * 24)
)

// Duration acts like a time.Duration with support for "d" unit
Expand Down Expand Up @@ -69,7 +70,29 @@ func handleDays(s []byte) (time.Duration, error) {

// String returns a string value of DayDuration.
func (d Duration) String() string {
return time.Duration(d).String()
remainder := int64(d)
days := remainder / day
remainder = remainder % day
hours := remainder / int64(time.Hour)
remainder = remainder % int64(time.Hour)
mins := remainder / int64(time.Minute)
remainder = remainder % int64(time.Minute)
secs := remainder / int64(time.Second)

display := ""
if days != 0 {
display = fmt.Sprintf("%dd", days)
}
if hours != 0 {
display = fmt.Sprintf("%s%dh", display, hours)
}
if mins != 0 {
display = fmt.Sprintf("%s%dm", display, mins)
}
if secs != 0 {
display = fmt.Sprintf("%s%ds", display, secs)
}
return display
}

// StringToDayDurationHookFunc returns a mapstructure.DecodeHookFunc that
Expand All @@ -96,32 +119,3 @@ func StringToDayDurationHookFunc() mapstructure.DecodeHookFunc {
return d, nil
}
}

// String returns the output in form of days:hours:mins:secs
func String(d Duration) string {
const day = int64(time.Hour * 24)

remainder := int64(d)
days := remainder / day
remainder = remainder % day
hours := remainder / int64(time.Hour)
remainder = remainder % int64(time.Hour)
mins := remainder / int64(time.Minute)
remainder = remainder % int64(time.Minute)
secs := remainder / int64(time.Second)

display := ""
if days != 0 {
display = fmt.Sprintf("%dd", days)
}
if hours != 0 {
display = fmt.Sprintf("%s%dh", display, hours)
}
if mins != 0 {
display = fmt.Sprintf("%s%dm", display, mins)
}
if secs != 0 {
display = fmt.Sprintf("%s%ds", display, secs)
}
return display
}
2 changes: 1 addition & 1 deletion pkg/internal/day/duration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestString(t *testing.T) {
t.Run(tc.in, func(t *testing.T) {
var d Duration
require.NoError(t, d.UnmarshalText([]byte(tc.in)))
require.Equal(t, tc.out, String(d))
require.Equal(t, tc.out, d.String())
})
}
}
7 changes: 5 additions & 2 deletions pkg/pgmodel/metrics/database/metric_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/util"
)
Expand Down Expand Up @@ -57,8 +59,9 @@ INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.
if err != nil {
return fmt.Errorf("error scanning values for execute_caggs_refresh_policy: %w", err)
}
caggsRefreshSuccess.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(success))
caggsRefreshTotal.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(total))
tmp := day.Duration(refreshInterval) // This allows label values to have 25h -> 1d1h, which is easier to understand and matches more to the user's original input.
caggsRefreshSuccess.With(prometheus.Labels{"refresh_interval": tmp.String()}).Set(float64(success))
caggsRefreshTotal.With(prometheus.Labels{"refresh_interval": tmp.String()}).Set(float64(total))
}
return nil
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ func init() {

PromscaleExtensionVersion = strings.TrimSpace(string(content))
PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14"
PromscaleExtensionContainer = "local/dev_promscale_extension:head-ts2-pg14" // This will be removed once the PR against master is made.
//PromscaleExtensionContainer = "local/dev_promscale_extension:head-ts2-pg14" // This will be removed once the PR against master is made.
}
8 changes: 4 additions & 4 deletions pkg/tests/end_to_end_tests/continuous_agg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestContinuousAggDownsampling(t *testing.T) {
}{
{
name: "Query non-existant column, empty result",
query: `cagg{__column__="nonexistant"}`,
query: `cagg{__schema__="cagg_schema",__column__="nonexistant"}`,
startMs: startTime,
endMs: endTime,
stepMs: 360 * 1000,
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestContinuousAggDownsampling(t *testing.T) {
},
{
name: "Query max column",
query: `cagg{__column__="max",instance="1"}`,
query: `cagg{__schema__="cagg_schema",__column__="max",instance="1"}`,
startMs: startTime,
endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
stepMs: 3600 * 1000,
Expand All @@ -104,7 +104,7 @@ func TestContinuousAggDownsampling(t *testing.T) {
},
{
name: "Query min column",
query: `cagg{__column__="min",instance="1"}`,
query: `cagg{__schema__="cagg_schema",__column__="min",instance="1"}`,
startMs: startTime,
endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
stepMs: 3600 * 1000,
Expand All @@ -130,7 +130,7 @@ func TestContinuousAggDownsampling(t *testing.T) {
},
{
name: "Query avg column",
query: `cagg{__column__="avg",instance="1"}`,
query: `cagg{__schema__="cagg_schema",__column__="avg",instance="1"}`,
startMs: startTime,
endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
stepMs: 3600 * 1000,
Expand Down
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ func TestTelemetrySQLStats(t *testing.T) {
require.NoError(t, engine.Sync())

err = conn.QueryRow(context.Background(), "SELECT value FROM _timescaledb_catalog.metadata WHERE key = 'promscale_metrics_total' AND value IS NOT NULL").Scan(&metrics)
require.NoError(t, err)
require.Equal(t, "0", metrics) // Without promscale_extension, this will give error saying "no rows in result set".
require.NoError(t, err) // Without promscale_extension, this will give error saying "no rows in result set".
require.Equal(t, "0", metrics)

// Add dummy metric.
_, err = conn.Exec(context.Background(), "SELECT _prom_catalog.create_metric_table('test_metric')")
Expand Down

0 comments on commit a2eba1d

Please sign in to comment.