Skip to content

Commit

Permalink
Merge branch 'main'
Browse files Browse the repository at this point in the history
  • Loading branch information
DVAlexHiggs committed Mar 22, 2023
2 parents bd3c5c5 + e9f2ac8 commit bd7408b
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 118 deletions.
11 changes: 1 addition & 10 deletions dbt_project.yml
@@ -1,5 +1,5 @@
name: dbtvault
version: 0.9.2
version: 0.9.5
require-dbt-version: [">=1.0.0", "<2.0.0"]
config-version: 2

Expand All @@ -14,12 +14,3 @@ target-path: "target"
clean-targets:
- "target"
- "dbt_packages"

vars:
hash: MD5
hash_content_casing: 'UPPER' # Default UPPER, alternatively DISABLED
null_key_required: '-1' # Default -1, allows user to configure
null_key_optional: '-2' # Default -2, allows user to configure
enable_ghost_records: false # Default false; set to true to enable ghost records
system_record_value: 'DBTVAULT_SYSTEM' #Default DBTVAULT_SYSTEM, allows user to configure

4 changes: 2 additions & 2 deletions macros/internal/helpers/logging/log_relation_sources.sql
Expand Up @@ -9,7 +9,7 @@

{% macro default__log_relation_sources(relation, source_count) %}

{%- if execute -%}
{%- if 'docs' not in invocation_args_dict['rpc_method'] and execute -%}

{%- do dbt_utils.log_info('Loading {} from {} source(s)'.format("{}.{}.{}".format(relation.database, relation.schema, relation.identifier),
source_count)) -%}
Expand All @@ -18,7 +18,7 @@

{% macro databricks__log_relation_sources(relation, source_count) %}

{%- if execute -%}
{%- if 'docs' not in invocation_args_dict['rpc_method'] and execute -%}

{%- do dbt_utils.log_info('Loading {} from {} source(s)'.format("{}.{}".format(relation.schema, relation.identifier),
source_count)) -%}
Expand Down
54 changes: 54 additions & 0 deletions macros/materialisations/period_mat_helpers/check_datediff.sql
@@ -0,0 +1,54 @@
/*
* Copyright (c) Business Thinking Ltd. 2019-2023
* This software includes code developed by the dbtvault Team at Business Thinking Ltd. Trading as Datavault
*/

{#- Guard for the vault_insert_by materialisations: resolves the number of
    load periods via the adapter-specific implementation, and aborts
    compilation when the loop would run more than 100,000 iterations. -#}
{%- macro check_num_periods(start_date, stop_date, period) -%}

    {#- Ceiling on materialisation loop iterations -#}
    {%- set max_iterations = 100000 -%}

    {#- Delegate the period count calculation to the adapter-specific implementation -#}
    {%- set period_count = adapter.dispatch('check_num_periods', 'dbtvault')(start_date=start_date, stop_date=stop_date, period=period) -%}

    {%- if period_count > max_iterations -%}
        {%- set error_message -%}
'Max iterations is 100,000. Consider using a different datepart value (e.g. day)
or loading data for a shorter time period.
vault_insert_by materialisations are not intended for this purpose,
please see https://dbtvault.readthedocs.io/en/latest/materialisations/'
        {%- endset -%}

        {{- exceptions.raise_compiler_error(error_message) -}}
    {%- endif -%}

    {% do return(period_count) %}

{%- endmacro %}

{#- Default (ANSI) period-count probe: measures how many `period` units
    separate start_date and stop_date. A literal 'none' stop date is turned
    into NULL via NULLIF, so the datediff result is NULL and the `int`
    filter coerces it to 0. -#}
{% macro default__check_num_periods(start_date, stop_date, period) %}

    {% set period_count_sql %}
SELECT {{ datediff('start_timestamp', 'stop_timestamp', period) }} AS NUM_PERIODS
FROM
(SELECT CAST('{{ start_date }}' AS {{ dbt.type_timestamp() }}) AS start_timestamp,
CAST(NULLIF('{{ stop_date | lower }}', 'none') AS {{ dbt.type_timestamp() }}) AS stop_timestamp)
    {% endset %}

    {#- Run the probe and coerce the single NUM_PERIODS cell to an integer -#}
    {% set query_results = dbtvault.get_query_results_as_dict(period_count_sql) %}
    {% do return(query_results['NUM_PERIODS'][0] | int) %}

{% endmacro %}

{#- SQL Server period-count probe: uses DATEDIFF_BIG so wide date ranges do
    not overflow INT. A literal 'none' stop date is turned into NULL via
    NULLIF, so the result is NULL and the `int` filter coerces it to 0. -#}
{% macro sqlserver__check_num_periods(start_date, stop_date, period) %}

    {% set period_count_sql %}
SELECT DATEDIFF_BIG({{ period }}, CAST('{{ start_date }}' AS DATETIME2),
CAST(NULLIF('{{ stop_date | lower }}', 'none') AS DATETIME2)) AS NUM_PERIODS
    {% endset %}

    {#- Run the probe and coerce the single NUM_PERIODS cell to an integer -#}
    {% set query_results = dbtvault.get_query_results_as_dict(period_count_sql) %}
    {% do return(query_results['NUM_PERIODS'][0] | int) %}

{% endmacro %}
Expand Up @@ -92,8 +92,8 @@
WITH period_data AS (
SELECT
CAST(COALESCE(MAX({{ timestamp_field }}), CAST('{{ start_date }}' AS DATETIME2)) AS DATETIME2) AS start_timestamp,
COALESCE({{ dbtvault.dateadd('millisecond', 86399999, from_date_or_timestamp) }},
{{ current_timestamp() }} ) AS stop_timestamp
CAST(COALESCE({{ dbtvault.dateadd('millisecond', 86399999, from_date_or_timestamp) }},
{{ current_timestamp() }} ) AS DATETIME2) AS stop_timestamp
FROM {{ target_relation }}
)
SELECT
Expand Down
Expand Up @@ -29,6 +29,8 @@

{%- do dbtvault.check_placeholder(sql) -%}

{%- do dbtvault.check_num_periods(start_stop_dates.start_date, start_stop_dates.stop_date, period) -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
Expand Down
145 changes: 78 additions & 67 deletions macros/materialisations/vault_insert_by_rank_materialization.sql
Expand Up @@ -53,74 +53,85 @@
{% set build_sql = create_table_as(False, target_relation, filtered_sql) %}
{% else %}

{% set target_columns = adapter.get_columns_in_relation(target_relation) %}
{%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
{%- set loop_vars = {'sum_rows_inserted': 0} -%}

{% for i in range(min_max_ranks.max_rank | int ) -%}

{%- set iteration_number = i + 1 -%}

{%- set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, iteration_number) -%}

{{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }}

{% set tmp_relation = make_temp_relation(target_relation) %}

{# This call statement drops and then creates a temporary table #}
{# but MSSQL will fail to drop any temporary table created by a previous loop iteration #}
{# See MSSQL note and drop code below #}
{% call statement() -%}
{{ create_table_as(True, tmp_relation, filtered_sql) }}
{% if min_max_ranks.max_rank | int > 100000 %}
{%- set error_message -%}
'Max iterations is 100,000. Consider using a different rank column
or loading a smaller amount of data.
vault_insert_by materialisations are not intended for this purpose,
please see https://dbtvault.readthedocs.io/en/latest/materialisations/'
{%- endset -%}

{{- exceptions.raise_compiler_error(error_message) -}}
{% else %}
{% set target_columns = adapter.get_columns_in_relation(target_relation) %}
{%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
{%- set loop_vars = {'sum_rows_inserted': 0} -%}

{% for i in range(min_max_ranks.max_rank | int ) -%}

{%- set iteration_number = i + 1 -%}

{%- set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, iteration_number) -%}

{{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }}

{% set tmp_relation = make_temp_relation(target_relation) %}

{# This call statement drops and then creates a temporary table #}
{# but MSSQL will fail to drop any temporary table created by a previous loop iteration #}
{# See MSSQL note and drop code below #}
{% call statement() -%}
{{ create_table_as(True, tmp_relation, filtered_sql) }}
{%- endcall %}

{{ adapter.expand_target_column_types(from_relation=tmp_relation,
to_relation=target_relation) }}

{%- set insert_query_name = 'main-' ~ i -%}
{% call statement(insert_query_name, fetch_result=True) -%}
INSERT INTO {{ target_relation }} ({{ target_cols_csv }})
(
SELECT {{ target_cols_csv }}
FROM {{ tmp_relation.include(schema=True) }}
);
{%- endcall %}

{% set result = load_result(insert_query_name) %}
{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{# Investigate for Databricks #}
{%- if result['response']['rows_affected'] == None %}
{% set rows_inserted = 0 %}
{%- else %}
{% set rows_inserted = result['response']['rows_affected'] %}
{%- endif %}

{% else %} {# older versions #}
{% set rows_inserted = result['status'].split(" ")[2] | int %}
{% endif %}

{%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
{%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %}

{{ dbt_utils.log_info("Ran for {} {} of {}; {} records inserted [{}]".format('rank', iteration_number,
min_max_ranks.max_rank,
rows_inserted,
model.unique_id)) }}

{# In databricks and sqlserver a temporary view/table can only be dropped by #}
{# the connection or session that created it so drop it now before the commit below closes this session #}
{% if target.type in ['databricks', 'sqlserver'] %}
{{ dbtvault.drop_temporary_special(tmp_relation) }}
{% else %}
{% do to_drop.append(tmp_relation) %}
{% endif %}

{% do adapter.commit() %}

{% endfor %}
{% call noop_statement('main', "INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%}
{{ filtered_sql }}
{%- endcall %}

{{ adapter.expand_target_column_types(from_relation=tmp_relation,
to_relation=target_relation) }}

{%- set insert_query_name = 'main-' ~ i -%}
{% call statement(insert_query_name, fetch_result=True) -%}
INSERT INTO {{ target_relation }} ({{ target_cols_csv }})
(
SELECT {{ target_cols_csv }}
FROM {{ tmp_relation.include(schema=True) }}
);
{%- endcall %}

{% set result = load_result(insert_query_name) %}
{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{# Investigate for Databricks #}
{%- if result['response']['rows_affected'] == None %}
{% set rows_inserted = 0 %}
{%- else %}
{% set rows_inserted = result['response']['rows_affected'] %}
{%- endif %}

{% else %} {# older versions #}
{% set rows_inserted = result['status'].split(" ")[2] | int %}
{% endif %}

{%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
{%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %}

{{ dbt_utils.log_info("Ran for {} {} of {}; {} records inserted [{}]".format('rank', iteration_number,
min_max_ranks.max_rank,
rows_inserted,
model.unique_id)) }}

{# In databricks and sqlserver a temporary view/table can only be dropped by #}
{# the connection or session that created it so drop it now before the commit below closes this session #}
{% if target.type in ['databricks', 'sqlserver'] %}
{{ dbtvault.drop_temporary_special(tmp_relation) }}
{% else %}
{% do to_drop.append(tmp_relation) %}
{% endif %}

{% do adapter.commit() %}

{% endfor %}
{% call noop_statement('main', "INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%}
{{ filtered_sql }}
{%- endcall %}
{% endif %}

{% endif %}

Expand Down
4 changes: 2 additions & 2 deletions macros/supporting/data_types/type_string.sql
Expand Up @@ -21,9 +21,9 @@

{%- macro databricks__type_string(is_hash=false, char_length=255) -%}
{%- if is_hash -%}
{%- if var('hash') | lower == 'md5' -%}
{%- if var('hash', 'MD5') | lower == 'md5' -%}
VARCHAR(16)
{%- elif var('hash') | lower == 'sha' -%}
{%- elif var('hash', 'MD5') | lower == 'sha' -%}
VARCHAR(32)
{%- endif -%}
{%- else -%}
Expand Down
8 changes: 5 additions & 3 deletions macros/supporting/ghost_records/binary_ghost.sql
Expand Up @@ -20,10 +20,12 @@

{%- macro sqlserver__binary_ghost(alias, hash) -%}
{%- if hash | lower == 'md5' -%}
CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) AS {{ alias }}
CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16))
{%- elif hash | lower == 'sha' -%}
CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(32)), 32) AS BINARY(32)) AS {{ alias }}
CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(32)), 32) AS BINARY(32))
{%- else -%}
CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) AS {{ alias }}
CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16))
{%- endif -%}

{%- if alias %} AS {{ alias }} {%- endif -%}
{%- endmacro -%}
6 changes: 1 addition & 5 deletions macros/supporting/ghost_records/create_ghost_record.sql
Expand Up @@ -40,11 +40,7 @@
{%- do col_definitions.append(col_sql) -%}

{%- elif ((col_name | lower) == (src_eff | lower)) or ((col_name | lower) == (src_ldts | lower))-%}
{%- if (col.dtype | lower) == 'date' -%}
{%- set col_sql = dbtvault.cast_date('1900-01-01', as_string=true, datetime=false, alias=col_name)-%}
{%- else -%}
{%- set col_sql = dbtvault.cast_date('1900-01-01 00:00:00', as_string=true, datetime=true, alias=col_name, date_type=col.dtype)-%}
{%- endif -%}
{% set col_sql = dbtvault.date_ghost(date_type = (col.dtype | lower), alias=col_name) -%}
{%- do col_definitions.append(col_sql) -%}

{%- elif (col_name | lower) == (src_source | lower) -%}
Expand Down
13 changes: 13 additions & 0 deletions macros/supporting/ghost_records/date_ghost.sql
@@ -0,0 +1,13 @@
{#- Dispatches the ghost-record date literal for a column's data type.
    `alias` now defaults to none to match default__date_ghost's signature:
    previously, calling the dispatcher without an alias passed a Jinja
    Undefined through to the implementation instead of none. Callers that
    already pass alias (e.g. create_ghost_record) are unaffected. -#}
{%- macro date_ghost(date_type, alias=none) -%}
    {{ adapter.dispatch('date_ghost', 'dbtvault')(date_type=date_type, alias=alias) }}
{%- endmacro -%}

{#- Default implementation of the ghost-record date literal. DATE columns
    receive '1900-01-01'; any other date_type (assumed timestamp-like --
    TODO confirm for exotic dtypes) receives '1900-01-01 00:00:00' cast via
    dbtvault.cast_date with the column's own type. Callers are expected to
    pass date_type already lower-cased (create_ghost_record does). -#}
{%- macro default__date_ghost(date_type, alias=none) -%}

{%- if date_type == 'date' -%}
{{ dbtvault.cast_date('1900-01-01', as_string=true, datetime=false, alias=alias) }}
{%- else -%}
{{ dbtvault.cast_date('1900-01-01 00:00:00', as_string=true, datetime=true, alias=alias, date_type=date_type) }}
{%- endif -%}

{%- endmacro -%}
2 changes: 1 addition & 1 deletion macros/tables/bigquery/eff_sat.sql
Expand Up @@ -140,7 +140,7 @@ new_closed_records AS (
h.{{ src_ldts }},
lo.{{ src_source }}
FROM source_data AS h
LEFT JOIN Latest_open AS lo
LEFT JOIN latest_open AS lo
ON lo.{{ src_pk }} = h.{{ src_pk }}
LEFT JOIN latest_closed AS lc
ON lc.{{ src_pk }} = h.{{ src_pk }}
Expand Down

0 comments on commit bd7408b

Please sign in to comment.