Skip to content

Commit f789ab0

Browse files
authored
Merge pull request #477 from VanOord/fix/add-columns
Fix for adding columns to snapshots
2 parents 3e6457c + 2c5f558 commit f789ab0

File tree

2 files changed

+162
-2
lines changed

2 files changed

+162
-2
lines changed

dbt/include/sqlserver/macros/materializations/snapshots/snapshot.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ SQL Server doesnt support this, so we use the 'SELECT * INTO XYZ FROM ABC' synta
1010

1111
{% set columns %}
1212
{% for column in columns %}
13-
, CAST(NULL AS {{column.data_type}}) AS {{column_name}}
13+
, CAST(NULL AS {{column.data_type}}) AS {{ column.quoted }}
1414
{% endfor %}
1515
{% endset %}
1616

@@ -19,7 +19,7 @@ SQL Server doesnt support this, so we use the 'SELECT * INTO XYZ FROM ABC' synta
1919
{% endset %}
2020

2121
{% set tempTable %}
22-
SELECT * INTO {{tempTableName}} {{columns}} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }}
22+
SELECT * {{columns}} INTO {{tempTableName}} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }}
2323
{% endset %}
2424

2525
{% call statement('create_temp_table') -%}
+160
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
from typing import Iterable
2+
3+
from dbt.tests.adapter.simple_snapshot.test_snapshot import BaseSimpleSnapshot, BaseSnapshotCheck
4+
from dbt.tests.fixtures.project import TestProjInfo as ProjInfo
5+
from dbt.tests.util import relation_from_name, run_dbt
6+
7+
8+
def clone_table(project: ProjInfo, to_table: str, from_table: str, select: str, where: str = None):
9+
"""
10+
Creates a new table based on another table in a dbt project
11+
12+
Args:
13+
project: the dbt project that contains the table
14+
to_table: the name of the table, without a schema, to be created
15+
from_table: the name of the table, without a schema, to be cloned
16+
select: the selection clause to apply on `from_table`; defaults to all columns (*)
17+
where: the where clause to apply on `from_table`, if any; defaults to all records
18+
"""
19+
print(project)
20+
to_table_name = relation_from_name(project.adapter, to_table)
21+
from_table_name = relation_from_name(project.adapter, from_table)
22+
select_clause = select or "*"
23+
where_clause = where or "1 = 1"
24+
sql = f"drop table if exists {to_table_name}"
25+
project.run_sql(sql)
26+
sql = f"""
27+
select {select_clause}
28+
into {to_table_name}
29+
from {from_table_name}
30+
where {where_clause}
31+
"""
32+
project.run_sql(sql)
33+
34+
35+
def add_column(project: ProjInfo, table: str, column: str, definition: str):
36+
"""
37+
Applies updates to a table in a dbt project
38+
39+
Args:
40+
project: the dbt project that contains the table
41+
table: the name of the table without a schema
42+
column: the name of the new column
43+
definition: the definition of the new column, e.g. 'varchar(20) default null'
44+
"""
45+
# BigQuery doesn't like 'varchar' in the definition
46+
if project.adapter.type() == "bigquery" and "varchar" in definition.lower():
47+
definition = "string"
48+
table_name = relation_from_name(project.adapter, table)
49+
sql = f"""
50+
alter table {table_name}
51+
add {column} {definition}
52+
"""
53+
project.run_sql(sql)
54+
55+
56+
class SnapshotSQLServer:
57+
def _assert_results(
58+
self,
59+
ids_with_current_snapshot_records: Iterable,
60+
ids_with_closed_out_snapshot_records: Iterable,
61+
):
62+
"""
63+
All test cases are checked by considering whether a source record's id has a value
64+
in `dbt_valid_to` in `snapshot`. Each id can fall into one of the following cases:
65+
66+
- The id has only one record in `snapshot`; it has a value in `dbt_valid_to`
67+
- the record was hard deleted in the source
68+
- The id has only one record in `snapshot`; it does not have a value in `dbt_valid_to`
69+
- the record was not updated in the source
70+
- the record was updated in the source, but not in a way that is tracked
71+
(e.g. via `strategy='check'`)
72+
- The id has two records in `snapshot`; one has a value in `dbt_valid_to`,
73+
the other does not
74+
- the record was altered in the source in a way that is tracked
75+
- the record was hard deleted and revived
76+
77+
Note: Because of the third scenario, ids may show up in both arguments of this method.
78+
79+
Args:
80+
ids_with_current_snapshot_records: a list/set/etc. of ids which aren't end-dated
81+
ids_with_closed_out_snapshot_records: a list/set/etc. of ids which are end-dated
82+
"""
83+
records = set(
84+
self.get_snapshot_records(
85+
"""id, CASE WHEN dbt_valid_to is null then cast(1 as bit)
86+
ELSE CAST(0 as bit) END as is_current"""
87+
)
88+
)
89+
expected_records = set().union(
90+
{(i, True) for i in ids_with_current_snapshot_records},
91+
{(i, False) for i in ids_with_closed_out_snapshot_records},
92+
)
93+
for record in records:
94+
assert record in expected_records
95+
96+
def create_fact_from_seed(self, where: str = None): # type: ignore
97+
# overwrite clone table
98+
clone_table(self.project, "fact", "seed", "*", where)
99+
100+
def add_fact_column(self, column: str = None, definition: str = None):
101+
add_column(self.project, "fact", column, definition)
102+
103+
def test_column_selection_is_reflected_in_snapshot(self, project):
104+
"""
105+
Update the first 10 records on a non-tracked column.
106+
Update the middle 10 records on a tracked column.
107+
(hence records 6-10 are updated on both)
108+
Show that all ids are current, and only the tracked column updates are reflected in
109+
`snapshot`.
110+
"""
111+
self.update_fact_records(
112+
{"last_name": "left(last_name, 3)"}, "id between 1 and 10"
113+
) # not tracked
114+
self.update_fact_records({"email": "left(email, 3)"}, "id between 6 and 15") # tracked
115+
run_dbt(["snapshot"])
116+
self._assert_results(
117+
ids_with_current_snapshot_records=range(1, 21),
118+
ids_with_closed_out_snapshot_records=range(6, 16),
119+
)
120+
121+
def test_updates_are_captured_by_snapshot(self, project):
122+
"""
123+
Update the last 5 records. Show that all ids are current, but the last 5 reflect updates.
124+
"""
125+
self.update_fact_records(
126+
{"updated_at": "DATEADD(day, 1, [updated_at])"}, "id between 16 and 20"
127+
)
128+
run_dbt(["snapshot"])
129+
self._assert_results(
130+
ids_with_current_snapshot_records=range(1, 21),
131+
ids_with_closed_out_snapshot_records=range(16, 21),
132+
)
133+
134+
def test_new_column_captured_by_snapshot(self, project):
135+
"""
136+
Add a column to `fact` and populate the last 10 records with a non-null value.
137+
Show that all ids are current, but the last 10 reflect updates and the first 10 don't
138+
i.e. if the column is added, but not updated, the record doesn't reflect that it's updated
139+
"""
140+
self.add_fact_column("full_name", "varchar(200) default null")
141+
self.update_fact_records(
142+
{
143+
"full_name": "first_name + ' ' + last_name",
144+
"updated_at": "DATEADD(day, 1, [updated_at])",
145+
},
146+
"id between 11 and 20",
147+
)
148+
run_dbt(["snapshot"])
149+
self._assert_results(
150+
ids_with_current_snapshot_records=range(1, 21),
151+
ids_with_closed_out_snapshot_records=range(11, 21),
152+
)
153+
154+
155+
class TestSnapshotSQLServer(SnapshotSQLServer, BaseSimpleSnapshot):
156+
pass
157+
158+
159+
class TestSnapshotCheckSQLServer(SnapshotSQLServer, BaseSnapshotCheck):
160+
pass

0 commit comments

Comments
 (0)