-
Notifications
You must be signed in to change notification settings - Fork 0
/
scd2_bq_procedure.sql
199 lines (183 loc) · 7.81 KB
/
scd2_bq_procedure.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/*
Copyright 2023 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google.
BigQuery Stored Procedure for SCD Type 2 Transformation.
Assumptions:
- Source and target tables contain the same columns and data types.
- Data type of start_date and end_date is (DATE, DATETIME or TIMESTAMP)
- Primary key columns cannot be null
- To facilitate null comparisons, the following special "null" values are used:
- STRING: 'NULL'
- DATE/DATETIME/TIMESTAMP: '1900-01-01'
- NUMERIC datatypes: -1
Sample
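call test.perform_scd2_transform(
'my-test-project', -- project_id
'my_dataset', -- target_dataset_id
'target_table', -- target_table_name
'my_dataset', -- source_dataset_id
'source_table', -- source_table_name
'', -- source_table_filter
'pk1,pk2', -- primary_keys
'start_date', -- start_date_column
'end_date', -- end_date_column
'' -- target_columns_to_update
);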
*/
-- Performs a Slowly Changing Dimension Type 2 merge of a source table into a
-- target table by generating and executing a single MERGE statement.
--
-- How the generated MERGE works:
--   * The USING clause is a UNION ALL of two row sets read from the source:
--       1. every (filtered) source row, tagged with its real key values
--          (aliased JK0, JK1, ...);
--       2. every (filtered) source row whose non-key values differ from the
--          currently open target row, tagged with sentinel "null" key values.
--   * WHEN MATCHED and the non-key values differ on the open target row, the
--     row is closed by setting its end date to the current date/time.
--   * WHEN NOT MATCHED, a new open row is inserted with start date = now and
--     end date = the far-future marker.
--
-- Parameters:
--   project_id               Project holding both tables.
--   target_dataset_id        Dataset of the target (history) table.
--   target_table_name        Name of the target (history) table.
--   source_dataset_id        Dataset of the source table.
--   source_table_name        Name of the source table.
--   source_table_filter      Optional WHERE condition applied to the source
--                            table (e.g., by ingestion date); '' for none.
--   primary_keys             Comma separated list of key columns; whitespace
--                            around each name is ignored.
--   start_date_column        Validity start column (DATE, DATETIME or TIMESTAMP).
--   end_date_column          Validity end column, compared and written using
--                            the type of start_date_column.
--   target_columns_to_update Optional statement fragment to set additional
--                            column values on update
--                            (e.g., 'field1 = \'X\', field2 = 5'); '' for none.
--
-- Raises an error when the start date column is missing from the target table
-- or has an unsupported type, or when none of the primary keys is a column of
-- the target table.
--
-- NOTE: identifiers and the filter/update fragments are concatenated verbatim
-- into dynamic SQL, so only trusted values must be passed.
-- NOTE: columns that are neither STRING nor DATE/DATETIME/TIMESTAMP are
-- compared using the numeric sentinel, so they must be comparable with -1.
CREATE OR REPLACE PROCEDURE test.perform_scd2_transform (
  project_id STRING,
  target_dataset_id STRING,
  target_table_name STRING,
  source_dataset_id STRING,
  source_table_name STRING,
  source_table_filter STRING, -- WHERE clause condition to filter out source table (e.g., by ingestion date)
  primary_keys STRING, -- comma separated list of key column
  start_date_column STRING, -- DATE, DATETIME or TIMESTAMP
  end_date_column STRING, -- DATE, DATETIME or TIMESTAMP
  target_columns_to_update STRING -- Statement fragment to set column values on update (e.g., 'field1 = \'X\', field2 = 5').
                                  -- This will be appended to the update portion of the merge statement.
)
BEGIN
  -- Variables for Customisation: SQL literal text substituted for NULL when
  -- comparing values, and the end date literal that marks an open record.
  DECLARE numeric_nullvalue DEFAULT '-1';
  DECLARE string_nullvalue DEFAULT '\'NULL\'';
  DECLARE date_nullvalue DEFAULT '\'1900-01-01\'';
  DECLARE new_record_end_date DEFAULT '\"9999-12-31 23:59:59\"';

  -- Internal Variables
  DECLARE target_table_name_full STRING;
  DECLARE source_table_name_full STRING;
  DECLARE date_column_type STRING DEFAULT 'DATETIME';
  DECLARE keyalias STRING DEFAULT '';
  DECLARE nullkeys STRING DEFAULT '';
  DECLARE key_join_cond STRING DEFAULT '';
  DECLARE inner_key_join_cond STRING DEFAULT '';
  DECLARE nonkey_join_cond STRING DEFAULT '';
  DECLARE nonkey_join_cond_w_end_date STRING DEFAULT '';
  DECLARE column_list STRING DEFAULT '';
  DECLARE filtered_source STRING DEFAULT '';
  DECLARE merge_statement STRING DEFAULT '';
  DECLARE filter_clause STRING DEFAULT '';
  DECLARE update_clause STRING DEFAULT '';

  SET target_table_name_full = CONCAT(project_id, '.', target_dataset_id, '.', target_table_name);
  SET source_table_name_full = CONCAT(project_id, '.', source_dataset_id, '.', source_table_name);

  IF LENGTH(source_table_filter) > 0 THEN
    SET filter_clause = CONCAT(' WHERE ', source_table_filter);
  END IF;

  IF LENGTH(target_columns_to_update) > 0 THEN
    SET update_clause = CONCAT(', ', target_columns_to_update);
  END IF;

  -- The source filter is applied inside a subquery so that its column
  -- references stay unambiguous when the source is later joined to the target
  -- table, which has the same column names.
  SET filtered_source = CONCAT('(SELECT * FROM `', source_table_name_full, '` S', filter_clause, ')');

  -- get column details from INFORMATION_SCHEMA
  -- OR REPLACE lets the procedure be called more than once in the same
  -- script/session without failing on the already existing temp table.
  EXECUTE IMMEDIATE CONCAT(
    'CREATE OR REPLACE TEMP TABLE scd2_table_columns AS ',
    'SELECT ordinal_position, column_name, data_type ',
    'FROM `', project_id, '.', target_dataset_id, '.INFORMATION_SCHEMA.COLUMNS` ',
    'WHERE table_catalog = \'', project_id, '\' ',
    ' AND table_schema = \'', target_dataset_id, '\' ',
    ' AND table_name = \'', target_table_name, '\' '
  );

  SET date_column_type = (
    SELECT data_type
    FROM scd2_table_columns
    WHERE column_name = start_date_column
  );

  -- Fail early with a readable message; otherwise the NULL type would turn the
  -- whole generated statement into NULL.
  IF date_column_type IS NULL OR date_column_type NOT IN ('DATE', 'DATETIME', 'TIMESTAMP') THEN
    RAISE USING MESSAGE = CONCAT(
      'perform_scd2_transform: start date column ', IFNULL(start_date_column, 'NULL'),
      ' was not found in the target table or is not DATE, DATETIME or TIMESTAMP (found type: ',
      IFNULL(date_column_type, 'NULL'), ')'
    );
  END IF;

  -- A DATE literal cannot carry a time-of-day part, so use a date-only marker
  -- for DATE columns.
  IF date_column_type = 'DATE' THEN
    SET new_record_end_date = '\"9999-12-31\"';
  END IF;

  -- generate list of column for the "insert" portion of the merge statement
  SET column_list = (
    SELECT STRING_AGG(column_name, ',' ORDER BY ordinal_position)
    FROM scd2_table_columns
    WHERE column_name NOT IN (start_date_column, end_date_column)
  );

  -- generate join conditions based on primary key columns
  -- Fragments are joined with a delimiter instead of appending a suffix and
  -- trimming it: RTRIM(x, ' AND ') removes any trailing ' ', 'A', 'N' or 'D'
  -- characters and would truncate column names such as CUSTOMER_ID.
  SET (keyalias, nullkeys, key_join_cond, inner_key_join_cond) = (
    SELECT AS STRUCT
      STRING_AGG(CONCAT(column_name, ' AS JK', CAST(key_index AS STRING)), ',' ORDER BY key_index),
      STRING_AGG(CONCAT(null_literal, ' AS JK', CAST(key_index AS STRING)), ',' ORDER BY key_index),
      STRING_AGG(CONCAT('T.', column_name, ' = S.JK', CAST(key_index AS STRING)), ' AND ' ORDER BY key_index),
      STRING_AGG(CONCAT('T.', column_name, ' = S.', column_name), ' AND ' ORDER BY key_index)
    FROM (
      SELECT
        column_name,
        -- zero-based position of the key column, used in the JK<n> alias
        ROW_NUMBER() OVER (ORDER BY ordinal_position) - 1 AS key_index,
        CASE
          WHEN data_type = 'STRING' THEN string_nullvalue
          WHEN data_type IN ('DATE', 'DATETIME', 'TIMESTAMP') THEN CONCAT(data_type, ' ', date_nullvalue)
          ELSE numeric_nullvalue
        END AS null_literal
      FROM scd2_table_columns
      WHERE column_name IN (
          SELECT TRIM(key_name)
          FROM UNNEST(SPLIT(primary_keys)) AS key_name
        )
        AND column_name NOT IN (start_date_column, end_date_column)
    )
  );

  IF key_join_cond IS NULL THEN
    RAISE USING MESSAGE = CONCAT(
      'perform_scd2_transform: none of the primary keys (', IFNULL(primary_keys, 'NULL'),
      ') is a column of the target table'
    );
  END IF;

  -- generate comparison statements for non-key columns
  SET nonkey_join_cond = (
    SELECT
      STRING_AGG(
        CONCAT(
          'IFNULL(T.', column_name, ',', null_literal, ') <> IFNULL(S.', column_name, ',', null_literal, ')'
        ),
        ' OR '
        ORDER BY ordinal_position
      )
    FROM (
      SELECT
        ordinal_position,
        column_name,
        CASE
          WHEN data_type = 'STRING' THEN string_nullvalue
          WHEN data_type IN ('DATE', 'DATETIME', 'TIMESTAMP') THEN CONCAT(data_type, ' ', date_nullvalue)
          ELSE numeric_nullvalue
        END AS null_literal
      FROM scd2_table_columns
      WHERE column_name NOT IN (
          SELECT TRIM(key_name)
          FROM UNNEST(SPLIT(primary_keys)) AS key_name
        )
        AND column_name NOT IN (start_date_column, end_date_column)
    )
  );

  -- With no non-key columns nothing can change, so the condition is FALSE
  -- rather than an empty (and syntactically invalid) pair of parentheses.
  SET nonkey_join_cond = CONCAT('(', IFNULL(nonkey_join_cond, 'FALSE'), ')');

  -- Only the currently open target row (end date = far-future marker) is
  -- eligible for change detection.
  SET nonkey_join_cond_w_end_date = CONCAT(
    nonkey_join_cond, ' AND T.', end_date_column, ' = ', date_column_type, ' ', new_record_end_date
  );

  -- combine all the statement fragments into the final merge statement
  SET merge_statement = CONCAT(
    'MERGE INTO `', target_table_name_full, '` T ',
    'USING (',
    -- every source row, tagged with its real key values
    ' SELECT ', keyalias, ', * FROM ', filtered_source, ' S',
    ' UNION ALL',
    -- changed source rows, tagged with sentinel keys so they take the insert path
    ' SELECT ', nullkeys, ', S.* FROM ', filtered_source, ' S',
    ' JOIN `', target_table_name_full, '` T',
    ' ON ', inner_key_join_cond, ' AND (', nonkey_join_cond_w_end_date, ')',
    ') S ',
    'ON ', key_join_cond, ' ',
    'WHEN MATCHED AND (', nonkey_join_cond_w_end_date, ') THEN UPDATE ',
    'SET ', end_date_column, ' = CURRENT_', date_column_type, '() ', update_clause, ' ',
    'WHEN NOT MATCHED THEN ',
    ' INSERT (', column_list, ', ', start_date_column, ', ', end_date_column, ')',
    ' VALUES (', column_list, ', CURRENT_', date_column_type, '(), ', date_column_type, ' ', new_record_end_date, ')'
  );

  EXECUTE IMMEDIATE merge_statement;
END;