Skip to content

Commit

Permalink
feat: include 0 count statistics throughout the network lifespan (#1724)
Browse files Browse the repository at this point in the history
  • Loading branch information
sborrazas committed Apr 9, 2024
1 parent 65575cb commit dc7e145
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 68 deletions.
18 changes: 18 additions & 0 deletions lib/ae_mdw/db/util.ex
Expand Up @@ -329,4 +329,22 @@ defmodule AeMdw.Db.Util do
|> :aec_db.get_header()
|> :aec_headers.time_in_msecs()
end

@spec network_date_interval(state()) :: {Date.t(), Date.t()}
def network_date_interval(state) do
case State.prev(state, Model.Time, nil) do
{:ok, {end_time, _end_tx_index}} ->
{:ok, {start_time, _start_tx_index}} = State.next(state, Model.Time, nil)

[start_date, end_date] =
[start_time, end_time]
|> Enum.map(&DateTime.to_date(DateTime.from_unix!(div(&1, 1_000))))

{start_date, end_date}

:none ->
{:ok, start_date} = Date.new(2018, 12, 11)
{start_date, start_date}
end
end
end
91 changes: 76 additions & 15 deletions lib/ae_mdw/stats.ex
Expand Up @@ -259,8 +259,9 @@ defmodule AeMdw.Stats do

defp build_statistics_streamer(state, tag, filters, _scope, cursor) do
interval_by = Map.get(filters, :interval_by, :day)
min_date = filters |> Map.get(:min_start_date) |> to_interval(interval_by, Util.min_int())
max_date = filters |> Map.get(:max_start_date) |> to_interval(interval_by, Util.max_int())
{start_network_date, end_network_date} = DbUtil.network_date_interval(state)
min_date = filters |> Map.get(:min_start_date, start_network_date) |> to_interval(interval_by)
max_date = filters |> Map.get(:max_start_date, end_network_date) |> to_interval(interval_by)
key_boundary = {{tag, interval_by, min_date}, {tag, interval_by, max_date}}

cursor =
Expand All @@ -270,40 +271,98 @@ defmodule AeMdw.Stats do
end

fn direction ->
Collection.stream(state, Model.Statistic, direction, key_boundary, cursor)
state
|> Collection.stream(Model.Statistic, direction, key_boundary, cursor)
|> fill_missing_dates(tag, interval_by, direction, cursor, min_date, max_date)
end
end

defp render_statistic(state, {_tag, :month, interval_start} = statistic_key) do
Model.statistic(count: count) = State.fetch!(state, Model.Statistic, statistic_key)
defp fill_missing_dates(stream, tag, interval_by, :backward, cursor, min_date, max_date) do
max_date =
case cursor do
nil -> max_date
{_tag, _interval_by, interval_start} -> min(max_date, interval_start)
end

stream
|> Stream.concat([:end])
|> Stream.transform(max_date, fn
:end, acc ->
virtual_stream = Stream.map(acc..min_date//-1, &{:virtual, {tag, interval_by, &1}, 0})

{virtual_stream, acc}

{_tag, _interval_by, interval_start} = interval_key, interval_start ->
{[{:db, interval_key}], interval_start - 1}

{_tag, _interval_by, interval_start} = interval_key, acc ->
virtual_stream =
acc..(interval_start + 1)
|> Stream.map(&{:virtual, {tag, interval_by, &1}, 0})
|> Stream.concat([{:db, interval_key}])

{virtual_stream, interval_start - 1}
end)
end

defp fill_missing_dates(stream, tag, interval_by, :forward, cursor, min_date, max_date) do
min_date =
case cursor do
nil -> min_date
{_tag, _interval_by, interval_start} -> max(min_date, interval_start)
end

stream
|> Stream.concat([:end])
|> Stream.transform(min_date, fn
:end, acc ->
virtual_stream = Stream.map(acc..max_date//1, &{:virtual, {tag, interval_by, &1}, 0})

{virtual_stream, acc}

{_tag, _interval_by, interval_start} = interval_key, interval_start ->
{[{:db, interval_key}], interval_start + 1}

{_tag, _interval_by, interval_start} = interval_key, acc ->
virtual_stream =
acc..(interval_start - 1)
|> Stream.map(&{:virtual, {tag, interval_by, &1}, 0})
|> Stream.concat([{:db, interval_key}])

{virtual_stream, interval_start + 1}
end)
end

defp render_statistic(_state, {:virtual, {_tag, :month, interval_start}, count}) do
%{
start_date: months_to_iso(interval_start),
end_date: months_to_iso(interval_start + 1),
count: count
}
end

defp render_statistic(state, {_tag, :week, interval_start} = statistic_key) do
Model.statistic(count: count) = State.fetch!(state, Model.Statistic, statistic_key)

defp render_statistic(_state, {:virtual, {_tag, :week, interval_start}, count}) do
%{
start_date: days_to_iso(interval_start * @days_per_week),
end_date: days_to_iso((interval_start + 1) * @days_per_week),
count: count
}
end

defp render_statistic(state, {_tag, :day, interval_start} = statistic_key) do
Model.statistic(count: count) = State.fetch!(state, Model.Statistic, statistic_key)

defp render_statistic(_state, {:virtual, {_tag, :day, interval_start}, count}) do
%{
start_date: days_to_iso(interval_start),
end_date: days_to_iso(interval_start + 1),
count: count
}
end

defp render_statistic(state, {:db, statistic_key}) do
Model.statistic(count: count) = State.fetch!(state, Model.Statistic, statistic_key)

render_statistic(state, {:virtual, statistic_key, count})
end

defp convert_blocks_param({"type", "key"}), do: {:ok, {:block_type, :key}}
defp convert_blocks_param({"type", "micro"}), do: {:ok, {:block_type, :micro}}
defp convert_blocks_param(param), do: convert_param(param)
Expand Down Expand Up @@ -337,7 +396,11 @@ defmodule AeMdw.Stats do
end
end

defp serialize_statistics_cursor({_tag, _interval_by, interval_start}), do: "#{interval_start}"
defp serialize_statistics_cursor({:db, {_tag, _interval_by, interval_start}}),
do: "#{interval_start}"

defp serialize_statistics_cursor({:virtual, {_tag, _interval_by, interval_start}, _count}),
do: "#{interval_start}"

defp deserialize_statistic_cursor(nil), do: {:ok, nil}

Expand Down Expand Up @@ -536,9 +599,7 @@ defmodule AeMdw.Stats do
|> Date.to_iso8601()
end

defp to_interval(nil, _interval_by, default), do: default

defp to_interval(date, interval_by, _default) do
defp to_interval(date, interval_by) do
seconds = date |> DateTime.new!(Time.new!(0, 0, 0)) |> DateTime.to_unix()
day_start = div(seconds, @seconds_per_day)

Expand Down

0 comments on commit dc7e145

Please sign in to comment.