David Espejo
Shivam Sharma

So you think you can type: how Flyte’s type system works

Introduction

“Writing software without types lets you go at full speed. Full speed towards the cliff.” —Tom Hacohen, CTO @ Svix

Flyte's type system is a key element that enables it to function as a robust dataflow engine, laying out the foundation for many features crucial for building reliable and efficient ML workflows, including:

  • Versioning: the type engine provides a structured way to define and manage data types, which ensures reproducibility and consistency across different workflow executions. This approach helps in maintaining consistent data types across different versions of workflows, enabling seamless versioning and rollback capabilities.
  • Memoization: Flyte's type annotations allow the engine to create unique cache keys for tasks based on the serialized representation of their inputs. If a task is executed with the same inputs as a previous run, Flyte can simply retrieve the cached output, saving valuable compute time and resources. For ML tasks like hyperparameter tuning or model training, which can be computationally expensive, this caching mechanism offers significant performance improvements.

This post explores Flyte's type engine, its motivations, inner workings, and benefits for ML workflows.

What’s a type system?

Let’s start with the basics. In the words of Guido van Rossum, a type is “a set of values and a set of functions that one can apply to these values” (source). The `bool` type, for example, is a set of two values and a few supported functions (or operators):
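
For illustration, here is a rough sketch of that set of values and a few of the operators defined on them (not an exhaustive list):

# The two values of the bool type
values = {True, False}

# ...and a few of the functions (operators) that can be applied to them
True and False   # -> False
True or False    # -> True
not True         # -> False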

If a program only had a single function, it wouldn’t matter much what type of data it returns. A program, though, consists of multiple functions. This is especially true in ML/Data pipelines, where there may be multiple stages of data processing, training, validation, and serving, with each function calling another function to complete the next step in the process.

Say that each step in this hypothetical pipeline is implemented with a single function as shown in the diagram below:

We want to ensure that the Caller function output type is consistent with the input the called function expects.

The notion of consistency in Python is introduced in PEP-483. To define it, we need to briefly introduce the notion of subtypes first:

For a type (call it first_type) to be considered a subtype of another (second_type), it has to meet the following criteria:

  • Every value from first_type is also in the set of values of second_type; and
  • every function from second_type is also in the set of functions of first_type.

With our example here:

  • Every value from `int` is also in the set of values of `float`, because every integer is also a real number (and not the opposite way)
  • Every function from `float` is also in the set of functions of `int`, because you can sum or multiply integers just as you do with floats.

This relationship is part of the requirements for two types to be considered consistent:

  • A type `t1` is consistent with a type `t2` if `t1` is a subtype of `t2` (but not the other way around).

In our example, `int` is a subtype of `float`, so `int` is consistent with `float` and not the other way around. 

In consequence, a program like the following demonstrates type consistency:

def get_data(a):
    o1 = a**2
    return o1 # returns a float

def transform_data(o1):
    o2 = o1 + 3 # safe as it uses an operation that's part of the float operations 
    return o2

print(transform_data(get_data(5.0))) # we passed a float as argument
>>> 28.0

But a program like this demonstrates type inconsistency:

def get_data(a):
    o1 = a**2
    return o1 # returns a float 

def transform_data(o1):
    o2 = o1 & 3  # unsafe as it uses an operation that's only supported on integers 
    return o2

print(transform_data(get_data(5.0)))

Failing with the following error:

TypeError: unsupported operand type(s) for &: 'float' and 'int'

Bitwise operators are only available for integers, not floats. Because `float`, the type passed to the `transform_data` function, is not a subtype of `int`, the operation is unsafe and fails at runtime with a type error.

The above is an elemental example where an experienced eye could see the type mismatch without even running the program. Bigger codebases will need better documentation to help new developers understand the desired behavior and where to start debugging. 

As an optional measure to enable developers to write code that’s easier to read and where type inconsistencies are mitigated more effectively, Python added Type hints in version 3.5, released in 2015. 

In our example program, using type hints would make it much easier to detect the type mismatch:

def get_data(a: float) -> float: # it will return a float
    o1 = a**2
    return o1

def transform_data(o1: int) -> int: # it expects an integer
    o2 = o1 & 3
    return o2

print(transform_data(get_data(5.0)))

With further additions to its type system, Python adopted gradual typing, allowing users to annotate types in different areas of the code but without making it mandatory so they can be used by third-party tools such as type checkers, IDEs, or linters (source).
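
As a small illustrative sketch (hypothetical functions, not from any real codebase): annotated code can be verified by a static checker, while un-annotated code is implicitly treated as dynamic and only fails, if at all, at runtime.

def get_data(a: float) -> float:
    # Fully annotated: a type checker such as mypy can verify every call site
    return a ** 2

def transform_data(o1):
    # No annotations: the parameter is implicitly treated as Any,
    # so a static checker will not flag the unsafe bitwise operation below
    return o1 & 3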

So, why a Flyte type system?

Flyte was conceived as the platform that holds together, under a common API, the heterogeneous tools that make up an ML system. Developing a specialized type system was the most suitable option to interoperate with systems that support different data types and formats.

The decision to embrace static typing for task inputs and outputs was driven by several key factors:

  • Avoiding Subtle Bugs: There was no typing support in the early versions of Flyte. The first version of Flyte's memoization did not use types at all, simply hashing the inputs and the task name to memoize. This approach led to multiple subtle bugs when users updated the return type or the type of one of the inputs.
  • Enabling Versioning and Collaboration: As Flyte adoption started to grow, there was an increasing demand to enable technical and non-technical users to execute registered versions of workflows. Maintaining and executing these versions, while passing in the right data, required a bold approach to reliability.
  • Improving User Experience: Even today, if you try to launch a workflow in the Flyte UI, it prevents you from sending inconsistent data to the workflow. 
“Launch workflow” menu in Flyte’s UI. Here, the required input also carries the expected data type (string)

Moreover, the CLI automatically provides a help string showing the required inputs. 

This positively impacts the user experience and simplifies collaboration between teams.

  • Handling Diverse Data Formats: As multiple teams started using Flyte, we observed that some data scientists preferred working with pandas dataframes or polars for small amounts of data, while the upstream data processing produced Spark dataframes. By then, the Arrow project was getting started and Flyte leaned heavily into the Arrow ecosystem to make it part of the core type system. Flytekit relies on `pyarrow` to efficiently read and write big volumes of data in parquet format.
  • Ensuring Data Integrity and Governance: Flyte automatically manages users' data without making them decide on storage locations. This helps ensure that the platform can provide exactly-once semantics, eliminating the chances of data corruption. It also makes it possible to make central decisions to comply with regulations like GDPR, HIPAA, etc.

How does it work?

Flyte’s type system provides two core abstractions: `Literal` and `LiteralType`.

The `LiteralType` defines the structure of a type, including the type definition or type “metadata”, whereas the `Literal` is the instance that holds the data transformed from the original values, following the schema defined by the `LiteralType`.

Both are defined using Google’s Protocol Buffers (protobuf), an intermediate representation (IR) that enables fast iteration while maintaining backward compatibility, correctness, and speed.

Think of these abstractions as a shipping box. The `Literal` is the content of the box while the `LiteralType` is the label.
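
As a rough sketch using flytekit’s model classes (module paths and constructors are shown as an approximation and may vary across versions), the pair for a plain integer looks like this:

from flytekit.models.literals import Literal, Primitive, Scalar
from flytekit.models.types import LiteralType, SimpleType

# The "label" on the box: an integer type definition
int_type = LiteralType(simple=SimpleType.INTEGER)

# The "content" of the box: the value 42, wrapped as a Flyte Literal
int_literal = Literal(scalar=Scalar(primitive=Primitive(integer=42)))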

When you register a task to the Flyte API, the task is serialized into a protobuf representation that contains the input and output types. The protobuf is sent to `flyteadmin` (Flyte’s control plane) which, in turn, performs type checking. If the type checks pass, the compiled workflow is stored in blob storage alongside its version and metadata.

Here, Flyte’s IDL (Interface Definition Language) plays a significant role by providing the necessary type definitions that enable Flyte to perform static type checking, ensuring that the data conforms to expected types throughout the workflow execution.

Summarized registration and type-checking process

Let’s take the `StructuredDataset` type as an example. Flyte represents dataframes using `StructuredDataset`, supporting storage formats like parquet, csv, or feather, to efficiently move data coming from heterogeneous systems between tasks and workflows.

According to its protobuf definition, `StructuredDataset` defines a list of columns where each column has a `name` and a `literal_type`.

Hence, it could represent something like this in a dataset:

columns = [
    {name: "age", literal_type: INTEGER},
    {name: "name", literal_type: STRING},
    {name: "salary", literal_type: FLOAT}
]

Columns are not limited to simple types; any other Flyte type can be used to represent complex datasets:

columns = [
    {name: "user_id", literal_type: INTEGER},
    {name: "preferences", literal_type: {map_value_type: STRING}}
    {name: "scores", literal_type: {collection_type: FLOAT}},// array of floats 
    {name: "metadata", literal_type: {blob: {format: "json", dimensionality: SINGLE}}},
    {name: "status", literal_type: {enum_type: {values: ["ACTIVE", "PENDING", "INACTIVE"]}}}
]

When you pass data between tasks, Flyte validates whether the two `StructuredDataset` types are compatible by checking the rules defined in the protobuf representation:

    // For two types to be compatible, the format will need to be an exact match.
    string format = 2;

The `format` field specifies the storage format, and type checking will only pass if the formats of the two `StructuredDataset` instances are compatible.

The format can be `parquet`, `csv`, `feather`, or any other custom format. By default, Flyte serializes datasets and stores them in `parquet` format, and there are several reasons for this:

Why parquet?

  1. Efficiency: Parquet is a columnar storage file format optimized for use with big data processing frameworks. It provides efficient data compression and encoding schemes, which result in significant storage savings and faster query performance.
  2. Interoperability: Parquet files are widely supported across various data processing tools and frameworks, including Pandas, Spark, and others. This makes it easier to integrate and process data across different systems.
  3. Schema Evolution: Parquet supports schema evolution, allowing you to add new columns to your data without breaking existing workflows. This flexibility is crucial for maintaining and updating data pipelines.
  4. Serialization/Deserialization: flytekit automatically handles the serialization of Pandas DataFrames to Parquet files and deserialization back to DataFrames, leveraging the pyarrow library, Apache Arrow’s Python binding. This simplifies the process of passing data between tasks in a workflow.
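
To give a sense of what that serialization step involves, here is a standalone pyarrow round trip between a pandas DataFrame and a parquet file; this is a simplified sketch, not flytekit’s actual code path:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [25, 30]})

table = pa.Table.from_pandas(df)            # pandas -> Arrow table
pq.write_table(table, "dataset.parquet")    # Arrow table -> parquet file
restored = pq.read_table("dataset.parquet").to_pandas()  # and back to pandas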

Let’s see how this is implemented in the following example:

import pandas as pd  # a dependency of the StructuredDataset type
from flytekit import task, workflow

@task
def create_dataset() -> pd.DataFrame:
    # This will be stored as parquet by default 
    return pd.DataFrame({
        "name": ["Alice", "Bob"],
        "age": [25, 30]
    })

@task
def process_dataset(df: pd.DataFrame) -> pd.DataFrame:
    # This task can accept the dataset because the formats match 
    return df[df["age"] > 25]

@workflow
def my_workflow() -> pd.DataFrame:
    df = create_dataset()
    processed = process_dataset(df=df)
    return processed

With the dataset represented by Flyte IDL as:

message StructuredDatasetType {
    columns = [
        { name: "name", literal_type: { simple: STRING } },
        { name: "age", literal_type: { simple: INTEGER } }
    ]
    format = "parquet"  
}

On the other hand, for a program like the following, where the formats are pinned explicitly (sketched here with `typing.Annotated` and the `StructuredDataset` type), type checking would fail because of the format mismatch:

from typing import Annotated

import pandas as pd
from flytekit import task, workflow
from flytekit.types.structured import StructuredDataset

@task
def create_csv_dataset() -> Annotated[StructuredDataset, "csv"]:
    # Explicitly store this dataset as CSV instead of the default parquet
    df = pd.DataFrame({"name": ["Alice"], "age": [25]})
    return StructuredDataset(dataframe=df)

@task
def process_parquet_dataset(sd: Annotated[StructuredDataset, "parquet"]) -> pd.DataFrame:
    # This task expects the parquet format
    return sd.open(pd.DataFrame).all()

@workflow
def my_workflow() -> pd.DataFrame:
    df = create_csv_dataset()
    # This would fail type checking because the formats don't match:
    # csv != parquet
    processed = process_parquet_dataset(sd=df)  # Type error!
    return processed

As you may have observed, all tasks and workflows are annotated with type hints. While the Python runtime does not enforce function and variable type annotations, Flyte’s type checking works best when you annotate your functions.

The type engine can also infer types in many cases, but that doesn’t come with any guarantees. If you annotate the above example with the `Any` type, Flyte will try to infer the types, but due to the lack of metadata, it will fail:

AttributeError: Failed to convert inputs of task  'workflows.staticTypeChecking.process_parquet_dataset':
  'NoneType' object has no attribute 'metadata'

At the core of Flyte’s type system is another crucial element: the `TypeTransformer`, used to convert between different representations of data, including from Flyte types (like `StructuredDataset`) to native Python types (like `pyspark.DataFrame`) and vice versa. The documentation gives more details about the mapping between Python and Flyte types.
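
If you want to see this mapping for yourself, flytekit’s `TypeEngine` can be queried directly. A small sketch (the printed output is abbreviated in the comments and may vary across flytekit versions):

from typing import List

from flytekit.core.type_engine import TypeEngine

# Ask the type engine which LiteralType a given Python type maps to
print(TypeEngine.to_literal_type(int))          # simple: INTEGER
print(TypeEngine.to_literal_type(str))          # simple: STRING
print(TypeEngine.to_literal_type(List[float]))  # collection_type { simple: FLOAT }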

When to extend it?

So far, you may have noticed how opinionated Flyte’s type system is: it diverges from some of the ideas in Python’s type system and bridges gaps that Python has (like the lack of a standardized representation for dataframes). This is because the type engine wasn’t built exclusively with Python in mind.

A benefit of using protobuf as an intermediate representation lies in its language-independence. The type definitions and rules for type checking apply to any language you use to author tasks and workflows.

Flyte’s Python SDK (`flytekit`) incorporates a fallback serialization mechanism for objects that don’t have native Flyte type support. When Flyte encounters an unsupported data type, it uses `cloudpickle` (an extended version of Python’s `pickle` module) to:

  • Serialize the object to a file
  • Store it on blob storage (either local or remote)
  • Track it using the type engine
  • Deserialize it when needed later in the workflow
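
For instance, a task that returns a plain Python object with no registered transformer takes this fallback path automatically. A minimal sketch (`CustomModel` is a hypothetical class, not a Flyte type):

from flytekit import task, workflow

class CustomModel:
    """A hypothetical class with no native Flyte type support."""
    def __init__(self, weights: list):
        self.weights = weights

@task
def train() -> CustomModel:
    # flytekit falls back to its pickle transformer for this return type
    return CustomModel(weights=[0.1, 0.2, 0.3])

@task
def evaluate(model: CustomModel) -> int:
    return len(model.weights)

@workflow
def wf() -> int:
    return evaluate(model=train())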

There are known issues with using `pickle`. It can only exchange objects between the exact same Python versions, which can be challenging as the code moves from development to production in different environments. Also, unpickling data from an untrusted source can lead to remote code execution. Flyte mitigates these risks by executing each Task in an isolated container that encapsulates the required Python environment, the necessary libraries, and their versions. In addition, the pickled object is only accessible from the container, reducing the risk associated with untrusted storage locations.

Another particular limitation of the pickling process in the Flyte type system is that most type information is lost during the serialization process.

Only two pieces of information are stored:

  1. The format identifier (`PythonPickle`)
  2. A string representation of the type in metadata. While this preserves the full generic structure even for nested types, it is stored as a string and, as a consequence, is not used for runtime type checking. The actual type safety comes from:

  • The original type hints in the code
  • The type information preserved by `cloudpickle` in the serialized data itself

The lost information includes generic type parameters and type hierarchy.

This means that the object and its type information are preserved through `cloudpickle`'s serialization, but only minimal type information is preserved for execution efficiency.

But, what if you prefer not to serialize your data types into `pickle`? Enter the `TypeTransformer`. 

We already mentioned its role in converting from Python native types to Flyte types and vice versa. All you need to do, then, is write a custom `TypeTransformer` defining how your custom type should be converted into a `Literal` representation supported by Flyte, and let `flytekit` know that your custom type is now valid. A complete tutorial can be found in the documentation.
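
To give a flavor of what that involves, here is a minimal sketch of a transformer for a hypothetical `Temperature` class that maps it to a simple float `Literal`. The method names follow the extension points described in the flytekit documentation, but treat the exact signatures and imports as an approximation:

import typing

from flytekit import FlyteContext
from flytekit.extend import TypeEngine, TypeTransformer
from flytekit.models.literals import Literal, Primitive, Scalar
from flytekit.models.types import LiteralType, SimpleType


class Temperature:
    """A hypothetical domain type we want Flyte to understand."""
    def __init__(self, celsius: float):
        self.celsius = celsius


class TemperatureTransformer(TypeTransformer[Temperature]):
    def __init__(self):
        super().__init__(name="temperature-transformer", t=Temperature)

    def get_literal_type(self, t: typing.Type[Temperature]) -> LiteralType:
        # Tell Flyte what this type looks like "on the wire": a simple float
        return LiteralType(simple=SimpleType.FLOAT)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: Temperature,
        python_type: typing.Type[Temperature],
        expected: LiteralType,
    ) -> Literal:
        # Convert the Python object into a Flyte Literal
        return Literal(scalar=Scalar(primitive=Primitive(float_value=python_val.celsius)))

    def to_python_value(
        self, ctx: FlyteContext, lv: Literal, expected_python_type: typing.Type[Temperature]
    ) -> Temperature:
        # Convert the Flyte Literal back into the Python object
        return Temperature(lv.scalar.primitive.float_value)


# Let flytekit know the custom type is now valid
TypeEngine.register(TemperatureTransformer())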

Conclusion

Data type inconsistencies can happen in software systems, but the risks are exacerbated in AI pipelines where data transformations are the norm and the interaction of multiple tools and formats can quickly lead to expensive runtime errors.

Type safety is one of the core design principles that Flyte enforces, looking to strike a balance between usability, interoperability, and reliability.

Start the journey of production-grade ML/Data pipeline workflows with the `flytekit` SDK:

`pip install flytekit`

Learn how to get started quickly

Questions? Join the Slack community