Is anybody loading data in fact and dimension tables the pure-Python way?


The objective of this blog is to implement a Slowly Changing Dimension of Type 2 (SCD2) and a fact table with a lookup to the SCD2, using Amazon Redshift as the data warehouse and pygrametl, a Python ETL framework.

Introduction

Let me give you an example. You report on sales figures by employee and store. You’ve just relocated an employee to another store and updated the transactional system. The data warehouse you use for reporting got updated as well, and new sales made by that employee now appear under the new store. However, old sales by that employee now also point to the new store instead of the original store where the sale was made. That creates a problem: your historical sales figures by store have changed.

What are Slowly Changing Dimensions (SCD)?

Ralph Kimball introduced Slowly Changing Dimensions in 1995 to track the history of updates to dimension records. You can specify different types of slowly changing dimensions for each dimension table attribute.

Type 0: Retain Original. The original value is retained regardless of updates to the dimension record in the source system.
Type 1: Overwrite. The dimension table attribute is overwritten with the most recent value.
Type 2: Add New Row. When an attribute value changes, the ETL system keeps the original value and creates an additional record in the dimension table with the new value. To get the relevant employee-store record, you join the fact table to the dimension table on the natural key and filter on the sale order date falling between the effective and expiration dates from the dimension table (a sketch of this join is shown below).
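
To make that Type 2 lookup concrete, here is a minimal pandas sketch of the point-in-time join. The dataframes, column names, and values are made up for illustration and simply mirror the tables used later in this post; treating valid_to as an exclusive boundary is also just one possible convention.

import pandas as pd

# Hypothetical SCD2 dimension: two versions of the same employee (natural key emp_id = 1)
dim_emp = pd.DataFrame({
    'emp_sk':     [1, 2],                                      # surrogate key
    'emp_id':     [1, 1],                                      # natural key
    'store_id':   [2, 3],
    'valid_from': pd.to_datetime(['2023-01-01', '2024-01-11']),
    'valid_to':   pd.to_datetime(['2024-01-11', '9999-12-31']),
})

# Hypothetical fact rows carrying the natural key and the order date
fact = pd.DataFrame({
    'ord_id':   [2, 4],
    'emp_id':   [1, 1],
    'ord_date': pd.to_datetime(['2024-01-10', '2024-01-15']),
})

# Join on the natural key, then keep the dimension version whose validity
# window contains the order date (valid_to treated as exclusive here)
joined = fact.merge(dim_emp, on='emp_id')
joined = joined[(joined['ord_date'] >= joined['valid_from']) &
                (joined['ord_date'] < joined['valid_to'])]
print(joined[['ord_id', 'emp_sk', 'store_id', 'ord_date']])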

There are multiple tools available for loading a Type 2 Slowly Changing Dimension into a data warehouse, for example Pentaho and Talend. You could also opt for a pure-SQL or pure-Python approach, or use one of the available Python packages, some of which support only a specific database management system (DBMS) or a specific type of Slowly Changing Dimension. For this project, I’ll be using pygrametl because (1) it supports building different types of fact and dimension tables; (2) it can load data into the warehouse from a database connection, a CSV file, or a Pandas dataframe, among other sources; and (3) it has great documentation.

Setup

1. I will be using Redshift Serverless (read more on setting up Redshift at Getting started) with the following group and user created:
create group load;
create user analytics_user password 'YourPassword123' in group load;
grant create on database dev to group load;
grant select on all tables in schema information_schema to group load;
grant select on all tables in schema pg_catalog to group load;
grant select on all tables in schema public to group load;

2. As this was just a test run, I ran the code on my local machine. Moreover, to simplify testing, I typed values in by hand to create the Pandas dataframes (an example of what the employee dataframe could look like is sketched below). In the first part of the code, we create a SlowlyChangingDimension object representing a dimension table in the data warehouse with the following attributes: employee ID, name, gender, store ID, validity dates (valid_from and valid_to), and version number. The loop then iterates through each row of the source dataframe; any change to an existing dimension member is tracked by closing the current record’s validity dates and incrementing the version number.
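
The employee dataframe itself is not shown in the snippets below (the actual values live in the GitHub repository), so the following is only a plausible sketch of what employee_df could look like, with made-up values matching the column names the dimension code expects. The validity dates and version number are assumed to be maintained by scdensure rather than supplied by the source.

import pandas as pd

# Hypothetical source data for the employee dimension; column names match the
# lookup and type-2 attributes declared on the SlowlyChangingDimension below
employee_df = pd.DataFrame({
    'emp_id':     [1, 2, 3],
    'emp_name':   ['Kristin', 'Alex', 'Maria'],
    'emp_gender': ['F', 'M', 'F'],
    'store_id':   [2, 1, 3],
})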

def fact_sales_order(dw_conn_wrapper):
    """Loads the employee SCD2 dimension table"""
    source = PandasSource(employee_df)

    employees_dim = SlowlyChangingDimension(
        name = 'dim_emp',    # name of the dimension table in the data warehouse
        key = 'emp_sk',      # name of the primary (surrogate) key
        attributes = ['emp_id', 'emp_name', 'emp_gender', 'store_id', 'valid_from', 'valid_to', 'version'],
        # a sequence of the attribute names in the dimension table;
        # should not include the name of the primary key, which is given in the key argument
        lookupatts = ['emp_id'],    # a subset of the attributes that uniquely identifies a dimension member
        fromatt = 'valid_from',     # the attribute telling from when the version becomes valid. Default: None
        toatt = 'valid_to',         # the attribute telling until when the version is valid. Default: None
        versionatt = 'version')     # the attribute holding the version number

    # Specify an optional value to return when a lookup fails
    employees_dim.defaultidvalue = 0

    for row in source:
        employees_dim.scdensure(row)

    dw_conn_wrapper.commit()

if __name__ == '__main__':
    destDatabase = redshift_connector.connect(host='',      # fill in your Redshift endpoint details
                                              database='',
                                              port=5439,     # default Redshift port
                                              user='',
                                              password='')
    dw_conn_wrapper = pygrametl.ConnectionWrapper(connection = destDatabase)
    dw_conn_wrapper.setasdefault()
    main()

In the second part of the code, we create a fact table named ‘fact_sales_order’ with three arguments: name (the name of the fact table), keyrefs (the keys in the fact table), and measures (the facts one wants to keep track of). The code then populates the fact table from a Pandas dataframe, using the employee dimension table to look up the employee surrogate key (emp_sk) that was valid as of the order date. A sketch of what the sales order dataframe could look like is shown below, followed by the fact table code.
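
As with the employee data, the sales order dataframe is not shown in the article; this is only a made-up sketch with column names matching what the fact table code expects.

import pandas as pd

# Hypothetical sales order rows; ord_id is the order key, emp_id is the natural
# key used to look up the employee surrogate key, and the remaining columns are
# the measures loaded into the fact table
sales_order_df = pd.DataFrame({
    'ord_id':      [2, 4],
    'emp_id':      [1, 1],
    'unit_sold':   [5, 3],
    'tot_revenue': [250.0, 180.0],
    'ord_date':    pd.to_datetime(['2024-01-11', '2024-01-15']),
})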

def fact_sales_order(dw_conn_wrapper):
    """Loads the sales order fact table"""
    source = PandasSource(sales_order_df)
    fact_table = FactTable(
        name = 'fact_sales_order',                  # name of the fact table in the data warehouse
        keyrefs = ['ord_id', 'emp_sk'],             # keys in the fact table
        measures = ['unit_sold', 'tot_revenue', 'ord_date'])    # facts one wants to keep track of
    # Specify an optional value to return when a lookup fails
    fact_table.defaultidvalue = 0

    for row in source:
        # Look up the employee surrogate key that was valid as of the order date
        row['emp_sk'] = employees_dim.lookupasof(row, row['ord_date'], (True, True), {'emp_id': 'emp_id'})
        fact_table.ensure(row, False, {'ord_id': 'ord_id'})

    dw_conn_wrapper.commit()

def main():
    fact_sales_order(dw_conn_wrapper)

if __name__ == '__main__':
    destDatabase = redshift_connector.connect(host='',      # fill in your Redshift endpoint details
                                              database='',
                                              port=5439,     # default Redshift port
                                              user='',
                                              password='')
    dw_conn_wrapper = pygrametl.ConnectionWrapper(connection = destDatabase)
    dw_conn_wrapper.setasdefault()
    main()

Putting everything together (you can also find the complete code in the GitHub repository):

# import libraries
import psycopg2
import pygrametl
from pygrametl.tables import Dimension, FactTable, SlowlyChangingDimension
from pygrametl.datasources import PandasSource
import pandas as pd
import redshift_connector
from datetime import datetime, timedelta

def fact_sales_order(dw_conn_wrapper):
    """Loads the employee SCD2 dimension and the sales order fact table"""
    source = PandasSource(employee_df)

    employees_dim = SlowlyChangingDimension(
        name = 'dim_emp',    # name of the dimension table in the data warehouse
        key = 'emp_sk',      # name of the primary (surrogate) key
        attributes = ['emp_id', 'emp_name', 'emp_gender', 'store_id', 'valid_from', 'valid_to', 'version'],
        # a sequence of the attribute names in the dimension table;
        # should not include the name of the primary key, which is given in the key argument
        lookupatts = ['emp_id'],    # a subset of the attributes that uniquely identifies a dimension member
        fromatt = 'valid_from',     # the attribute telling from when the version becomes valid. Default: None
        toatt = 'valid_to',         # the attribute telling until when the version is valid. Default: None
        versionatt = 'version')     # the attribute holding the version number

    # Specify an optional value to return when a lookup fails
    employees_dim.defaultidvalue = 0

    for row in source:
        employees_dim.scdensure(row)

    dw_conn_wrapper.commit()

    source = PandasSource(sales_order_df)
    fact_table = FactTable(
        name = 'fact_sales_order',                  # name of the fact table in the data warehouse
        keyrefs = ['ord_id', 'emp_sk'],             # keys in the fact table
        measures = ['unit_sold', 'tot_revenue', 'ord_date'])    # facts one wants to keep track of
    # Specify an optional value to return when a lookup fails
    fact_table.defaultidvalue = 0

    for row in source:
        # Look up the employee surrogate key that was valid as of the order date
        row['emp_sk'] = employees_dim.lookupasof(row, row['ord_date'], (True, True), {'emp_id': 'emp_id'})
        fact_table.ensure(row, False, {'ord_id': 'ord_id'})

    dw_conn_wrapper.commit()

def main():
    fact_sales_order(dw_conn_wrapper)

if __name__ == '__main__':
    destDatabase = redshift_connector.connect(host='',      # fill in your Redshift endpoint details
                                              database='',
                                              port=5439,     # default Redshift port
                                              user='',
                                              password='')
    dw_conn_wrapper = pygrametl.ConnectionWrapper(connection = destDatabase)
    dw_conn_wrapper.setasdefault()
    main()

Testing

Originally, Kristin, allocated to the store with store_id = 2, made a sale (ord_id = 2) on 2024-01-11 (run script 1). Later that day, Kristin was transferred to a new store (store_id = 3), so a new row was inserted into the dim_emp table to reflect the new store attribute value, while the previous record was ‘invalidated’ by populating valid_to as of 2024-01-11 (run script 2).

Prior to the change to the employee record
After the update to the employee record

After January 11, new sales orders for Kristin carry employee key 3, reflecting her store change (see ord_id = 4 for an example). The fact table row for ord_id = 2, however, remained untouched and is still allocated to store_id = 2 (run script 3).

New sale is made after the update to the employee record

To reiterate the differences between the types: if the store_id attribute were Type 0, both orders would be reported under store_id = 2; if store_id were Type 1, both orders (ord_id = 2 and ord_id = 4) would be reported under store_id = 3.

Keep in mind, though, that with this approach the code iterates through every row of the dataframe and calls the scdensure function to keep the employee dimension up to date with any changes made in the source system. This is fine for a small table but might cause performance issues on large tables; one way to limit the work is to pass only the rows that actually changed, as sketched below.
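
For example, assuming the source system exposes a last-updated timestamp (an assumption; the last_updated column and last_run_ts value below are made up, not part of this post), one could filter the dataframe down to recently changed rows before handing it to the loop:

import pandas as pd
from pygrametl.datasources import PandasSource

# Hypothetical incremental filter, building on employee_df and employees_dim
# from the code above; only rows updated since the last ETL run are processed
last_run_ts = pd.Timestamp('2024-01-10')
changed_rows = employee_df[employee_df['last_updated'] > last_run_ts]

for row in PandasSource(changed_rows):
    employees_dim.scdensure(row)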

Thanks for reading :)
Follow me on Medium for more such content.
