Samhita Alla
Samhita Alla

Flyte 1.5: Streaming Support, Pod Templates, Partial Tasks and More

April was an exciting month for Flyte, as we launched a range of innovative features and upgrades that catered to user requests. Among the notable highlights of the Flyte 1.5 release were a revamped data subsystem and the introduction of general streaming support in the data persistence layer. Let's check out the details. 

Revamped data subsystem

One of the most significant changes in the Flyte 1.5 release is a complete revamp of the data subsystem. The data persistence layer has been entirely overhauled, and Flyte now exclusively utilizes `fsspec` to handle input and output operations.

This new implementation offers more performant IO, improving performance for most users. Importantly, this change does not require any modifications to user code.

Thanks to the revamped data subsystem, FlyteKit can now leverage the powerful streaming capabilities of `fsspec` for reading and writing data.

Stream files and directories

With Flyte’s new streaming capabilities, users can easily read and write data from remote file systems without the need to transfer entire files or directories. This can greatly improve performance and reduce network traffic, particularly when dealing with large files or data sets.

For example, let's say we want to stream a file from a remote location and copy it to another location. With the new FlyteFile streaming support, this can be accomplished with just a few lines of code:

Copied to clipboard!
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
    new_file = FlyteFile.new_remote_file(ff.remote_path)
    with ff.open("r", cache_type="simplecache", cache_options={}) as r:
        with new_file.open("w") as w:
            w.write(r.read())
    return new_file

The new FlyteDirectory streaming support makes it possible to stream an entire directory. For instance, you can accept a FlyteDirectory as an input, walk through it and copy the files to another FlyteDirectory one by one. Here is an example of how you can achieve this:

Copied to clipboard!
@task
def process_folder(fd: FlyteDirectory) -> FlyteDirectory:
    out_fd = FlyteDirectory.new_remote()
    for base, x in fd.crawl():
        src = os.path.join(base, x)
        out_file = out_fd.new_file(x)
        with FlyteFile(src).open("rb") as f:
            with out_file.open("wb") as o:
                o.write(f.read())
    return out_fd

This feature is marked as experimental. We'd love feedback on the API!

Partial tasks

With the latest release, Flyte now has limited support for partial tasks. Partial tasks are tasks that can start executing with a subset of the inputs, and then wait for the rest of the inputs to become available before it continues.

This feature can be especially beneficial in situations where certain inputs need to be frozen.

Copied to clipboard!
@task
def t1(a: int, b: str) -> str:
    return f"{a} -> {b}"
  
t1_fixed_b = functools.partial(t1, b="hello")

@workflow
def wf(a: int) -> str:
    return t1_fixed_b(a=a)

Partial task support is also compatible with map tasks to a limited extent. For instance:

Copied to clipboard!
from flytekit import task, workflow, partial, map_task

@task
def t1(x: int, y: float) -> float:
    return x + y

@workflow
def wf(y: List[float]):
    partial_t1 = partial(t1, x=5)
    return map_task(partial_t1)(y=y)

We are currently seeking feedback on this feature; it’s labeled as “experimental” for now.

Database migrations

As part of our ongoing development efforts, we identified the need to clean up our database migrations. You’ll see a series of new migrations in Flyte 1.5.

If you are up-to-date on your migrations, these new migrations should have zero impact, as they are labeled "noop." Please note, however, that they will add around a minute to the init container/command that runs the migrations in the default Helm charts.

Worth noting: Since these migrations should be a no-op, they do not come with any rollback commands.

Approval flow in the UI

The approve() workflow node allows you to wait on an explicit approval signal before continuing execution. Users can now approve or reject executions through the UI.

Pod template support in `ContainerTask`

The pod template feature enables users to effortlessly define and tailor their pod specifications from within their task definition. With this functionality, users can personalize any container task to utilize unique pod capabilities.

Copied to clipboard!
ps = V1PodSpec(
    containers=[],
    tolerations=[
        V1Toleration(effect="NoSchedule", key="nvidia.com/gpu", operator="Exists")
    ],
)
ps.runtime_class_name = "nvidia"
nsr = V1NodeSelectorRequirement(
    key="nvidia.com/gpu.memory", operator="Gt", values=["10000"]
)
pref_sched = V1PreferredSchedulingTerm(
    preference=V1NodeSelectorTerm(match_expressions=[nsr]), weight=1
)
ps.affinity = V1Affinity(
    node_affinity=V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[pref_sched]
    )
)
pt = PodTemplate(pod_spec=ps, labels={"somelabel": "foobar"})

ct = ContainerTask(
    name="ellipse-area-metadata-shell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image=image,
    command=cmd,
    pod_template=pt,
)

The use of a `pod_template_name` enables data/ML engineers to abstract complex Kubernetes configurations, freeing them from having to understand the intricacies of Kubernetes PodTemplate. Platform administrators can create a Kubernetes resource PodTemplate and direct users to specify `pod_template_name="template_a"`. This automatically injects the correct Kuberenetes configuration, sparing the users the need to delve into the complexities of Kubernetes PodTemplate.

Headless authentication

The `DeviceAuth` flow for pyflyte now supports headless authentication, and can be enabled by adding the following to your `~/.flyte/demo-config.yaml` file:

Copied to clipboard!
❯ cat ~/.flyte/demo-config.yaml
admin:
  endpoint: dns:///xyz
  authType: DeviceFlow

After enabling `DeviceFlow`, you can authenticate by running a command such as:

Copied to clipboard!
❯ pyflyte run --remote hello.py wf

To Authenticate navigate in a browser to the following URL: https://xyz/activate and enter code: SCWBRTNH
OR copy paste the following URL: https://xyz/activate?user_code=SCWBRTNH

Authentication Pending...
Authentication Pending...
Authentication Pending...
Authentication Pending...
Authentication Pending...
Authentication Pending...
Authentication Pending...
Authentication Pending...
Authentication successful!
Go to https://xyz/console/projects/flytesnacks/domains/development/executions/f8cdccb9a534448e4883 to see execution in the console.

Import multiple modules in `pyflyte run`

You can now execute `pyflyte run` on a module that imports from multiple other modules.

Consider the following directory structure:

Copied to clipboard!
├── __init__.py
├── other.py
├── wf1
│   ├── __init__.py
│   ├── main.py # import t1
│   └── t1.py # import t2
└── wf2
	├── __init__.py
	└── t2.py
	└── t3.py
	└── t4.py

In this scenario, `main.py` imports from `t1.py`, and `t1.py` imports from `t2.py`. Running `pyflyte run main.py wf` should now be possible.

Other Enhancements

  • Added support for Python 3.11
  • Included TensorFlow model Flyte type
  • Separated flyte-binary services into http and grpc via Helm charts
  • Ray jobs can now be submitted to separate clusters instead of the one FlytePropeller is running

New guides

  • Guide to integrating MLFlow: The MLflow Tracking component provides an API and user interface that enables logging of parameters, code versions, metrics and output files during the execution of your ML code.
  • Guide to using signal in workflow execution: This guide outlines a workflow execution process that can be paused until a specified amount of time has passed or until it receives external inputs that are not part of the workflow execution inputs.
  • Guide to integrating DuckDB: This guide explains how to use DuckDBQuery to run SQL queries with DuckDB in Flyte.

1.5 Contributors

We would like to extend our sincere gratitude to all the contributors who have made valuable contributions to Flyte 1.5. Your efforts in providing code, documentation, bug fixes and feedback have been instrumental in the continuous improvement and enhancement of Flyte. 

{{contributors-1-5="/blog-component-assets"}}


We highly value the feedback of our users and community members, which helps us to improve our product continuously. To connect with other users and get support from our team, we encourage you to join our Slack channel. For updates on product development, community events, and announcements, follow us on Twitter to join the conversation and share your thoughts.

In case you encounter any issues or bugs, we request you let us know by creating a GitHub issue. If you find Flyte useful, don't forget to star us on GitHub