Upleveling Flyte’s Data Lineage Using Dolt
By: Max Hoffman (Software Engineer at Dolthub)
Dolt and Flyte joined forces to build two data integrations. Dolt is a SQL database that supports Git Versioning. Flyte is a workflow orchestrator for creating and evolving machine learning processes and mission-critical data. When combined, Dolt extends Flyte’s data lineage to storage layer versioning.
Flytekit release (v0.19.0) includes SQLAlchemy and Dolt plugins, two contributions that expand Flyte’s ability to integrate with relational databases. In this blog, we explain what Flyte is, what Dolt is, and how the two intersect to introduce a new way of thinking about reproducibility in machine learning workflows.
What is Flyte?
At its core, Flyte is an orchestrator responsible for quarterbacking the data and compute infrastructure of an enterprise. At a 10k foot view, Flyte is a Kubernetes (K8S) cluster that accepts, executes, and records machine learning and data processing workflows. Flyte can run everything from Python functions to Spark jobs, Sagemaker jobs, and map workflows with thousand-task fan-outs. Jobs are submitted as K8S custom resource definitions (CRD), YAML files that support arbitrary language SDK pluggability. Flyte also comes with a handful of tools to support admins and task monitoring.
Flyte has invested heavily into data reproducibility and versioning. Data dependencies are hashed and recorded as metadata. Task lineage is compiled and saved on or before the execution time. Shared tasks are cached to short-circuited workflows and optimize performance. Updates to tasks themselves are versioned and recorded. In a scenario where one wants to know the changes introduced in the data, Dolt’s data versioning can help enhance Flyte’s data lineage.
Flyte Data Typing
Flyte’s type system provides a backbone for reproducibility and versioning. Extra work upfront defining schemas comes to bear in the form of better data quality and metadata management when compiling and executing compute graphs. This comes full circle later with Dolt’s SQL versioning, the same principles applied at the row layer.
The example below, adapted from an XGBoost tutorial in the Flyte documentation shows the type system in action.
Without digging into the weeds too much, the input source (<span class=“code-inline”>dataset</span>) switches between two identities. In <span class=“code-inline”>my_workflow</span>, <span class=“code-inline”>dataset</span> is a static <span class=“code-inline”>FlyteFile</span> type:
At execution time, the orchestrator downloads this file, records an md5 hash and other metadata, and injects a Python path object into <span class=“code-inline”>my_task</span>:
Rigid typing is an interesting point of differentiation between Flyte and other workflow managers, and key for extending Flyte with data plugins like Dolt.
What is Dolt?
Dolt is a novel relational database with Git versioning. Dolt can do everything MySQL does, including act as a drop-in replacement for application databases. That is one of the reasons we added Flyte’s SQLAlchemy plugin. The select statement below performs the same whether a MySQL RDS or Dolt server is running on <span class=“code-inline”>uri</span>:
Dolt was designed from the ground up for versioning data. Dolt’s range of Git features includes cloning, branching, merging, and diffing databases. Dolt databases live in folders like git repositories, and all the <span class=“code-inline”>git</span> commands you know for version control work exactly the same on the command line in a dolt directory.
Dolt takes Flyte’s versioning one step further, letting users understand how data sources evolve over time. Two table versions can be diffed to see row-level changes. Branches can isolate experiments from others’ work. Data can be queried by commit version, timestamp, or branch name without leaving SQL. Recording execution IDs in dolt commits helps audit data changes.
How It Works
Using the Flyte-Dolt Plugin will require Flyte and Dolt dependencies. We will use a <span class=“code-inline”>flytesnacks</span> tutorial for this demo. To follow along, first clone <span class=“code-inline”>flytesnacks</span> and navigate to the dolt cookbook:
We will need several Python dependencies, including <span class=“code-inline”>flytekitplugins-dolt</span>:
We will also need to install <span class=“code-inline”>dolt</span>:
Our workflow reads from a <span class=“code-inline”>foo</span> database, which can be initialized locally:
And we are ready to get started! To test the plugin, run <span class=“code-inline”>quickstart_example.py</span>, a short workflow that creates, writes, and reads a dataframe:
This first workflow is comprised of two tasks: <span class=“code-inline”>populate_rabbits</span> and <span class=“code-inline”>unwrap_rabbits</span>:
<span class=“code-inline”>populate_rabbits</span> creates a dataframe and stores the table in Dolt. <span class=“code-inline”>unwrap_rabbits</span> does the exact opposite, reading the table from Dolt and returning a DataFrame. Our workflow combines these two steps:
It might appear that this workflow accomplishes little. Any Python script can attach a <span class=“code-inline”>data</span> attribute to an object and immediately read it back. Flyte’s backend and typing system perform a lot of work under the hood to save the data to Dolt via this configuration:
Running the workflow again with a different hyperparameter saves a different table:
which we can view by inspecting the <span class=“code-inline”>foo</span> database:
Diffing can be useful productivity or debugging tool. Histograms and distributional shifts help understand the difference between two datasets, but it is also useful to know what rowsare shared at a more granular level. Dolt’s structural sharing is the only way to get efficient row-level diffs of data.
A second example isolates workflow runs using Dolt branches.
The configuration here is more complicated. We will create new branches for different hyperparameter values:
In this example, we have <span class=“code-inline”>users</span> and <span class=“code-inline”>big_users</span> tables. The first and last config reference their respective <span class=“code-inline”>tablename</span>s. The middle config, <span class=“code-inline”>query_users</span>, is a read-only config that selects <span class=“code-inline”>users</span> whose count value is greater than 5.
The workflow has two logical steps after populating the <span class=“code-inline”>users</span> database: filtering the data using our query configuration, and then counting the number of rows returned by the query. <span class=“code-inline”>populate_users</span> and <span class=“code-inline”>filter_users</span> are identical to the previous example, refer to the source for the full code:
We will run this workflow twice:
Which creates distinct branches for our two <span class=“code-inline”>a</span> values:
As in the first example, we can still diff the two runs:
Flyte’s caching prevents unnecessary task re-runs, but Dolt will also no-op if a workflow rerun creates no data mutations:
Dolt stores data as a commit graph, just like Git. This means you can query data by branch name, as in the last example, or by timestamp or commit hash.
If we remember the <span class=“code-inline”>query_users</span> config from the last example:
We notice that the query is just a MySQL select statement. Dolt supports <span class=“code-inline”>AS OF</span> syntax in this statement, pointing to the table at a specific point in time. We use the command line to test how this works:
<span class=“code-inline”>HASHOF('run/a_is_3')</span> is a quick way to get the most recent commit of the <span class=“code-inline”>run/a_is_3</span> branch. If you wanted to pass the specific commit string, you can use these helpers:
Generated by Flyte execution id: ex:local:local:local
There are several ways you could use time travel and asof queries:
- Fetch the most recent data in a specific branch
- Input the 5 most recent versions of a table
- Only input newly added rows in the last commit
- Input data from exactly one month ago
We have discussed Flyte’s workflow automation software, Dolt’s versioned database, and why you might use Dolt in lieu of static CSV’s to version your data pipelines. We demonstrated a branching organization that creates different workspaces for different versions of an execution run, and how to perform diffs and time travel between those branches.
Dolt schemas are version controlled and gate check row entry, extending Flyte's typing assertions through the data layer. Dolt's branch and merge features are the only efficient solution for collaborating between isolated data workspaces, which when paired with Dolt's lineage history creates a new breed of data catalog. Dolt's storage layer is the only data structure that supports efficient diffing while maintaining decades of B tree optimizations, which lets you avoid manually trying to find the difference between <span class=“code-inline”>train_final_v5.csv</span> and <span class=“code-inline”>train_final_actually_v2.csv</span> when something breaks in production.