Samhita Alla

Flyte 1.10: Monorepo, New Agents, Eager Workflows and More

We're delighted to present our latest release! September and October kept us busy as we developed several exciting features and fixed numerous bugs, improving the overall developer experience. Notable highlights of the Flyte 1.10 release include the monorepo and new Flyte agents. Let's delve into the details.

Monorepo

Flyte's backend development has moved to a monorepo. This transition migrates repositories such as datacatalog, flyteadmin, flytecopilot, flyteplugins, flytepropeller and flytestdlib; each of these components now resides as a top-level directory in the flyte repo. We believe this change will significantly enhance the contribution experience, making it easier to test and merge changes to the backend code. In an upcoming blog post, we will explain in more detail why we opted for the monorepo structure, how we executed the migration, and what the development experience will look like.

New agents

Flyte 1.10 agents are more performant than ever, and we now support more of them, including Airflow, MemVerge, Snowflake, Databricks and sensors!

Airflow

The Airflow agent enables the smooth execution of Airflow tasks within Flyte workflows without requiring any code changes. Airflow tasks run on the agent (a long-running server) rather than in a freshly launched pod per task, significantly reducing overhead.

This integration allows you to:

  • Compile Airflow tasks into Flyte tasks
  • Incorporate Airflow sensors/operators into Flyte workflows
  • Support the local execution of Airflow tasks without requiring a cluster setup

To install the plugin, run the following command:

pip install flytekitplugins-airflow

Here's an example of an Airflow file sensor:

from airflow.sensors.filesystem import FileSensor
from flytekit import task, workflow


@task
def t1():
    print("flyte")


@workflow
def wf():
    sensor = FileSensor(task_id="id", filepath="/tmp/1234")
    sensor >> t1()


if __name__ == "__main__":
    wf()

And here’s how you can define an Airflow time sensor:

from datetime import datetime, timedelta
from pytz import UTC

from airflow.sensors.time_sensor import TimeSensor
from flytekit import task, workflow


@task
def t1():
    print("flyte")


@workflow
def wf():
    sensor = TimeSensor(task_id="fire_immediately", target_time=(datetime.now(tz=UTC) + timedelta(seconds=5)).time())
    sensor >> t1()


if __name__ == "__main__":
    wf()

You can find more examples of the Airflow agent in this PR.

MemVerge

The MemVerge plugin facilitates the execution of Flyte tasks on MemVerge Memory Machine Cloud. It supports resource requests and limits (CPU and memory), container images and environment variable specifications. ImageSpec can be used to define the images for running tasks.

The following secrets need to be defined for the agent server:

  • `mmc_address`: MMCloud OpCenter address
  • `mmc_username`: MMCloud OpCenter username
  • `mmc_password`: MMCloud OpCenter password

To install the plugin, use the following command:

pip install flytekitplugins-mmcloud

Here is an example showcasing the functionality of the MemVerge agent:

import pandas as pd
from flytekit import ImageSpec, Resources, task, workflow
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression

from flytekitplugins.mmcloud import MMCloudConfig

image_spec = ImageSpec(packages=["scikit-learn"], registry="docker.io/memverge")


@task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame


@task(task_config=MMCloudConfig(), container_image=image_spec)  
def process_data(data: pd.DataFrame) -> pd.DataFrame:
    """Simplify the task from a 3-class to a binary classification problem."""
    return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@task(
    task_config=MMCloudConfig(submit_extra="--migratePolicy [enable=true]"),
    requests=Resources(cpu="1", mem="1Gi"),
    limits=Resources(cpu="2", mem="4Gi"),
    container_image=image_spec,
    environment={"KEY": "value"},
)
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
    """Train a model on the wine dataset."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)


@workflow
def training_workflow(hyperparameters: dict) -> LogisticRegression:
    """Put all of the steps together into a single workflow."""
    data = get_data()
    processed_data = process_data(data=data)
    return train_model(
        data=processed_data,
        hyperparameters=hyperparameters,
    )

Snowflake

The Snowflake agent enables the execution of a query on a Snowflake database, both locally and remotely.

To install the plugin, use the following command:

pip install flytekitplugins-snowflake

Here’s an example of the Snowflake agent:

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.snowflake import SnowflakeConfig, SnowflakeTask

snowflake_task_templatized_query2 = SnowflakeTask(
    name="sf",
    inputs=kwtypes(number=int),
    output_schema_type=StructuredDataset,
    task_config=SnowflakeConfig(
        user="hhcs9527",
        account="ed61997.europe-west4.gcp",
        database="AGENT",
        schema="PEOPLE",
        table="INTRO",
        warehouse="COMPUTE_WH",
    ),
    query_template="SELECT AGE, NAME from INTRO limit %(number)s",
)


@task
def convert_table_to_pandas_dataframe(sd: StructuredDataset) -> pd.DataFrame:
    return sd.open(pd.DataFrame).all()


@workflow
def sf_query_wf(number: int) -> pd.DataFrame:
    sd = snowflake_task_templatized_query2(number=number)
    return convert_table_to_pandas_dataframe(sd=sd)

if __name__ == "__main__":
    print(sf_query_wf(number=2))

Databricks

The Databricks agent can be used to submit Spark jobs to the Databricks platform.

To install the plugin, run the following command:

pip install flytekitplugins-spark

Here is an example showcasing the functionality of the Databricks agent:

import datetime
import os
import random
from operator import add

import flytekit
from flytekit import Secret, task, workflow
from flytekitplugins.spark import DatabricksAgent

SECRET_GROUP = "token-info"
SECRET_NAME = "token_secret"


@task(
    secret_requests=[
        Secret(
            group=SECRET_GROUP,
            key=SECRET_NAME,
            mount_requirement=Secret.MountType.ENV_VAR,
        )
    ],
    task_config=DatabricksAgent(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        executor_path="/databricks/python3/bin/python",
        applications_path="dbfs:/entrypoint.py",
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "n2-highmem-4",
                "num_workers": 1,
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
        databricks_instance=os.environ["DATABRICKS_HOST"],
    ),
    container_image="flyteorg/flytekit:databricks-0.18.0-py3.7",
)
def hello_spark(partitions: int) -> float:
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    pi_val = 4.0 * count / n
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1")
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def my_databricks_job(
    triggered_date: datetime.datetime = datetime.datetime.now(),
) -> float:
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi

Base and file sensors

This feature was introduced in the v1.9.1 patch release. 

Sensors are valuable for waiting for specific events to occur. You can inherit the `BaseSensor` class to create a custom sensor in Flyte. Here's an example of a file sensor:

import asyncio
from typing import Optional, TypeVar

from flytekit import FlyteContextManager
from flytekit.sensor.base_sensor import BaseSensor

T = TypeVar("T")


class FileSensor(BaseSensor):
    def __init__(self, name: str, config: Optional[T] = None, **kwargs):
        super().__init__(name=name, sensor_config=config, **kwargs)

    async def poke(self, path: str) -> bool:
        fs = FlyteContextManager.current_context().file_access.get_filesystem_for_path(path, asynchronous=True)
        return await asyncio.to_thread(fs.exists, path)
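
Here's a minimal sketch of how such a sensor might be wired into a workflow, building on the `FileSensor` class above (the downstream task and file path are illustrative):

from flytekit import task, workflow

# reuse the FileSensor class defined above
sensor = FileSensor(name="my_file_sensor")


@task
def process():
    print("file is ready")


@workflow
def sensor_wf():
    # the sensor node completes once poke() returns True for the given path
    sensor(path="/tmp/1234") >> process()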

Pyflyte ergonomic improvements

  • The command `pyflyte run remote-launchplan` lets you execute launch plans registered on the server directly from the CLI.
  • The `pyflyte run` command now supports all launch plan parameters, including labels, annotations, service accounts and tags. Labels and annotations serve as metadata attachments for Kubernetes objects, while service accounts provide identities for Kubernetes pods. Tags are attached to the resulting execution.
pyflyte --verbose run --remote --env MY_ENV1=VAL1 --env MY_ENV2=VAL2 \
    --label label1=value1 \
    --annotation annotation1=value1 \
    --service-account default \
    --tags my-tag \
    --raw-output-data-prefix s3://.../test-override \
    --max-parallelism 30 \
    hello.py wf
These are some of the options available for the `pyflyte run` command.
  • Use pyflyte to activate and deactivate launch plans.
# Activate all launchplans
pyflyte register module.py --activate-launchplans

# Deactivate a launch plan
pyflyte launchplan every_min_installer --deactivate
  • Use pyflyte to interact with gate nodes in local executions. This lets you debug and test workflows that use gate nodes without running them on a Flyte cluster (see the sketch after this list)
  • Beautified output of the `pyflyte run` command (introduced in the v1.9.1 patch release)
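
As an example of the gate-node support mentioned above, here's a minimal sketch (the tasks are illustrative) that uses `wait_for_input` and `approve`; running it locally with `pyflyte run` prompts for the input and the approval in the terminal:

from datetime import timedelta
from typing import List

from flytekit import approve, task, wait_for_input, workflow


@task
def compute_mean(data: List[float]) -> float:
    return sum(data) / len(data)


@task
def format_report(mean: float, title: str) -> str:
    return f"{title}: mean={mean}"


@workflow
def reporting_wf(data: List[float] = [1.0, 2.0, 3.0]) -> str:
    mean = compute_mean(data=data)
    # gate node: pause until a title is supplied
    title = wait_for_input("title", timeout=timedelta(hours=1), expected_type=str)
    report = format_report(mean=mean, title=title)
    # gate node: require explicit approval before the workflow completes
    return approve(report, "approve-report", timeout=timedelta(hours=1))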

Programmatically consume inputs and outputs

The Flyte UI now displays FlyteRemote code snippets that show how to access the inputs and outputs of an execution. You can copy and paste these snippets to conveniently retrieve data from your executions.

https://user-images.githubusercontent.com/6610300/277036319-b609c954-e72b-4e88-95c4-b6f22b07d4f2.png
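
For reference, here's a rough sketch of how FlyteRemote can be used to fetch an execution and read its inputs and outputs (the endpoint, project, domain and execution name are placeholders):

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(Config.for_endpoint("<your-flyte-endpoint>"))

# fetch an existing execution and sync its state
execution = remote.fetch_execution(
    project="flytesnacks", domain="development", name="<execution-name>"
)
execution = remote.sync_execution(execution, sync_nodes=True)

print(execution.inputs)   # the execution's inputs
print(execution.outputs)  # available once the execution has completed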

Eager workflows

This feature was introduced in the v1.9.1 patch release.

Unlike static and dynamic workflows, eager workflows enable the use of familiar Python constructs through the `asyncio` API. To illustrate this, here's a simple eager workflow using the `@eager` decorator.

from flytekit import task, workflow
from flytekit.experimental import eager


@task
def add_one(x: int) -> int:
    return x + 1


@task
def double(x: int) -> int:
    return x * 2


@eager
async def simple_eager_workflow(x: int) -> int:
    out = await add_one(x=x)
    if out < 0:
        return -1
    return await double(x=out)

When you decorate a function with `@eager`, any function invoked within it that’s decorated with `@task`, `@workflow`, or `@eager` becomes an awaitable object within the lifetime of the parent eager workflow execution. Note that this happens automatically and you don’t need to use the `async` keyword when defining a task or workflow that you want to invoke within an eager workflow.

What can you do with eager workflows?

  • Operate on task and sub-workflow outputs
  • Define Python conditionals 
  • Define loops
  • Invoke static workflows
  • Nest eager subworkflows
  • Catch exceptions (illustrated in the sketch after this list)
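
To illustrate a few of these, here's a minimal sketch (the tasks are illustrative) that loops over task invocations, applies a Python conditional and catches a task failure; flytekit surfaces task errors inside eager workflows as `EagerException`:

from typing import List

from flytekit import task
from flytekit.experimental import EagerException, eager


@task
def add_one(x: int) -> int:
    if x < 0:
        raise ValueError("x must be non-negative")
    return x + 1


@task
def double(x: int) -> int:
    return x * 2


@eager
async def process_all(xs: List[int]) -> int:
    total = 0
    for x in xs:  # a plain Python loop over task invocations
        try:
            out = await add_one(x=x)
        except EagerException:
            # task failures surface as EagerException inside eager workflows
            continue
        if out % 2 == 0:  # a plain Python conditional on a materialized output
            out = await double(x=out)
        total += out
    return total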

Local entrypoint and support for offloaded types

New in the v1.10.0 release.

We've added a new `local_entrypoint` argument that lets you run an eager workflow locally while its tasks and sub-workflows run remotely. Moreover, all offloaded types, such as FlyteFile, FlyteDirectory and StructuredDataset, materialize as Python values, fully downloaded into the process running the eager workflow.

import random

import pandas as pd
from flytekit import StructuredDataset, task
from flytekit.experimental import eager
from flytekit.remote import FlyteRemote


@task
def create_structured_dataset(x: int) -> StructuredDataset:
    df = pd.DataFrame({"a": [random.random() for _ in range(x)]})
    return StructuredDataset(dataframe=df)


@eager(remote=FlyteRemote(...), local_entrypoint=True)
async def eager_wf(x: int) -> int:
    # this will be run on the configured Flyte remote
    dataset = await create_structured_dataset(x=x)

    # this will materialize the DataFrame locally, which
    # you can operate on in the eager workflow itself
    df = dataset.open(pd.DataFrame).all()
    df = df.assign(b=df["a"] * 2)

    return int(df["b"].sum())

When you specify `local_entrypoint=True`, the eager workflow literally becomes a local entrypoint to the configured `FlyteRemote` cluster. This feature is designed to let you iterate much more quickly in a local environment, leveraging the power of your Flyte cluster when needed and materializing data locally so that you can debug, develop and experiment more easily.

You can access the documentation on eager workflows here.

FlyteDirectory batch upload

This feature was introduced in the v1.9.1 patch release.

Optimize memory consumption by specifying a batch size for FlyteDirectory uploads and downloads; the directory is then processed in manageable chunks of that size.

import os

from flytekit import BatchSize, task, workflow
from flytekit.types.directory import FlyteDirectory
from typing_extensions import Annotated

directory = "/tmp/test"


@task
def generate_files(num_files: int, filesize_mb: int) -> Annotated[FlyteDirectory, BatchSize(100)]:
    os.mkdir(directory)
    for i in range(num_files):
        file_path = os.path.join(directory, f"file_{i}.txt")
        with open(file_path, "w") as f:
            f.write(str(os.urandom(filesize_mb * 1024 * 1024)))
    return FlyteDirectory(directory, remote_directory="s3://flyte-benchmark/test/")


@task
def get_files(d: Annotated[FlyteDirectory, BatchSize(100)]):
    # trigger the download of the directory contents in batches of 100 files
    d.download()


@workflow
def ton_of_files_wf(num_files: int = 1000, filesize_mb: int = 8):
    d = generate_files(num_files=num_files, filesize_mb=filesize_mb)
    get_files(d=d)

Pydantic type transformer

Pydantic is a popular Python data validation library that offers a flexible way to define custom types, mapping field names to a range of value types, much like dataclasses. With this integration, Pydantic base models can now be used as inputs and outputs of Flyte tasks.

To install the plugin, use the following command:

pip install flytekitplugins-pydantic

The following is an example of the Pydantic integration:

from typing import List

from pydantic import BaseModel

from flytekit import task
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


class Config(BaseModel):
    lr: float = 1e-3
    batch_size: int = 32
    files: List[FlyteFile]
    directories: List[FlyteDirectory]


@task
def train(cfg: Config):
    ...
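
Building on the snippet above, a hypothetical workflow could construct a `Config` and hand it to the task, with flytekit serializing the model much like a dataclass (the URIs below are placeholders):

from flytekit import workflow


@workflow
def training_wf() -> None:
    cfg = Config(
        lr=5e-4,
        batch_size=64,
        files=[FlyteFile("s3://my-bucket/data/train.csv")],
        directories=[FlyteDirectory("s3://my-bucket/checkpoints")],
    )
    train(cfg=cfg)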

Mashumaro to serialize/deserialize dataclasses

In contrast to dataclasses-json, Mashumaro speeds up the serialization and deserialization of dataclasses, running up to 1.5 times faster for more substantial tasks. For a comprehensive analysis of the performance benchmarks, refer to the detailed breakdown provided here.

from dataclasses import dataclass
from typing import List

from flytekit import task, workflow
from mashumaro.mixins.json import DataClassJSONMixin


@dataclass
class CurrencyPosition(DataClassJSONMixin):
    currency: str
    balance: float


@dataclass
class StockPosition(DataClassJSONMixin):
    ticker: str
    name: str
    balance: int


@dataclass
class OP(DataClassJSONMixin):
    currencies: List[CurrencyPosition]
    stocks: List[StockPosition]


@task
def t1(obj: OP) -> OP:
    ...
    return obj


@workflow
def wf() -> OP:
    my_op = OP(
        currencies=[
            CurrencyPosition("USD", 238.67),
            CurrencyPosition("EUR", 361.84),
        ],
        stocks=[
            StockPosition("AAPL", "Apple", 10),
            StockPosition("AMZN", "Amazon", 10),
        ],
    )
    return t1(obj=my_op)

Other enhancements

  • In the flytekit `conditional()` block, you can check for `None` using `is_none()`. Both `is_true()` and `is_false()` are already supported. For example:
from flytekit import conditional, workflow

# return_true, success and failed are assumed to be Flyte tasks defined elsewhere
@workflow
def decompose_unary() -> int:
    result = return_true()
    return conditional("test").if_(result.is_none()).then(success()).else_().then(failed())
  • Azure workload identity can now be used for `fsspec` in flytekit, allowing flytekit to make use of AKS-enabled workload identities. Before this change, reading and writing data to a storage account was only possible by setting environment variables for the storage account key. Set `anon` to `False` to use the default credentials provided by AKS workload identity
  • The new version adds support for `admin.clientSecretEnvVar` in the flytectl config.yaml file for use by flytekit
  • We’ve provided an alternative deployment guide for Flyte on Google Cloud Platform (GCP) leveraging the Google Compute Engine (GCE) ingress controller, GCP managed TLS certificates and GCP Identity Aware Proxy with the goal of implementing a zero-trust access model
  • The `pyflyte backfill` command now supports `WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE` through the `--no-fail-fast` option. With fail-fast behavior, the backfill fails immediately if any of the backfill steps fail; with `--no-fail-fast`, it continues to run even if some steps fail
  • The MLFlow plugin now works with Python 3.11
  • Version 1.10 adds support for Azure Blob Storage for storing metadata and raw data, including structured datasets, without disrupting other standard Azure authentication methods. Ensure that `storage_options` are set consistently for all uses of `fsspec`
  • The `enable_deck` option is now available in the `@task` decorator, enabling you to view decks in Flyte tasks
  • We’ve added an image type transformer so that `PIL.Image` is now a valid type in flytekit; both of these enhancements are shown in the sketch after this list
  • `FlyteRemote.execute()` now allows execution name prefixes, enabling the launch of multiple executions with the same execution name prefix. Under the hood, a UUID is appended to the execution name (introduced in the v1.9.1 patch release)
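
As a quick sketch of the deck and image-type enhancements mentioned above (the task itself is illustrative), a task can enable decks and return a `PIL.Image.Image` directly:

from PIL import Image

import flytekit
from flytekit import task, workflow


@task(enable_deck=True)
def make_image(width: int, height: int) -> Image.Image:
    # PIL.Image.Image is now a supported type in flytekit
    img = Image.new("RGB", (width, height), color=(73, 109, 137))
    # render a small HTML deck for this task run
    flytekit.Deck("preview", f"<p>generated a {width}x{height} image</p>")
    return img


@workflow
def image_wf() -> Image.Image:
    return make_image(width=64, height=64)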

You can find the detailed release notes here.

Docs improvements

  • Added examples of eager workflows to the documentation.
  • Restructured the documentation for a more streamlined developer experience.
  • Revamped the basics section within the documentation.
  • Documented the integration of the MMCloud agent.

1.10 contributors

We extend our heartfelt gratitude to all the contributors who have made invaluable contributions to Flyte 1.10. Thank you for your dedication and support!

{{contributors-1-10="/blog-component-assets"}}

We highly value the feedback of our users and community members, which helps us to improve our product continuously. To connect with other users and get support from our team, we encourage you to join our Slack channel. For updates on product development, community events, and announcements, follow us on Twitter to join the conversation and share your thoughts.

If you encounter any issues or bugs, please let us know by creating a GitHub issue. If you find Flyte useful, don't forget to give it a ⭐ on GitHub.