Airflow is not an ETL tool…
Introduction
In this post, I look at three data orchestration tools (Mage, Kestra, and Dagster) for orchestrating a job that extracts data from WeatherAPI, applies a light-touch transformation, and loads it into a DuckDB database.
You can find the original code that I adapted for each tool here.
Airflow is not an ETL tool, or what is data orchestration?
I remember being confused by the phrase “Airflow is not an ETL tool…” — you can replace Airflow with any data orchestrator — and the reason I was confused was the PythonOperator, which allows you to execute Python functions on the same machine where Airflow is installed. The fact that some example DAGs available on the Internet place these Python functions in the same DAG configuration file that contains the workflow definition didn’t make things any clearer.
“Data Orchestration involves using different tools and technologies together to move data between systems.” Right. But then, don’t data integration tools also move data between systems? One of the most commonly listed benefits across the ‘Data orchestration and why it is so crucial for data engineering’ type of articles is that data orchestration tools let you run complex data pipelines with several tasks and dependencies between them, at the right time and in the right order.
Do you want to run tasks in sequence? Sure. Would you rather execute tasks in parallel instead? Sure. Do you need to run some or all tasks of a parallel group in sequence? Of course.
Another benefit that differentiates data orchestration tools from data integration tools is the more sophisticated built-in functionality for error handling, whether it is canceling the operation altogether, continuing the orchestration after an error occurs, or canceling the orchestration and invoking an alternate orchestration or a notification.
My orchestration needs were simple: execute the steps of my data pipeline in sequence, on a schedule. Reviewing the error-handling functionality of each tool was also out of scope.
The data orchestrator universe
The matrix below was compiled from the ‘Data orchestration and why it is so crucial for data engineering’ articles on the Internet, with the tools mapped against the type of software license — open source vs commercial (closed) — and the tool focus — generic data pipelines or domain-specific ones, i.e. machine learning pipelines. I would also suggest reviewing https://github.com/pditommaso/awesome-pipeline#pipeline-frameworks--libraries, which contains a wider list of open-source workflow tools.
I’ve chosen to compare Mage, Kestra, and Dagster for this project. All three are open-source tools, with Kestra and Dagster also having a commercial offering, and all three are generic data orchestrators.
Data orchestrators >> Mage
Mage is an open-source, hybrid framework for transforming and integrating data.
— Mage pipelines are composed of blocks defined in Python, R, or SQL and allow extracting JSON-formatted data from databases, APIs, and file formats using Singer taps — you can read more on Singer taps here. Some of the available data source integrations are Amazon S3, API, Azure Blob Storage, BigQuery, DynamoDB, MongoDB, MySQL, PostgreSQL, Redshift (Amazon Web Services), and Snowflake. Some of the available data destination integrations are Amazon S3, BigQuery, MongoDB, MySQL, PostgreSQL, and Snowflake.
— One can deploy Mage with Docker, or install it with pip/conda and launch the Mage webserver/UI from the command line. As I didn’t want to go through the hassle of installing the packages, libraries, and other dependencies required to run a Mage project, I used Docker Compose.
— Mage pipelines can be triggered on a schedule, manually, programmatically (via a Mage API call), or by AWS events.
(1) Start a container with:
git clone https://github.com/mage-ai/compose-quickstart.git mage-quickstart
cd mage-quickstart
cp dev.env .env
rm dev.env
docker compose up
(2) Open the front end by visiting http://localhost:6789.
(3) To create a new batch pipeline, go to ‘Pipelines’ >> ‘Standard (Batch)’.
(4) As I wanted to test the Secrets functionality, I’ve added key-value secrets using the Secrets UI.
(5) Added the ‘Data loader’ >> ‘Python’ >> ‘API’ step, where I pull weather data from WeatherAPI — you can find the extraction script here.
(6) Added the ‘Data exporter’ >> ‘Python’ >> ‘Generic (no template)’ step, where I store the data received from the previous step in the in-memory DuckDB database — you can find the loading script here (a sketch of what these two blocks can look like follows step 7 below).
(7) To trigger the pipeline, click ‘Triggers’ >> ‘New Trigger’ and select one of the options listed above.
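For reference, here is a minimal sketch of what such a pair of blocks can look like, assuming a data loader that calls WeatherAPI with a key stored in Mage Secrets and a data exporter that writes to a local DuckDB file; the decorator boilerplate follows what Mage scaffolds for new blocks, while the secret name, city, file, and table names are placeholders rather than my exact code.

# Data loader block (in a real project, this lives in its own block file)
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
from mage_ai.data_preparation.shared.secrets import get_secret_value


@data_loader
def load_weather_data(*args, **kwargs) -> pd.DataFrame:
    # Pull the current weather for a city, reading the API key from Mage Secrets
    response = requests.get(
        'http://api.weatherapi.com/v1/current.json',
        params={'key': get_secret_value('weatherapi_key'), 'q': 'London'},
        timeout=30,
    )
    response.raise_for_status()
    return pd.json_normalize(response.json()['current'])


# Data exporter block (a separate block file that receives the loader's output)
import duckdb

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_to_duckdb(df: pd.DataFrame, *args, **kwargs) -> None:
    # Write the dataframe returned by the upstream data loader into DuckDB
    con = duckdb.connect('weather.duckdb')
    con.execute('CREATE OR REPLACE TABLE weather AS SELECT * FROM df')
    con.close()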
Overall, I appreciated that (1) one can create reusable functions that can be applied to different data sources and destinations; (2) no knowledge of a specific framework is required: provided that one knows some Python/R/SQL, one can start creating pipelines right away; (3) one can create and edit pipelines using the GUI.
Data orchestrators >> Kestra
Kestra is an open-source, event-driven orchestrator.
— Flows are defined in a declarative YAML syntax.
— You can set up and manage your Kestra environment yourself using Docker (open source). Alternatively, you can get the Enterprise edition and let Kestra manage it for you.
— Kestra allows you to execute your pipeline on a regular cadence (schedule trigger), on a ‘continue-on-success’ basis (flow trigger), or based on an HTTP request emitted by a webhook (webhook trigger). Apart from that, Kestra has event-based triggers, such as triggers based on file detection events or the arrival of a new message in a message queue.
(1) Start a container with:
docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp kestra/kestra:develop-full server local
(2) Open the front end by visiting http://localhost:8080.
(3) To create a new batch pipeline, go to ‘Flows’ >> ‘Create’. Each flow should contain three required components: an id, a unique identifier within a namespace; a namespace, used to group flows; and tasks, a list of tasks that will be executed in the order they are defined.
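To make these components more concrete, here is a minimal flow sketch for a weather job, assuming Kestra’s Python script task and schedule trigger plugins; the flow id, namespace, type identifiers, city, and table layout are illustrative rather than taken from my repository, and the exact type paths may differ between Kestra versions.

id: weather_api
namespace: dev.weather

tasks:
  - id: extract_and_load
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install requests duckdb
    script: |
      import duckdb, requests
      # Pull the current weather and append it to a DuckDB table
      row = requests.get(
          'http://api.weatherapi.com/v1/current.json',
          params={'key': '<YOUR_KEY>', 'q': 'London'},
          timeout=30,
      ).json()['current']
      con = duckdb.connect('weather.duckdb')
      con.execute('CREATE TABLE IF NOT EXISTS weather (last_updated VARCHAR, temp_c DOUBLE)')
      con.execute('INSERT INTO weather VALUES (?, ?)', [row['last_updated'], row['temp_c']])

triggers:
  - id: daily
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 6 * * *"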
If you have never defined pipelines through a .yaml file, Kestra, with its YAML-based configuration, might be quite a challenge for you. To help with that, though, Kestra has a collection of pre-built blueprints for common challenges and scenarios. Some of the existing blueprints are:
- ‘Extract a CSV file via HTTP API and upload it to S3 by using the scheduled date as a filename’,
- ‘Schedule a Python data ingestion job to extract data from an API and load it to DuckDB using dltHub (data load tool)’,
- ‘Scheduled data ingestion to AWS S3 data lake managed by Apache Iceberg, AWS Glue, and Amazon Athena (inline Python script)’, etc.
And yes, I belong to the group of people who ‘have never defined pipelines through a .yaml file’, so I am not entirely positive that the way I structured the tasks in my flow is the correct one.
What I liked most about Kestra is how detailed and well-organized the data pipeline metadata is. You can also check the overall dashboard to get a high-level overview of your pipelines, and a logs page that lists the events that happened while executing your pipelines.
Data orchestrators >> Dagster
Dagster is an orchestration platform for the development, production, and observation of data assets.
— Dagster runs on Python and can be installed with pip/conda. Running dagster dev after the installation will launch the Dagster webserver/UI and the Dagster daemon. As with Kestra, you can opt for the fully managed Dagster Cloud.
— You can materialize your assets at a specific time and/or day (basic schedules), on schedules that provide custom run config and tags, on schedules built from partitioned assets and jobs, and at customized execution times.
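To give an idea of the ‘basic schedules’ option, a minimal sketch could look like the following; the job name, asset selection, and cron expression are hypothetical and not taken from the original project.

from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job

# A job that materializes all assets in this code location
daily_weather_job = define_asset_job(name='daily_weather_job', selection=AssetSelection.all())

# Materialize them every day at 06:00
daily_weather_schedule = ScheduleDefinition(job=daily_weather_job, cron_schedule='0 6 * * *')

# Assets are omitted for brevity; in a real project they would be passed to Definitions as well
defs = Definitions(jobs=[daily_weather_job], schedules=[daily_weather_schedule])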
I started with Dagster Cloud but gave up on it after failing to deploy Dagster with GitHub Actions; the deployment kept failing with the same (or a similar) error about DuckDB, regardless of the changes I made to the GitHub Actions script, such as changing the Python version and adding shell commands to install different packages:
Error loading data-orchestration-dagster: {'__typename': 'PythonError', 'message': "dagster._core.errors.DagsterImportError: Encountered ImportError: `No module named 'dagster_duckdb_polars'` while importing module quickstart_etl. Local modules were resolved using the working directory `/opt/dagster/app`
After this not-so-great start, I decided to pull one of the Dagster examples and build my pipeline on top of it.
(1) Install Dagster and the project dependencies:
pip install dagster
dagster project from-example --name weatherapi --example quickstart_etl
pip install -e ".[dev]"
(2) To start the Dagster UI, run dagster dev.
(3) Open the front end by visiting http://localhost:3000.
(4) Add your asset script under ‘assets’. Each asset should include an @asset decorator to indicate to Dagster that the function produces an asset; an asset key that uniquely identifies the asset in Dagster; and a set of upstream asset dependencies, referenced by their asset keys (a sketch of such an asset file follows the directory tree below).
└── data-orchestration-dagster/
└── quickstart_etl/
├── assets/
│ ├── weather_api.py
│ └── __init__.py
└── __init__
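For orientation, below is a minimal sketch of what weather_api.py might contain, split into an extraction asset and a loading asset; it is an assumed example rather than my exact code, and the city, key placeholder, file, and table names are illustrative.

import duckdb
import pandas as pd
import requests
from dagster import asset


@asset
def raw_weather() -> pd.DataFrame:
    # Extract the current weather for a city from WeatherAPI
    response = requests.get(
        'http://api.weatherapi.com/v1/current.json',
        params={'key': '<YOUR_KEY>', 'q': 'London'},
        timeout=30,
    )
    response.raise_for_status()
    return pd.json_normalize(response.json()['current'])


@asset
def weather_table(raw_weather: pd.DataFrame) -> None:
    # The parameter name declares the upstream dependency on the raw_weather asset
    con = duckdb.connect('weather.duckdb')
    con.execute('CREATE OR REPLACE TABLE weather AS SELECT * FROM raw_weather')
    con.close()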
(5) To run your assets, go to http://localhost:3000/assets >> ‘Reload definitions’, then select all assets >> ‘Materialize selected’.
I wish the GUI allowed you to do more, as you can only materialize the assets that you develop locally via the front end. Also, based on my limited review, as you do the development locally, you might get the dreadful ‘Import errors’ after you reload your asset definitions — Airflow users will understand.
Initially, I was under the impression that I would need to learn a completely different framework with Dagster, but this is not the case. Dagster runs on Python, so, provided that you have some Python knowledge, you should be able to create and manage your pipelines.
Because Dagster is an asset-based workflow tool, a concept foreign to those who are used to task-based workflow tools such as Airflow, you need to change the way you structure your code. To introduce you to this new way of organizing your workflows, Dagster has a vast knowledge base. However, as with Kestra, I’m not entirely sure that I learned the specifics of the tool well and structured the code in the best way.
Summary
It has been a fun experiment to (1) explore Mage and plan for its deployment with AWS (more about this in the next post) and (2) shift my perspective on pipeline creation by doing the .yaml-based configuration with Kestra and creating an asset-centric workflow with Dagster.
You can see the difference in the implementation of the same weather script (https://github.com/eponkratova/data-orchestration/blob/master/weather_api.py) at:
— Mage — https://github.com/eponkratova/data-orchestration/tree/master/mage
— Kestra — https://github.com/eponkratova/data-orchestration/tree/master/kestra
— Dagster — https://github.com/eponkratova/data-orchestration/tree/master/dagster
Dagster was one of the most difficult tools for me to work with due to my lack of understanding of the difference between task-based and asset-based workflows: for me, even task-based workflows produce a certain asset at the end of the day; they just don’t emphasize it. My current understanding is that asset-based workflows group together the different tasks that are required to produce a certain asset — in the cookie example from Dagster University, “To create the cookie dough, combine the wet ingredients and dry ingredients”. It might be interesting to spend more time with Dagster to understand the concept better.