Flyte

Dynamic, resilient AI orchestration. 80M+ downloads.

Flyte 2 is available today locally. For distributed execution, try Flyte 1.

Copied to clipboard!
"""Run durable agents with full observability"""
  
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

import flyte

# Execution environment for the LangGraph agent task: a Debian-based image
# with the example's requirements installed, one CPU / 1Gi of memory, and
# the OpenAI credential injected from a Flyte secret.
env = flyte.TaskEnvironment(
    name="langgraph_env",
    image=flyte.Image.from_debian_base().with_requirements("requirements.txt"),
    secrets=[
        # Exposed to the task process as the $OPENAI_API_KEY env var,
        # which ChatOpenAI reads by default.
        flyte.Secret(key="OPENAI_API_KEY", as_env_var="OPENAI_API_KEY"),
    ],
    resources=flyte.Resources(cpu=1, memory="1Gi"),
)

@tool
@flyte.trace
# NOTE(review): LangChain's @tool uses the docstring as the tool description
# sent to the LLM, so it is runtime-significant — keep it accurate and short.
# @flyte.trace records each invocation for observability in the Flyte UI.
async def add(a: float, b: float) -> float:
    """Add two numbers."""
    return a + b

@tool
@flyte.trace
# NOTE(review): the docstring doubles as the LLM-facing tool description
# (LangChain @tool behavior); @flyte.trace captures each call for observability.
async def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b

tools = [add, multiply,]

@env.task
async def agent(request: str) -> str:
    """Answer *request* using a LangGraph ReAct agent armed with math tools."""
    model = ChatOpenAI(model="gpt-4o-mini")
    graph = create_react_agent(model, tools)

    initial_state = {"messages": [{"role": "user", "content": request}]}
    final_state = await graph.ainvoke(initial_state)

    # The agent's answer is the content of the last message in the state.
    return final_state["messages"][-1].content

#flyte run langgraph_react_agent.py agent --request "What is 12 * 7 plus 3?"
Copied to clipboard!
"""Run high-throughout inference for generative AI."""
  
import base64
import torch

import flyte
import flyte.io
import flyte.report

# GPU-backed environment for image generation: CUDA-enabled torch wheels are
# installed from the PyTorch cu124 index first, then the remaining deps from
# requirements.txt. One GPU is requested alongside 2 CPUs / 8Gi memory.
env = flyte.TaskEnvironment(
    name="stable-diffusion",
    image=(
        flyte.Image.from_debian_base()
        .with_commands(["pip install torch torchvision --index-url https://download.pytorch.org/whl/cu124"])
        .with_requirements("requirements.txt")
    ),
    resources=flyte.Resources(cpu=2, memory="8Gi", gpu=1),
)

@env.task(report=True)
async def generate(prompt: str, steps: int = 30) -> flyte.io.File:
    """Generate an image from a text prompt using Stable Diffusion.

    Args:
        prompt: Text description of the desired image.
        steps: Number of denoising steps (sdxl-turbo yields usable output
            in very few steps; more steps cost more GPU time).

    Returns:
        A Flyte ``File`` handle for the generated PNG. An inline preview is
        also written to the task report.
    """
    # Local imports: diffusers is only available inside the task container,
    # io/tempfile are stdlib helpers used just by this task.
    import io
    import tempfile

    from diffusers import AutoPipelineForText2Image

    # Fall back to CPU with full precision when no GPU is present.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    pipe = AutoPipelineForText2Image.from_pretrained(
        "stabilityai/sdxl-turbo",
        torch_dtype=torch.float16 if device == "cuda" else torch.float32,
        variant="fp16" if device == "cuda" else None,
    ).to(device)

    # sdxl-turbo is distilled for guidance-free sampling, hence guidance_scale=0.
    image = pipe(prompt, num_inference_steps=steps, guidance_scale=0.0).images[0]

    # Encode the PNG once in memory instead of saving to disk and re-reading
    # the file just to base64-encode it (as the original did).
    buf = io.BytesIO()
    image.save(buf, format="PNG")
    png_bytes = buf.getvalue()

    # Unique temp path so concurrent runs on the same host cannot clobber
    # each other (the original hard-coded /tmp/output.png).
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
        tmp.write(png_bytes)
        path = tmp.name

    img_b64 = base64.b64encode(png_bytes).decode()
    await flyte.report.replace.aio(
        f"<h2>Stable Diffusion</h2>"
        f"<p><b>Prompt:</b> {prompt}</p>"
        f'<img src="data:image/png;base64,{img_b64}" style="max-width:512px" />'
    )
    await flyte.report.flush.aio()

    return await flyte.io.File.from_local(path)

#flyte run stable_diffusion.py generate --prompt "a cat astronaut floating in space, digital art"
Copied to clipboard!
"""Run data ETL jobs at scale with modern frameworks."""
  
import duckdb
import pandas as pd

import flyte
import flyte.report

# Lightweight CPU environment for the ETL tasks; pyarrow is needed for the
# DuckDB <-> pandas DataFrame interchange.
env = flyte.TaskEnvironment(
    name="duckdb-etl",
    image=flyte.Image.from_debian_base().with_pip_packages("duckdb", "pandas", "pyarrow"),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
)

SAMPLE_CSV = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"

@env.task
async def extract() -> pd.DataFrame:
    """Pull the raw Titanic CSV into a pandas DataFrame via DuckDB."""
    # read_csv_auto infers column types directly from the remote file.
    query = f"SELECT * FROM read_csv_auto('{SAMPLE_CSV}')"
    frame = duckdb.sql(query).df()
    print(f"Extracted {len(frame)} rows")
    return frame

@env.task
async def transform(raw: pd.DataFrame) -> pd.DataFrame:
    """Aggregate survival stats by passenger class using DuckDB SQL.

    Args:
        raw: Titanic passenger rows; must contain the Pclass, Survived,
            and Fare columns referenced in the query below.

    Returns:
        One row per passenger class with total count, survivor count,
        survival rate (percent), and average fare.
    """
    # DuckDB's replacement scan resolves the bare table name `raw` in the SQL
    # against the local pandas DataFrame variable of the same name — do not
    # rename the parameter without also updating the FROM clause.
    summary = duckdb.sql("""
        SELECT
            Pclass AS passenger_class,
            COUNT(*) AS total,
            SUM(Survived) AS survived,
            ROUND(AVG(Survived) * 100, 1) AS survival_rate,
            ROUND(AVG(Fare), 2) AS avg_fare
        FROM raw
        GROUP BY Pclass
        ORDER BY Pclass
    """).df()
    print(summary.to_string(index=False))
    return summary

@env.task(report=True)
async def pipeline() -> pd.DataFrame:
    """Run the Extract → Transform pipeline and publish an HTML report."""
    rows = await extract()
    grouped = await transform(rows)

    # Render the summary into the task report before returning it.
    report_html = (
        f"<h2>DuckDB ETL Results</h2>"
        f"<p>Processed {len(rows)} rows into {len(grouped)} groups</p>"
        f"<h3>Survival by Passenger Class</h3>"
        f"{grouped.to_html(index=False)}"
    )
    await flyte.report.replace.aio(report_html)
    await flyte.report.flush.aio()

    return grouped

#flyte run duckdb_etl.py pipeline

Introducing Flyte 2

The most intuitive, developer-loved way to orchestrate AI workflows in open source. Now available for local execution.

AI orchestration & runtime

Dynamically orchestrate complex, long-running, and agentic workflows with autoscaling and infrastructure awareness.

Author in pure Python

Write workflows in actual Python, no need to learn a DSL. Write, test, and version workflows locally, then run them at scale.

Durable by default

Build fault-tolerant, resilient workflows that retry automatically, pick up where they leave off, and make failures inconsequential.

Choose your engine

Flyte 2 OSS

Build durable AI/ML pipelines and agents with OSS.

Open-source

Build and scale dynamic AI/ML workflows using Flyte’s open-source platform and community.

Infra-aware orchestration

Author in pure Python to provision and scale resources for workflows.

Dynamic workflow execution

Workflows can make on-the-fly decisions at runtime with real-time logic, conditions, and retries.

Self-healing workflows

Workflows can autonomously recover from failures and continue where they left off.

Run locally

Test and debug tasks in your local environment using the same Python SDK that runs in production on Kubernetes.

Union.ai

Union.ai

The enterprise Flyte platform. Build scalable AI and agents in your cloud.

Everything in Flyte 2 OSS, plus:

Massive scale at 50k+ actions/run

Massive scale and ultra-low latency to accelerate AI from experiment to production

Orchestrate, train, and serve

Orchestrate, deploy, and optimize AI/ML systems on one unified platform.

Real-time inference

Serve performant agents and models with sub-second latency.

Live remote debugger

Debug remote tasks, line-by-line, on the actual infrastructure where your tasks run.

Reusable, warm-start containers

Achieve task startup time of <100ms by eliminating cold starts.

Observability

Get visibility into resource usage, data lineage, and versioning.

White-glove support

Get dedicated help from a team of expert AI engineers.

Trusted by thousands of AI builders
Integrations

Expand your workflows with powerful integrations.

Apache Spark

Run Spark jobs on ephemeral clusters.

BigQuery

Query a BigQuery table.

PyTorch Elastic v1

PyTorch-native multi-node distributed training.

Ray

Connect to Ray cluster to perform distributed model training and hyperparameter tuning.

Snowflake

Query a Snowflake service.

Weights & Biases v1

Best in class ML/AI experiment- and inference-time tracking.

Community

Ask questions, share ideas, and get advice.

Events

Join us for events, workshops, and trainings.