Mitigate the trade-off between scalability and ease of use

With its meticulous design, robust architecture, and simple SDKs, Flyte makes it easy to orchestrate your data and ML workflows. It is an ideal choice for high-performance teams that want to build better workflows and deploy them faster. Make the switch — your team will thank you for it.

Build

Flyte enables you to build your data and ML workflows with robustness and scalability from the ground up. It hand-holds you throughout the creation process with best-in-class features.

Strongly typed interfaces

Validate your data at every step of the workflow by defining data guardrails using Flyte types. Integrate Flyte with Pandera and Great Expectations natively to perform in-depth validation and ensure that your data meets the necessary standards.

from typing import List

from flytekit import task, workflow


@task
def mean(x: List[float]) -> float:
    return sum(x) / len(x)


@task
def variance(x: List[int], mu: float) -> float:
    # Note: `List[int]` deliberately mismatches the workflow's `List[float]` input
    return sum([(val - mu) ** 2 for val in x]) / len(x)


@workflow
def variance_wf(x: List[float] = [1.3, 2.4, 3.9]) -> float:
    return variance(x=x, mu=mean(x=x))

The `float` and `int` types don’t match, so compilation fails with a type mismatch error:

flytekit.core.type_engine.TypeTransformerFailedError: Cannot convert literal scalar {
  primitive {
    float_value: 1.3
  }
}
 to <class 'int'>

Any language

You should be able to orchestrate data and ML workflows in the programming language of your choice. Flyte lets you write code in any language using raw containers, or you can choose the Python, Java, Scala or JavaScript SDKs to develop your Flyte workflows. Working in the languages your team knows best makes workflows easier to design and maintain.

import pandas as pd
from flytekit import Resources, task


# Python: compute total pay, with CPU and memory limits set on the task
@task(limits=Resources(cpu="2", mem="150Mi"))
def total_pay(hourly_pay: float, hours_worked: int, df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(total_pay=hourly_pay * hours_worked)
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import org.flyte.flytekit.SdkRunnableTask;
import org.flyte.flytekit.jackson.JacksonSdkType;

@AutoService(SdkRunnableTask.class)
public class TotalPayTask extends SdkRunnableTask<TotalPayTask.Input, TotalPayTask.Output> {
  public TotalPayTask() {
    super(JacksonSdkType.of(Input.class), JacksonSdkType.of(Output.class));
  }
  @AutoValue
  public abstract static class Input {
    public abstract double hourlyPay();
    public abstract long hoursWorked();
  }
  @AutoValue
  public abstract static class Output {
    public abstract double output();
    public static Output create(double output) {
      return new AutoValue_TotalPayTask_Output(output);
    }
  }
  @Override
  public Output run(Input input) {
    return Output.create(input.hourlyPay() * input.hoursWorked());
  }
}
import org.flyte.flytekit.SdkRunnableTask
import org.flyte.flytekitscala.SdkScalaType

case class TotalPayInput(hourlyPay: Double, hoursWorked: Long)
case class TotalPayOutput(output: Double)

class TotalPayTask extends SdkRunnableTask(SdkScalaType[TotalPayInput], SdkScalaType[TotalPayOutput]) {
  // Multiply, not add: total pay = hourly pay * hours worked, matching the Java example
  def run(input: TotalPayInput): TotalPayOutput =
    TotalPayOutput(output = input.hourlyPay * input.hoursWorked)
}

“We got over 66% reduction in orchestration code when we moved to Flyte — a huge win!”

— Seth Miller-Zhang, Senior Software Engineer at ZipRecruiter

Map tasks

Parallel processing is useful in scenarios ranging from hyperparameter optimization to complex data processing. Flyte’s map tasks enable parallel processing by dividing a larger computation into independent pieces.

from collections import Counter

from flytekit import map_task, task, workflow


@task
def mappable_task(string_item: str) -> dict[str, int]:
    return dict(Counter(string_item.split()))


@task
def coalesce(string_list: list[dict[str, int]]) -> dict[str, int]:
    # A minimal merge of the mapped word counts (assumed; the original elides this task)
    merged: Counter = Counter()
    for counts in string_list:
        merged.update(counts)
    return dict(merged)


@workflow
def wf(string_list: list[str] = ["Deer Bear", "Car Deer"]) -> dict[str, int]:
    mapped_out = map_task(mappable_task)(string_item=string_list).with_overrides(
        retries=1
    )

    return coalesce(string_list=mapped_out)

Run a task in parallel by wrapping it in the `map_task` construct.

Dynamic workflows

Dynamism in DAGs is crucial for workflows whose shape depends on runtime data, as is common in ML. Build dynamic DAGs in Flyte effortlessly using the `@dynamic` decorator. It lets you create flexible, adaptable workflows that change and evolve as needed, making it easier to respond to changing requirements.

from typing import List

import joblib
from flytekit import dynamic
from flytekit.types.file import JoblibSerializedFile
from sklearn.model_selection import RepeatedStratifiedKFold, cross_val_score


@dynamic
def evaluate_model(
    models: List[JoblibSerializedFile],
    X: List[List[float]],
    y: List[float],
) -> List[List[str]]:
    final_result = []
    for model_ser in models:
        model = joblib.load(model_ser)
        cv = RepeatedStratifiedKFold(n_splits=7, n_repeats=2, random_state=1)
        scores = cross_val_score(model, X, y, scoring="accuracy", cv=cv, n_jobs=-1)
        final_result.append(
            # `evaluate_flyte_task` is a task defined elsewhere in the example
            evaluate_flyte_task(name=str(model_ser), scores=scores.tolist())
        )
    return final_result

A dynamic workflow to compute cross-validation scores across different models.

Branching

The `conditional` statement in Flyte lets you selectively execute branches of your workflow based on static or dynamic data produced by upstream tasks, or on workflow inputs. That’s useful when a decision depends on the results of previous tasks or on external data.

from flytekit import conditional, workflow


@workflow
def conditional_wf():
    # upstream outputs (`validated`, `target_df`) and the branch tasks are defined elsewhere
    conditional("stop_if_fails").if_(validated.is_false()).then(
        summary_drift_report(
            new_data=target_df,
            reference_data=get_reference_data(),
        )
    ).else_().then(make_predictions(input_data=target_df, output_path="./data"))

Based on the outcome of the data validation, the conditional statement selects the appropriate task to execute.

FlyteFile & FlyteDirectory

Flyte simplifies the process of transferring files and directories between local and cloud storage. It establishes a seamless connection and streamlines the movement of data from one to the other.

from typing import List

from flytekit import task
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


@task
def normalize_columns(
    csv_url: FlyteFile,
    column_names: List[str],
) -> FlyteFile:
    # A FlyteFile can be opened directly; Flyte downloads the remote file on access
    with open(csv_url, newline="\n") as input_file:
        ...
    return FlyteFile(path=out_path, remote_path=output_location)


@task
def download_files(csv_urls: List[str]) -> FlyteDirectory:
    ...
    return FlyteDirectory(path=local_dir)

Eliminate the need for boilerplate code when downloading and uploading files and directories with Flyte.


“FlyteFile is a really nice abstraction on a distributed platform. [I can say,] ‘I need this file,’ and Flyte takes care of downloading it, uploading it and only accessing it when we need to. We generate large binary files in netcdf format, so not having to worry about transferring and copying those files has been really nice.”

— Nicholas LoFaso, Senior Platform Software Engineer at MethaneSAT

Structured dataset

Effortlessly convert dataframes between types and enforce column-level type checking using the abstract 2D representation provided by `StructuredDataset`. Extract subsets of columns and specify the storage backend, all without writing boilerplate code.

import pandas as pd
import vaex  # native vaex support comes via Flyte's structured dataset plugins
from flytekit import kwtypes, task, workflow
from flytekit.types.structured import StructuredDataset
from typing_extensions import Annotated


@task
def analyze_df(
    df: vaex.dataframe.DataFrameLocal,
) -> Annotated[StructuredDataset, kwtypes(age=float)]:
    result_df = df.groupby(by="animal").agg({"age": "mean"})
    return StructuredDataset(result_df)


@task
def read_df(df: StructuredDataset) -> float:
    pandas_df = df.open(pd.DataFrame).all()
    return pandas_df["age"].min()


@workflow
def sd_wf(
    vaex_df: vaex.dataframe.DataFrameLocal = vaex.from_arrays(
        animal=["dog", "cat", "guinea pig"],
        age=[2, 5, 1],
    )
) -> float:
    return read_df(df=analyze_df(df=vaex_df))

A Vaex dataframe can easily be converted to a Pandas dataframe.

Wait for external inputs

You might need your execution to pause, wait for external inputs, or await manual approval. With `sleep()`, `wait_for_input()` and `approve()`, you can pause execution for a specified duration, wait for an external input signal, and require explicit approval before resuming execution, respectively.

from datetime import timedelta

from flytekit import approve, sleep, wait_for_input, workflow


@workflow
def gate_nodes_wf():
    # pause execution for a fixed duration
    sleeping = sleep(timedelta(seconds=10))
    # wait for an external input signal
    title_input = wait_for_input("title", timeout=timedelta(hours=1), expected_type=str)
    # require explicit approval; `final_report` is the output of an upstream task (not shown)
    approve(final_report, "approve-final-report", timeout=timedelta(hours=2))

ImageSpec

Data scientists and ML practitioners often find Dockerfiles challenging. With Flyte’s `ImageSpec`, you can create images without needing Dockerfiles. You can define Python packages, apt packages and environment variables directly in the `ImageSpec`.

import typing

import pandas as pd
from flytekit import ImageSpec, task

pandas_image_spec = ImageSpec(
    base_image="ghcr.io/flyteorg/flytekit:py3.8-1.6.2",
    packages=["pandas", "numpy"],
    python_version="3.8",
    apt_packages=["git"],
    env={"Debug": "True"},
    registry="ghcr.io/flyteorg",
)


@task(container_image=pandas_image_spec)
def get_pandas_dataframe() -> typing.Tuple[pd.DataFrame, pd.Series]:
    df = pd.read_csv("https://storage.googleapis.com/download.tensorflow.org/data/heart.csv")
    return df[["age", "thalach", "trestbps", "chol", "oldpeak"]], df.pop("target")

Iterate

Make workflow orchestration a collaborative experience. Iterating on data and ML workflows with your team shouldn’t be a struggle. Leverage Flyte’s native features to accelerate your development cycles.

Recover from failures

Debugging can be time-consuming and costly — more so if it involves rerunning successful tasks. Flyte makes it easy to recover from failures by letting you rerun only the failed tasks in a workflow. Simply click the Recover button in the user interface, and Flyte will reuse the outputs of the successful tasks and rerun only the failed ones. This saves time and resources and makes debugging more efficient.
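
Recovery is also available from the command line; a hedged sketch using `flytectl` (the execution name, project and domain below are placeholders):

flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development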

Versioned workflows

Data and ML practitioners should be able to experiment in isolation and try multiple iterations. Versioning lets them reproduce results and roll back to a previous workflow version any time. Flyte versions the entire workflow and allows switching versions just like TV channels!
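
Every registration pins a version; a hedged sketch reusing the project and file from the deployment example below (the version string is a placeholder, assuming the `--version` flag of `pyflyte register`):

pyflyte register --project ml-finance-forecast --domain development --version v2-experiment 2022-finance-insights.py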


“Workflow versioning is quite important: When it comes to productionizing a pipeline, there are only a few platforms that provide this kind of versioning. To us, it's critical to be able to roll back to a certain workflow version in case there is a bug introduced into our production pipeline.”

— Pradithya Aria Pura, Principal Software Engineer at Gojek

Cache outputs

Don’t waste resources while iterating on data/ML workflows. Cache your task outputs by passing the `cache=True` argument to your `@task` decorator to accelerate your runs.

# `Hyperparameters`, `train_outputs`, `torch` and `mnist_dataloader` come from the surrounding example
@task(cache=True, cache_version="1.0")
def train_mnist_pipeline(hp: Hyperparameters) -> train_outputs:
    kwargs = {"num_workers": 1, "pin_memory": True} if torch.cuda.is_available() else {}
    training_data_loader = mnist_dataloader(hp.batch_size, train=True, **kwargs)
    ...

Bump `cache_version` to invalidate previously cached outputs when the task’s functionality changes.

Intra-task checkpointing

Flyte task boundaries are natural checkpoints. They can be expensive, however, in scenarios such as model training, where a single task is time-consuming and resource-intensive. Intra-task checkpoints in Flyte let you checkpoint progress within a task execution.

from io import BytesIO

import flytekit
import torch
from flytekit import task


@task
def train_model():
    try:
        checkpoint = flytekit.current_context().checkpoint
        prev_checkpoint = checkpoint.read()
    except (NotImplementedError, ValueError):
        checkpoint, prev_checkpoint = None, False

    if prev_checkpoint:
        start_epoch, model, opt = torch.load(BytesIO(prev_checkpoint))
    else:
        start_epoch = 0
        model = ...
        opt = ...

    for epoch in range(start_epoch, n_epochs):  # `n_epochs` is defined elsewhere
        ...

        if checkpoint:
            model_io = BytesIO()
            torch.save((epoch, model, opt), model_io)
            model_io.seek(0)
            checkpoint.write(model_io.read())

Checkpointing saves the model state and the number of epochs.

Multi-tenancy

Multi-tenancy supports a centralized infrastructure for your team and organization, so multiple users can share the same platform while maintaining their own distinct data and configurations. Flyte is multi-tenant at its core. That’s important for managing and organizing resources effectively as well as facilitating team collaboration within your organization.

from typing import List

from flytekit import reference_task
from flytekit.types.file import FlyteFile


@reference_task(
    project="flytesnacks",
    domain="development",
    name="core.flyte_basics.files.normalize_columns",
    version="2NRd4zAkKIf9hpqBAMLjzw==",
)
def normalize_columns(
    csv_url: FlyteFile,
    column_names: List[str],
    columns_to_normalize: List[str],
) -> FlyteFile:  # return type assumed from the normalize_columns task shown earlier
    # The body of a reference task never runs locally; only the signature matters
    ...
Effortlessly reference a Flyte task (or workflow) to foster collaboration among teams.


“The multi-tenancy that Flyte provides is obviously important in regulated spaces where you need to separate users and resources and things like amongst each other within the same organization.”

— Jake Neyer, Software Engineer at Striveworks


“We get a lot of reusable workflows, and it makes it fairly easy to share complex machine learning and different dependencies between teams without actually having to put all the dependencies into one container.”

— Bernhard Stadlbauer, Data Engineer at Pachama

Timeout

To ensure that the system always makes progress, tasks must terminate reliably. Flyte lets you define a timeout period, after which the task is marked as failed.

import datetime


@task(timeout=datetime.timedelta(hours=1))
def train_mnist_pipeline(hp: Hyperparameters) -> train_outputs:
    kwargs = {"num_workers": 1, "pin_memory": True} if torch.cuda.is_available() else {}
    training_data_loader = mnist_dataloader(hp.batch_size, train=True, **kwargs)
    ...

Task execution will be terminated if the runtime exceeds 1 hour.


“Because we are a spot-based company, a lot of our workflows run into the majority of issues. Thankfully, with Flyte, we can debug and do quick iterations.”

— Varsha Parthasarathy, Senior Software Engineer at Woven Planet

Immutability

Immutable executions help ensure reproducibility by preventing any changes to the state of an execution. This lets you completely restructure a data/ML workflow between versions without worrying about breaking production. Flyte maintains immutability in your workflows.


“Flyte has this concept of immutable transformation — it turns out the executions cannot be deleted, and so having immutable transformation is a really nice abstraction for our data-engineering stack.”

— Jeev Balakrishnan, Software Engineer at Freenome

Analyze

Get visibility into your data at every step of your data/ML workflow, across versions. Avoid the uncertainty of the data “black box.”
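
One way to get that visibility is Flyte Decks, which render HTML reports right on the task execution page; a minimal sketch, assuming a flytekit version (around 1.6) where `disable_deck=False` opts a task in:

import flytekit
import pandas as pd
from flytekit import task


@task(disable_deck=False)
def profile_data(df: pd.DataFrame) -> pd.DataFrame:
    # Render summary statistics as an HTML "profile" tab on the task's Deck page
    flytekit.Deck("profile", df.describe().to_html())
    return df

Summary statistics then appear in the Flyte UI without any extra plumbing.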

Deploy

Deployment shouldn’t be a tough nut to crack. Flyte’s cost-saving, cloud-native features simplify deployment to the cloud or on-prem.

Dev to prod

Promoting your workflows from development to production is as simple as changing your domain from development or staging to production.

pyflyte register --project ml-finance-forecast --domain staging 2022-finance-insights.py
pyflyte register --project ml-finance-forecast --domain production 2022-finance-insights.py

“One thing that I really like compared to my previous experience with some of these tools: the local dev experience with pyflyte and the sandbox are super, super nice to reduce friction between production and dev environment.”

— Krishna Yeramsetty, Principal Data Scientist at Infinome

Spot or preemptible instances

Leveraging spot instances doesn’t have to be hard. Schedule your workflows on spot instances by setting `interruptible` to `True` in the `@task` decorator to cut costs.

import pyspark
from flytekit import task
from pyspark.sql import functions as F


@task(interruptible=True)
def data_processing(google_trend_csv: pyspark.sql.DataFrame):
    google_trend_all = google_trend_csv.withColumn(
        "Date", F.regexp_extract(google_trend_csv.week, "(.*?) -", 1)
    ).withColumn(
        "State", F.regexp_extract(google_trend_csv.file, "Rossmann_DE_(.*)", 1)
    )
    ...

Scheduling on spot instances is as simple as setting the `interruptible` argument.


“You can say, ‘Give me imputation’ and [Flyte will] launch 40 spot instances that are cheaper than your on-demand instance that you're using for your notebook and return the results back in memory.”

— Calvin Leather, Staff Engineer and Tech Lead at Embark Veterinary


“Given the scale at which some of these tasks run, compute can get really expensive. So being able to add an interruptible argument to the task decorator for certain tasks has been really useful to cut costs.”

— Jeev Balakrishnan, Software Engineer at Freenome

Cloud-native deployment

Deploy Flyte on AWS, GCP, Azure and other cloud services, and scale virtually without limits.


“We're mainly using Flyte because of its cloud native capabilities. We do everything in the cloud, and we also don't want to be limited to a single cloud provider. So having the ability to run everything through Kubernetes is amazing for us.”

— Maarten de Jong, Python Developer at Blackshark.ai

Scheduling

Use Flyte to schedule your data and machine learning workflows to run at a specific time.

from flytekit import CronSchedule, LaunchPlan

# `fetch_data_wf` is the workflow being scheduled (defined elsewhere)
cron_lp = LaunchPlan.get_or_create(
    name="fetch_data_lp",
    workflow=fetch_data_wf,
    schedule=CronSchedule(
        schedule="0 1 * * *",
        kickoff_time_input_arg="kickoff_time",
    ),
)

The workflow is set to run at 1 a.m. every day.

Secrets

Access secrets in your Flyte tasks, whether locally or remotely, by mounting them as files or environment variables.

from flytekit import Secret, task

# SECRET_GROUP and SECRET_NAME are placeholders for your secret's group and key
@task(
    secret_requests=[
        Secret(
            group=SECRET_GROUP,
            key=SECRET_NAME,
            mount_requirement=Secret.MountType.ENV_VAR,
        )
    ]
)
def secret_file_task() -> str:
    ...

Monitor

Keep tabs on the status of your data/ML workflows with Flyte. Identify potential bottlenecks to debug issues quickly.

Notifications

Stay informed about changes to your workflow's state by configuring notifications through Slack, PagerDuty or email.

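Notifications are attached to launch plans; a hedged sketch, assuming flytekit’s `Email` notification and reusing the `fetch_data_wf` workflow from the scheduling example (the recipient address is a placeholder):

from flytekit import Email, LaunchPlan, WorkflowExecutionPhase

# Send an email whenever an execution of this launch plan fails
notify_lp = LaunchPlan.get_or_create(
    name="fetch_data_notify_lp",
    workflow=fetch_data_wf,
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.FAILED],
            recipients_email=["team@example.com"],
        )
    ],
)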

Scale

Flyte is built for high performance. Effortlessly scale your data and ML workflows leveraging the infrastructure-as-code approach.

GPU acceleration

GPUs are becoming essential to run data and ML workflows. Enable and control your tasks’ GPU demands by requesting resources in the `@task` decorator — just like that!

@task(requests=Resources(gpu="2"), limits=Resources(gpu="4"))
def train_mnist_pipeline(hp: Hyperparameters) -> train_outputs:
    kwargs = {"num_workers": 1, "pin_memory": True} if torch.cuda.is_available() else {}
    training_data_loader = mnist_dataloader(hp.batch_size, train=True, **kwargs)
    ...

Request GPUs and control the usage by setting limits.

Dependency isolation via containers

Different tasks may have different resource requirements and library dependencies. That can cause conflicts if they aren’t managed carefully. With Flyte, you can maintain separate sets of dependencies for your tasks so no dependency conflicts arise.

from typing import Tuple

import numpy as np
from flytekit import task, workflow


@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_data() -> Tuple[np.ndarray, np.ndarray]:
    from sklearn import datasets

    iris = datasets.load_iris()
    X, y = (iris.data[:, :2], iris.target)
    return (X, y)


@task(container_image="ghcr.io/flyteorg/flytekit:py3.9-1.2.7")
def normalize(X: np.ndarray) -> np.ndarray:
    return (X - X.mean(axis=0)) / X.std(axis=0)


@workflow
def wf() -> np.ndarray:
    X, _ = get_data()
    X = normalize(X=X)
    return X

No more dependency hell!

Parallelism

Flyte tasks are inherently parallel to optimize resource consumption and improve performance, so you don't have to do anything special to enable parallelism.

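A minimal sketch of this behavior (with hypothetical task names): `double` and `increment` below have no data dependency on each other, so Flyte schedules them concurrently:

from flytekit import task, workflow


@task
def double(n: int) -> int:
    return n * 2


@task
def increment(n: int) -> int:
    return n + 1


@task
def combine(a: int, b: int) -> int:
    return a + b


@workflow
def parallel_wf(n: int = 10) -> int:
    # Neither task consumes the other's output, so both run in parallel
    a = double(n=n)
    b = increment(n=n)
    return combine(a=a, b=b)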

“When you write Python scripts, everything runs and takes a certain amount of time, whereas now for free we get parallelism across tasks. Our data scientists think that's really cool.”

— Dylan Wilder, Engineering Manager at Spotify

Allocate resources dynamically

Sometimes, the resources needed for a task may depend on user-provided inputs or real-time resource calculations. Flyte makes it easy to allocate resources using the `with_overrides` method and dynamic workflows. This allows you to adjust resources on the fly, ensuring that your tasks have the resources they need to run efficiently.

from typing import List

from flytekit import Resources, dynamic, task, workflow


@task
def mean(x: List[float]) -> float:
    return sum(x) / len(x)


@dynamic
def dynamic_wf(x: List[float], mem: str) -> float:
    return mean(x=x).with_overrides(requests=Resources(mem=mem))


@workflow
def mean_wf(x: List[float] = [1.3, 2.4, 3.9], mem: str = "600Mi") -> float:
    return dynamic_wf(x=x, mem=mem)

Override the default resources of a task from within a Flyte workflow in mere seconds.

Board Flyte and leverage best-in-class features to build data, ML & analytics workflows