
Detect data gaps and track elapsed time in time_weight summaries #787

Open
smyrman opened this issue Dec 14, 2023 · 4 comments
Labels
proposed-feature A proposed feature or function to ease a task

Comments


smyrman commented Dec 14, 2023

What's the functionality you would like to add

We would like better handling of explicit data gaps for time_weight() summaries and accessors. That is, to be able to exclude gaps in data within and between time buckets from the weighted sum, and to let this affect the average, integral, interpolated_average and interpolated_integral accessors. For this exclusion to work, we must also track the elapsed time, as we can no longer assume it to be last_time - first_time.

Gaps can be detected in different ways. For us, we would like to declare a gap when there is more than a duration D between two samples. There are other possible ways to detect gaps, such as looking for explicit NULL values (which could e.g. be inserted by pre-processing the source time-series before aggregation). We don't mind which method is used, as long as there is one method to detect them. There are some details here in how to treat the last value before a gap. Is it valid for duration D? Is it valid for a 0 duration because it's followed by a gap? Is this choice affected by the choice of LOCF or LINEAR algorithm? Let's not get into that just yet, but a choice needs to be made.

What matters is that when you then calculate the TimeWeightSummary, we would like the detected gaps to affect the weighted sum field. We would also propose adding an additional field which accurately tracks the elapsed time, with the time spent in gaps omitted.

Finally, let average, integral, interpolated_average and interpolated_integral use the elapsed time value to report their numbers. Also allow an accessor for the elapsed_time value.

How would the function be used

Allow the same time-weight summaries to be used to cover more cases:

  • Integral reporting in case of gaps (e.g. power consumption where gaps should not be counted).
  • Average reporting in case of gaps (e.g. temperature).
  • Reporting the measured time per bucket (e.g. to report on data quality).

We could e.g. query for the average and elapsed time.
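To make the intended semantics concrete, here is a small Python sketch (not Toolkit code) of a gap-aware LOCF time-weighted average; the function name, signature and return shape are made up for illustration:

```python
from datetime import datetime, timedelta


def time_weighted_avg_locf(samples, gap):
    """Sketch of a gap-aware time-weighted average with LOCF.

    samples: list of (datetime, float), sorted by time.
    gap: timedelta; a delta longer than this is treated as a data gap,
         with the previous value carried forward for at most `gap`.
    Returns (average, elapsed_seconds), where elapsed excludes gaps.
    """
    ws = 0.0       # weighted sum
    elapsed = 0.0  # elapsed seconds, with gap time excluded
    prev_t, prev_v = None, None
    for t, v in samples:
        if prev_t is not None:
            # LOCF: weight the previous value by the time until the next
            # sample, capped at the gap-detection interval.
            d = min((t - prev_t).total_seconds(), gap.total_seconds())
            ws += prev_v * d
            elapsed += d
        prev_t, prev_v = t, v
    return (ws / elapsed if elapsed else float("nan"), elapsed)


# Sample signal: 2h silence between the first two points, gap detection 30m.
samples = [(datetime(2022, 1, 1, 0, 0), 1.0),
           (datetime(2022, 1, 1, 2, 0), 2.0),
           (datetime(2022, 1, 1, 2, 30), 0.0)]
avg, elapsed = time_weighted_avg_locf(samples, timedelta(minutes=30))
# → avg 1.5, elapsed 3600.0 (the 90 gap minutes are excluded)
```

Note how the average (1.5) differs from a plain time-weight over last_time - first_time, which would dilute the result with the gap period.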

Why should this feature be added?

We have been struggling to find a consistent way of dealing with gaps (not filling them, but not counting them either) using the Timescale two-step aggregations, such as [compact_]state_agg and time_weight. Gaps within an evaluation bucket especially, but also gaps between buckets (with interpolate), must be handled correctly.

For us, we define a gap as the distance between two points being larger than a "gap detection" interval D, where D is a configurable attribute per data source. I.e. something similar to heartbeat_agg, except we are interested in weighted sums and state aggregations, not the gaps themselves. Ideally, if using locf, we consider the last value as valid for duration D before a gap, while for linear, we would start the gap at the last point. At least this is what we thought makes the most sense.

To explain why we care, consider a time_weight solution that should work generically: either the system logs a temperature where we want a weighted average, or a power consumption (W) that should be integrated into kWh. We cannot rely on inserting fake values into the source signal if we want the same time-weight summary to collect good data for both. I.e. without elapsed time and gap detection for power consumption, if we consider the system to not draw power when it doesn't log data, we would need to insert 0 values before each gap in the source signal before aggregation. If we do the same for temperature, however, we draw the TWA closer to 0, which makes the result invalid.

A much better solution, in our view, is to not pre-insert any fake data, but rather add explicit gap detection and tracking of elapsed time.
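The distortion from the zero-insertion workaround is easy to show with numbers. Assuming a temperature logged at t=0s (20°C), then a data gap until t=7200s, with gap-detection interval D = 1800s:

```python
# Workaround: insert a fake 0 before the gap (correct for power, wrong for
# temperature). The plain time-weight then counts 20°C for 1800s and the
# fake 0 for the remaining 5400s of the gap.
avg_with_fake_zero = (20 * 1800 + 0 * 5400) / 7200  # = 5.0, clearly wrong

# Gap-aware summary: 20°C counted for D seconds, gap excluded from elapsed.
avg_gap_aware = (20 * 1800) / 1800  # = 20.0
```

The same gap-aware summary also gives the right answer for power: the gap simply contributes nothing to the integral, with no fake zeros needed.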

What scale is this useful at?

Especially useful in general-purpose applications for time-series, rather than tailor-made systems where you can do what you want.

Drawbacks

Handling backwards compatibility may be a challenge, since the solution proposes changes to the TimeWeightSummary structure.

Open Questions

Are there any questions we'd need to address before releasing this feature?

One must decide how to detect gaps. E.g. either look for explicit NULL values or look for durations longer than a specific period D between two time-stamps.

For either alternative, one must decide how the last value before a gap should be counted. In our view, this means considering the last value before a gap valid for the gap-detection duration when using the locf algorithm. However, this comes with additional challenges for accessors when the query returns results in time buckets, as you would ideally want to split the final value of each bucket into one or more successive buckets if the next gap is beyond the end of the bucket.

Alternatives

Use custom SQL aggregation functions and accessors.

@smyrman smyrman added the proposed-feature A proposed feature or function to ease a task label Dec 14, 2023
@smyrman smyrman changed the title Track elapsed time in time_weight summaries Detect data gaps and track elapsed time in time_weight summaries Dec 14, 2023
smyrman commented Dec 15, 2023

An SQL-based prototype for LOCF, assuming that the last value before a gap should be carried forward for the duration of the gapDetection setting. The sum does not count the last LOCF value carried forward; this must be handled by rollup and accessor methods.

An easier-to-use solution should store the method and gap detection setting in the TWS type, and refuse to combine results where the gap detection settings do not match, similar to what the toolkit does today for the method parameter.

--- time-weight experiment

CREATE TABLE signal (time TIMESTAMPTZ, value DOUBLE PRECISION);

INSERT INTO signal VALUES ('2022-01-01T00:00:00Z', 1),  ('2022-01-01T02:00:00Z', 2), ('2022-01-01T02:30:00Z', 0);

CREATE TYPE tws AS (
	-- weighted sum
	ws DOUBLE PRECISION,
	-- elapsed duration
	d  INTERVAL,
	-- first point
	ft TIMESTAMPTZ,
	fv DOUBLE PRECISION,
	-- last point
	lt TIMESTAMPTZ,
	lv DOUBLE PRECISION
);



CREATE OR REPLACE FUNCTION  tws_add_locf(s TWS, ts TIMESTAMPTZ, value DOUBLE PRECISION, gap INTERVAL) RETURNS TWS AS $$
	DECLARE
		d INTERVAL;
	BEGIN
     	IF s IS NULL
      		THEN
        		RETURN ROW(CAST(0 AS DOUBLE PRECISION), CAST('PT0S' AS INTERVAL), ts, value, ts, value);
			ELSE
				d := ts - s.lt;
				IF d > gap
					THEN
						d := gap;
				END IF;
				RETURN ROW(
					s.ws + (s.lv*EXTRACT(epoch FROM d)),
					s.d + d,
					s.ft,
					s.fv,
					ts,
					value);
    	END IF;
	END
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE AGGREGATE tws_locf (ts TIMESTAMPTZ, value DOUBLE PRECISION, gap INTERVAL) (
    sfunc = tws_add_locf,
    stype = tws
);

CREATE OR REPLACE FUNCTION  tws_combine_locf(a TWS, b TWS, gap INTERVAL) RETURNS TWS AS $$
	DECLARE
		d INTERVAL;
	BEGIN
     	CASE
			WHEN a is NULL
				THEN
					return b;
			WHEN b is NULL
				THEN
					return a;
			ELSE
				d := b.ft - a.lt;
				IF d > gap
					THEN
						d := gap;
				END IF;
				RETURN ROW(
					a.ws + b.ws + (a.lv*EXTRACT(epoch FROM d)),
					a.d + b.d + d,
					a.ft,
					a.fv,
					b.lt,
					b.lv);
    	END CASE;
	END
$$ LANGUAGE plpgsql IMMUTABLE;



CREATE OR REPLACE AGGREGATE rollup_locf(s tws, gap INTERVAL) (
    sfunc = tws_combine_locf,
    stype = tws
);



SELECT time_bucket('PT3H', time) AS start_time, tws_locf(time, value, 'PT30M'::INTERVAL) AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time;

SELECT  time_bucket('PT3H', start_time) as start_time2, rollup_locf(s, 'PT30M'::INTERVAL) FROM (SELECT time_bucket('PT1H', time) AS start_time, tws_locf(time, value, 'PT30M'::INTERVAL) AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time) GROUP BY start_time2;
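Working through the prototype's LOCF arithmetic by hand for the sample data above (points at 00:00→1, 02:00→2, 02:30→0, gap detection 30 minutes), this Python sketch reproduces the expected weighted sum and elapsed time:

```python
# Sample data from the experiment, as (seconds-since-start, value).
samples = [(0, 1.0), (7200, 2.0), (9000, 0.0)]
GAP = 1800  # gap-detection interval: 30 minutes, in seconds

ws = 0.0       # weighted sum, mirrors tws.ws
elapsed = 0.0  # elapsed time, mirrors tws.d
for (t0, v0), (t1, _) in zip(samples, samples[1:]):
    # LOCF: carry v0 forward for at most GAP seconds.
    d = min(t1 - t0, GAP)
    ws += v0 * d
    elapsed += d

print(ws, elapsed, ws / elapsed)  # 5400.0 3600.0 1.5
```

The 2-hour silence between the first two points contributes only 1800s (1 carried forward), so the average is 5400/3600 = 1.5 rather than the 0.95 a plain time-weight over the full 9000s would give.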

smyrman commented Dec 15, 2023

Second prototype, storing the gap detection setting and method in the summary, and also handling linear aggregation:

--- time-weight experiment

CREATE TABLE signal (time TIMESTAMPTZ, value DOUBLE PRECISION);

INSERT INTO signal VALUES ('2022-01-01T00:00:00Z', 1),  ('2022-01-01T02:00:00Z', 2), ('2022-01-01T02:30:00Z', 0);


CREATE TYPE tws_interpolation AS ENUM('linear', 'locf');
DROP TYPE tws CASCADE;
CREATE TYPE tws AS (
	-- Identify aggregation type.
	m tws_interpolation,
	gap DOUBLE PRECISION,
	-- Time-weighted sum.
	ws DOUBLE PRECISION,
	-- Aggregated duration.
	d  DOUBLE PRECISION,
	-- First point.
	ft TIMESTAMPTZ,
	fv DOUBLE PRECISION,
	-- Last point.
	lt TIMESTAMPTZ,
	lv DOUBLE PRECISION
);



CREATE OR REPLACE FUNCTION  add_to_tws(s TWS, ts TIMESTAMPTZ, value DOUBLE PRECISION, gap_seconds DOUBLE PRECISION, method tws_interpolation) RETURNS TWS AS $$
	DECLARE
		d DOUBLE PRECISION;
	BEGIN
     	CASE
     		WHEN s IS NULL
      			THEN
        			RETURN ROW(method, gap_seconds, CAST(0 AS DOUBLE PRECISION), CAST(0 AS DOUBLE PRECISION), ts, value, ts, value);
			WHEN s.m != method
				THEN
					RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'interpolation method must match aggregation state interpolation method';
		WHEN s.gap != gap_seconds
				THEN
					RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'gap detection must match aggregation state';
		WHEN method = 'locf'
			THEN
				d := EXTRACT(epoch FROM (ts - s.lt));
				IF d > gap_seconds
					THEN
						d := gap_seconds;
				END IF;
				RETURN ROW(
					method,
					gap_seconds,
					s.ws + (s.lv*d),
					s.d + d,
					s.ft,
					s.fv,
					ts,
					value);
		WHEN method = 'linear'
			THEN
				d := EXTRACT(epoch FROM (ts - s.lt));
				IF d > gap_seconds
					THEN
						d := 0;
				END IF;
				RETURN ROW(
					method,
					gap_seconds,
					s.ws + ((s.lv+value)*d/2),
					s.d + d,
					s.ft,
					s.fv,
					ts,
					value);
		ELSE
		 	RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'invalid interpolation method';
    	END CASE;
	END
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE AGGREGATE time_weight_gap(ts TIMESTAMPTZ, value DOUBLE PRECISION, gap_seconds DOUBLE PRECISION, method tws_interpolation) (
    sfunc = add_to_tws,
    stype = tws
);


CREATE OR REPLACE FUNCTION  tws_combine(a TWS, b TWS) RETURNS TWS AS $$
	DECLARE
		d DOUBLE PRECISION;
	BEGIN
     	CASE
			WHEN a is NULL
				THEN
					return b;
			WHEN b is NULL
				THEN
					return a;
			WHEN a.m != b.m
				THEN
					RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'summary interpolation method must match';
			WHEN a.gap != b.gap
				THEN
					RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'summary gap detection must match';
			WHEN a.m = 'locf'
				THEN
					d := EXTRACT(epoch FROM (b.ft - a.lt));
					IF d > a.gap
						THEN
							d := a.gap;
					END IF;
					RETURN ROW(
						a.m,
						a.gap,
						a.ws + b.ws + (a.lv*d),
						a.d + b.d + d,
						a.ft,
						a.fv,
						b.lt,
						b.lv);
			WHEN a.m = 'linear'
				THEN
					d := EXTRACT(epoch FROM (b.ft - a.lt));
					IF d > a.gap
						THEN
							d := 0;
					END IF;
					RETURN ROW(
						a.m,
						a.gap,
						a.ws + b.ws + ((a.lv+b.fv)*d/2),
						a.d + b.d + d,
						a.ft,
						a.fv,
						b.lt,
						b.lv);
		ELSE
		 	RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'invalid interpolation method';
    	END CASE;
	END
$$ LANGUAGE plpgsql IMMUTABLE;



CREATE OR REPLACE AGGREGATE rollup(s tws) (
    sfunc = tws_combine,
    stype = tws
);



SELECT time_bucket('PT3H', time) AS start_time, time_weight_gap(time, value, 1800.0, 'locf') AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time;
SELECT time_bucket('PT3H', time) AS start_time, time_weight_gap(time, value, 1800.0, 'linear') AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time;


SELECT time_bucket('PT3H', start_time) as start_time2, rollup(s) FROM (SELECT time_bucket('PT1H', time) AS start_time, time_weight_gap(time, value, 1800.0, 'locf') AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time) GROUP BY start_time2;
SELECT time_bucket('PT3H', start_time) as start_time2, rollup(s) FROM (SELECT time_bucket('PT1H', time) AS start_time, time_weight_gap(time, value, 1800.0, 'linear') AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time) GROUP BY start_time2;
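For the linear branch, the same sample data works out differently: a delta beyond the gap-detection interval contributes nothing at all, since we cannot interpolate across a gap. A Python sketch of that arithmetic:

```python
samples = [(0, 1.0), (7200, 2.0), (9000, 0.0)]
GAP = 1800  # gap-detection interval in seconds

ws = 0.0
elapsed = 0.0
for (t0, v0), (t1, v1) in zip(samples, samples[1:]):
    d = t1 - t0
    if d > GAP:
        # linear: a gap contributes nothing (d := 0 in add_to_tws)
        d = 0
    ws += (v0 + v1) * d / 2  # trapezoid rule between adjacent points
    elapsed += d

print(ws, elapsed)  # 1800.0 1800.0, i.e. average 1.0
```

Only the 02:00→02:30 segment survives: (2 + 0) * 1800 / 2 = 1800 over 1800 elapsed seconds, giving an average of 1.0, versus 1.5 for locf on the same data.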

smyrman commented Dec 18, 2023

PS! Ideally we would like a similar solution for [compact_]state_agg, with logic similar to locf interpolation. I.e. if there is a gap, the value is considered valid for the duration of the gap detection, and we are able to not count gap values as having a state.

smyrman commented Dec 18, 2023

[image attachment]
