Skip to content

Commit

Permalink
Implementation of metadata-based freshness (#694)
Browse files Browse the repository at this point in the history
* changelog

* turn on metadata-based source freshness

* add tests for metadata-based source freshness

* add metadata-based source freshness sql stub

* mark metadata-based source freshness as unsupported

* add test to show existing method is working

* turn on metadata-based source freshness

* add dbt-common to the dev-requirements so it comes from github

* add test case with no last modified date column

* add metadata-based source freshness query

* restrict to just insert steps
  • Loading branch information
mikealfare authored Feb 26, 2024
1 parent b79ced3 commit d77f5ee
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231219-120533.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for checking table-last-modified by metadata
time: 2023-12-19T12:05:33.784649-05:00
custom:
Author: mikealfare
Issue: "615"
5 changes: 4 additions & 1 deletion dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ class RedshiftAdapter(SQLAdapter):
}

_capabilities = CapabilityDict(
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
{
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
}
)

@classmethod
Expand Down
29 changes: 29 additions & 0 deletions dbt/include/redshift/macros/metadata/relation_last_modified.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{% macro redshift__get_relation_last_modified(information_schema, relations) -%}

{%- call statement('last_modified', fetch_result=True) -%}
select
ns.nspname as "schema",
c.relname as identifier,
max(qd.start_time) as last_modified,
{{ current_timestamp() }} as snapshotted_at
from pg_class c
join pg_namespace ns
on ns.oid = c.relnamespace
join sys_query_detail qd
on qd.table_id = c.oid
where qd.step_name = 'insert'
and (
{%- for relation in relations -%}
(
upper(ns.nspname) = upper('{{ relation.schema }}')
and upper(c.relname) = upper('{{ relation.identifier }}')
)
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
group by 1, 2, 4
{%- endcall -%}

{{ return(load_result('last_modified')) }}

{% endmacro %}
6 changes: 3 additions & 3 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# install latest changes in dbt-core + dbt-postgres
# TODO: how to switch from HEAD to x.y.latest branches after minor releases?
git+https://github.com/dbt-labs/dbt-postgres.git@main
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git
git+https://github.com/dbt-labs/dbt-core.git#subdirectory=core
git+https://github.com/dbt-labs/dbt-postgres.git

# if version 1.x or greater -> pin to major version
# if version 0.x -> pin to minor
Expand Down
38 changes: 38 additions & 0 deletions tests/functional/adapter/sources_freshness_tests/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
SCHEMA_YML = """version: 2
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
tables:
- name: test_source_no_last_modified
- name: test_source_last_modified
loaded_at_field: last_modified
"""

SEED_TEST_SOURCE_NO_LAST_MODIFIED_CSV = """
id,name
1,Martin
2,Jeter
3,Ruth
4,Gehrig
5,DiMaggio
6,Torre
7,Mantle
8,Berra
9,Maris
""".strip()

SEED_TEST_SOURCE_LAST_MODIFIED_CSV = """
id,name,last_modified
1,Martin,2023-01-01 00:00:00
2,Jeter,2023-02-01 00:00:00
3,Ruth,2023-03-01 00:00:00
4,Gehrig,2023-04-01 00:00:00
5,DiMaggio,2023-05-01 00:00:00
6,Torre,2023-06-01 00:00:00
7,Mantle,2023-07-01 00:00:00
8,Berra,2023-08-01 00:00:00
9,Maris,2023-09-01 00:00:00
""".strip()
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os

from dbt.tests.util import run_dbt
import pytest

from tests.functional.adapter.sources_freshness_tests import files


class TestGetLastRelationModified:
@pytest.fixture(scope="class")
def seeds(self):
return {
"test_source_no_last_modified.csv": files.SEED_TEST_SOURCE_NO_LAST_MODIFIED_CSV,
"test_source_last_modified.csv": files.SEED_TEST_SOURCE_LAST_MODIFIED_CSV,
}

@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": files.SCHEMA_YML}

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
# we need the schema name for the sources section
os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
run_dbt(["seed"])
yield
del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

@pytest.mark.parametrize(
"source,status,expect_pass",
[
("test_source.test_source_no_last_modified", "pass", True),
("test_source.test_source_last_modified", "error", False), # stale
],
)
def test_get_last_relation_modified(self, project, source, status, expect_pass):
results = run_dbt(
["source", "freshness", "--select", f"source:{source}"], expect_pass=expect_pass
)
assert len(results) == 1
result = results[0]
assert result.status == status

0 comments on commit d77f5ee

Please sign in to comment.