Stale data detection with dbt and BigQuery dataset metadata

--

See Part II or last modified in Redshift at https://eponkratova.medium.com/part-ii-or-stale-data-detection-with-dbt-and-redshift-metadata-f503f91620d

As you journey deeper in data maturity, you likely get more serious about the health of your data. One of the multiple ways to measure the health of your data is via analyzing and evaluating data freshness or, to put it simpler, checking how recent the data is.

Source: https://www.moderndatastack.xyz/category/data-quality-monitoring

You could set up native dbt freshness tests to test any of the tables for freshness by adding the freshness block to your source.yml file.

tables:
— name: tableName
description: tableDesc
freshness: # default freshness
warn_after: {count: 10, period: day}
error_after: {count: 10, period: day}
loaded_at_field: report_timestamp # by default dbt expects UTC “convert_timezone(‘America/Los_Angeles’, ‘UTS’, column)”

The issue that I encountered was that the data that we get pushed from a SaaS provider, didn’t have an update column, and as it is an off-the-shelf SaaS application, we didn’t have control over new columns, such as ‘last modified date’ to be added/removed. Hence, we could not use ‘freshness tests’ just because we didn’t have the ‘last modified at’ column in a dataset.

For the solution, we turned to the BigQuery table metadata and set up generic tests — which are not that generic — in dbt.

Getting ready

  1. If you have not connected your dbt to BigQuery, please refer to https://docs.getdbt.com/docs/get-started/getting-started/getting-set-up/setting-up-bigquery for details on how to set up the connection.
  2. For the exercise's sake, I would be using a public dataset. Read more on how to access BigQuery public datasets at https://www.optimizesmart.com/how-to-access-bigquery-public-data-sets/

How to do it…

  1. One file was added to macros/test_freshness.sql with the following snippet of code:
--freshness test for table bikeshare_stations
{% test freshness_bikeshare_stations(model, column_name) %}

with source_output as (
select
TIMESTAMP_MILLIS(last_modified_time) as last_modified_time
from {{ model }}
where table_id = 'bikeshare_stations'
)
select SAFE_CAST(DATE_DIFF (current_Date,date(last_modified_time), DAY) as integer) as days_since_refresh
from source_output where SAFE_CAST(DATE_DIFF (current_Date,date(last_modified_time), DAY) as integer) >= 1

{% endtest %}

--freshness test for table bikeshare_trips
{% test freshness_bikeshare_trips(model, column_name) %}

with source_output as (
select
TIMESTAMP_MILLIS(last_modified_time) as last_modified_time
from {{ model }}
where table_id = 'bikeshare_trips'
)
select SAFE_CAST(DATE_DIFF (current_Date,date(last_modified_time), DAY) as integer) as days_since_refresh
from source_output where SAFE_CAST(DATE_DIFF (current_Date,date(last_modified_time), DAY) as integer) >=1

{% endtest %}s

2. Data source properties are defined in .yml file(s) nested under models/<filename>.yml. In my cases, I’ve defined the freshness tests on the ‘last_modified_time’ column under sources >> tables.__TABLES__ in the models/sources.yml file.

version: 2

sources:
- name: public
schema: austin_bikeshare
database: bigquery-public-data
description: Raw bikesharedata
tables:
- name: __TABLES__
description: metadata
columns:
- name: last_modified_time
tests:
- freshness_bikeshare_stations
- freshness_bikeshare_trips

3. Once you are done, run dbt test in your command line. If your data sources were updated less than one day ago, the test will be passed i.e. there will be no results. If a row gets returned by the query, that indicates a failure.

Edits: Following the suggestions by Johann De Wet, instead of adding a test to each table, you could create the following macro:

{% test freshness_metadata(database, schema, model, column_name) %}

with source_output as (
select
TIMESTAMP_MILLIS({{ column_name }}) as last_modified_time
from {{ database}}.{{ schema }}.__TABLES__
where table_id = {{ model }}
)
select SAFE_CAST(DATE_DIFF (current_Date,date({{ column_name }}), DAY) as integer) as days_since_refresh
from source_output where SAFE_CAST(DATE_DIFF (current_Date,date({{ column_name }}), DAY) as integer) >= 1

{% endtest %}

And then, update your source.yml with:

version: 2

sources:
- name: public
schema: austin_bikeshare
database: bigquery-public-data
description: Raw bikesharedata
tables:
- name: bikeshare_stations
description: xyz
columns:
- name: last_modified_time
tests:
- freshness_metadata
- name: bikeshare_trips
description: xyz
columns:
- name: last_modified_time
tests:
- freshness_metadata

Once you run it,

How it works…

There are two tables in BigQuery — INFORMATION_SCHEMA and __TABLES__ that you could use to explore dataset metadata. However, the last modified date for all tables in a dataset is stored in _TABLES__.

--

--