Part II, or Stale data detection with dbt and Redshift metadata

--

In an ideal world, your SaaS provider, e.g. Xero, allows you to export your data or has systems in place that push it to your preferred storage, e.g. to your data warehouse. In the real world, the shared data might be outdated, which degrades the quality of your analysis and, hence, the quality of your decisions. This is where data observability comes into the picture.

Data observability is an organization’s ability to fully understand the health of the data in its systems — to ensure that the data is fresh, contains little or no anomalous data, and is mapped to the source data.

Source: https://www.moderndatastack.xyz/category/data-quality-monitoring

Normally, we would just take the ‘updated_at’ column and add freshness tests with dbt. Great, we are sorted, right? Well, no. The challenge with off-the-shelf SaaS applications is that it is not always possible to add a column to the raw data that indicates when the data was pushed to your systems.
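For reference, this is what the standard approach looks like when a reliable timestamp column does exist — a minimal sketch of dbt’s built-in source freshness configuration, with a hypothetical xero source, invoices table and updated_at column:

version: 2

sources:
  - name: xero            # hypothetical source name
    schema: public
    tables:
      - name: invoices    # hypothetical table with a trustworthy timestamp
        loaded_at_field: updated_at
        freshness:
          warn_after: {count: 24, period: hour}
          error_after: {count: 48, period: hour}

You would then check it with dbt source freshness. This only works when the loaded_at_field can be trusted, which is exactly what is missing here.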

To overcome this challenge, I looked into the metadata tables, as they often carry a “creation / modified date”. In Part I, or “Stale data detection with dbt and BigQuery dataset metadata”, I already covered BigQuery and dbt. What makes Redshift different from BigQuery is that pg_class_info doesn’t contain the “last modified time” of the data warehouse tables. As for the system log (STL) tables, they are maintained only for two to five days; AWS recommends that you “periodically copy it to other tables or unload it to Amazon S3”. You could then import these files from S3 into a permanent history table in your data warehouse and use it to identify when an imported table was last modified. If you are using AWS services, check out “Export RedShift System Tables And Views To S3” on how to export Redshift system tables to S3 using stored procedures. The article also contains links on how to complete the task with AWS Lambda and AWS Glue.

In my case, as dbt is my preferred transformation tool, I decided to use it to extract the previous day's records from the stl_query and stl_insert tables and append them to history tables. If you don’t know what dbt is: dbt, or data build tool, is a transformation tool used in Extract Load Transform (ELT) pipelines, developed by the consulting company Fishtown Analytics, now dbt Labs. The company has two product offerings: dbt Core, an open-source command-line interface (CLI), and dbt Cloud, a hosted service. For simplicity, I will be using dbt Cloud.

Getting ready

  1. There are already plenty of great tutorials on the Internet on how to set up dbt with Redshift (if you go the CLI route, a minimal connection profile is sketched after this list):
    — Quickstart for dbt Cloud and Redshift — https://docs.getdbt.com/docs/quickstarts/dbt-cloud/redshift
    — dbt CLI and Amazon Redshift workshop — https://catalog.workshops.aws/dbt-cli-and-amazon-redshift/en-US/introduction
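If you follow the CLI workshop rather than dbt Cloud, the connection lives in profiles.yml. A minimal sketch, assuming hypothetical host, database and user names:

redshift_freshness:
  target: dev
  outputs:
    dev:
      type: redshift
      host: my-cluster.abc123.eu-west-1.redshift.amazonaws.com  # hypothetical cluster endpoint
      port: 5439
      user: dbt_user                                            # hypothetical dbt service user
      password: "{{ env_var('REDSHIFT_PASSWORD') }}"            # keep secrets out of the file
      dbname: analytics
      schema: dbt_dev
      threads: 4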

How to do it…

1. Create a new file, e.g. stl_insert_history.sql, under models/ and paste the following code. In this code we extract the insert records logged in the stl_insert table for yesterday and append them to the table stl_insert_history in the schema system_tables. Incremental models are one of four materialization options in dbt, where “the first time a model is run, the table is built by transforming all rows of source data. On subsequent runs, dbt transforms only the rows in your source data that you tell dbt to filter for, inserting them into the target table, which is the table that has already been built.” (dbt) Because we don’t specify a unique key column, dbt will simply append the new rows to the existing table on every incremental run.
{{ config(
    materialized = 'incremental',
    alias = 'stl_insert_history',
    schema = 'system_tables',
    tags = ["daily"]
) }}

select
    *,
    cast('{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as timestamp) as dbt_at
from stl_insert
where starttime::date = current_date - 1
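To build the model ad hoc from the dbt Cloud IDE or the CLI, you can target it directly with standard dbt node selection (the model name comes from the file name above):

dbt run --select stl_insert_history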

2. Create a new file, e.g. stl_query_history.sql, under models/ and paste the following code. As with stl_insert, we extract all queries that were run yesterday and append them to the table stl_query_history in the schema system_tables. You could extract the entire table instead, but it would grow quickly as the usage of your data warehouse increases.

{{ config(
    materialized = 'incremental',
    alias = 'stl_query_history',
    schema = 'system_tables',
    tags = ["daily"]
) }}

select
    *,
    cast('{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as timestamp) as dbt_at
from stl_query
where starttime::date = current_date - 1
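Since the two models differ only in the source table, you could optionally factor the shared SQL into a small macro. A sketch, where stl_history_select is a hypothetical macro name and not something the original setup requires:

{# macros/stl_history_select.sql — shared body for the STL history models #}
{% macro stl_history_select(stl_table) %}
select
    *,
    cast('{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as timestamp) as dbt_at
from {{ stl_table }}
where starttime::date = current_date - 1
{% endmacro %}

Each history model then reduces to its config block plus {{ stl_history_select('stl_query') }}.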

If you run the query below in your data warehouse, you will get the list of tables in your data warehouse together with the date they were last updated — provided that the tables were updated or created after you started capturing the history.

with max_query_time as (
    select
        max(query) as query,
        tbl,
        max(endtime) as last_insert
    from system_tables.stl_insert_history
    group by tbl
),

max_query_history as (
    select
        query,
        querytxt,
        max(endtime) as endtime
    from system_tables.stl_query_history
    group by query, querytxt
),

final as (
    select
        sti."schema",
        sti."table",
        max_query_history.endtime,
        max_query_history.querytxt
    from max_query_time
    join max_query_history
        on max_query_history.query = max_query_time.query
    join svv_table_info sti
        on sti.table_id = max_query_time.tbl
)

select *
from final
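If you find yourself running this lookup often, you could also wrap a slightly simplified variant of it in a dbt model — table_last_updated is a made-up name here, and this view is not part of the original walkthrough:

{{ config(materialized = 'view', schema = 'system_tables', alias = 'table_last_updated') }}

select
    sti."schema"   as table_schema,
    sti."table"    as table_name,
    max(q.endtime) as last_updated        -- latest end time of any query that inserted into the table
from {{ ref('stl_insert_history') }} i
join {{ ref('stl_query_history') }} q
    on q.query = i.query
join svv_table_info sti
    on sti.table_id = i.tbl
group by 1, 2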

3. Unlike the BigQuery solution, this test macro has to be defined only once, in macros/test_freshness.sql — the solution was inspired by Johann De Wet. Note that if a table never shows up in the captured history, the query returns no rows and the test will pass silently.

{% test freshness_metadata(model, sch, tbl) %}

with max_query_time as (
    select
        max(query) as query,
        tbl,
        max(endtime) as last_insert
    from system_tables.stl_insert_history
    group by tbl
),

max_query_history as (
    select
        query,
        querytxt,
        max(endtime) as endtime
    from system_tables.stl_query_history
    group by query, querytxt
),

final as (
    select
        sti."schema",
        sti."table",
        max_query_history.endtime
    from max_query_time
    join max_query_history
        on max_query_history.query = max_query_time.query
    join svv_table_info sti
        on sti.table_id = max_query_time.tbl
    where sti."table" = '{{ tbl }}'
      and sti."schema" = '{{ sch }}'
)

select datediff(day, date(endtime), current_date) as days_since_refresh
from final
where datediff(day, date(endtime), current_date) >= 1

{% endtest %}

4. Data source properties are defined in .yml file(s) nested under models/, e.g. models/<filename>.yml. Specify the arguments — the table name and the schema — below the test name.

version: 2

sources:
  - name: ticket
    schema: public
    description: Raw ticket data
    tables:
      - name: sales
        description: The table contains raw data on ticket sales
        tests:
          - freshness_metadata:
              tbl: sales
              sch: public
      - name: users
        description: The table contains raw data on users
        tests:
          - freshness_metadata:
              tbl: users
              sch: public
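If you would rather have a stale source raise a warning than fail the run, you can also set the severity of the generic test in the same YAML — for example, for the sales table:

      - name: sales
        description: The table contains raw data on ticket sales
        tests:
          - freshness_metadata:
              tbl: sales
              sch: public
              config:
                severity: warn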

5. Once you are done, run dbt test from the command line. If your data sources were updated less than one day ago, the test passes, i.e. the query returns no rows. If a row is returned, the test fails. In my case, the sales data was updated today, while the users data was last updated more than one day ago, which the test results flagged.
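To run only these freshness tests rather than your whole test suite, you can select the tests attached to the source (ticket is the source name from the YAML above):

dbt test --select source:ticket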

Things to keep in mind:

(1) If you get a permissions error, check the privileges granted to the database user dbt connects with. Review https://discourse.getdbt.com/t/the-exact-privileges-we-grant-to-set-up-redshift/2909 as it lists the roles and groups a dbt user needs to be able to query and modify the existing tables.

(2) If needed, you might be required to grant your user unrestricted access to the system tables with alter user yourUser syslog access unrestricted; — otherwise the STL tables only show rows generated by that user’s own queries.

(3) I am not entirely sure about the retention of the table svv_table_info — you might need to stash it in a history table, too.

(4) Don’t forget to schedule the retrieval of the system tables — the history models above only capture yesterday’s rows, so they have to run every day.
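Both history models are tagged daily, so a scheduled dbt Cloud job (or a cron entry for dbt Core) only needs two steps — a sketch of the commands:

dbt run --select tag:daily
dbt test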
