Samhita Alla

Orchestrate and Track Your LangChain Experiments with Flyte

LangChain is an open-source library designed to simplify working with large language models (LLMs), empowering developers to build sophisticated applications with ease. When developing LLM-based applications, managing the underlying infrastructure is crucial for optimal performance and scalability. This is where the combination of LangChain and Flyte, a robust ML orchestrator, truly shines.

By integrating LangChain into Flyte, you can use a production-grade orchestrator to automate the entire infrastructure: ingesting documents, creating vector stores and performing batch inference. Not only can you iterate quickly on your LangChain experiments, but you'll also benefit from many of Flyte's built-in capabilities, like fine-grained resource allocation, secrets management and caching. In this article, you will learn how to integrate LangChain seamlessly into Flyte and employ the Flyte callback feature to track and monitor your LangChain experiments efficiently.

LangChain Retrieval QA with Pinecone and Flyte

Let's leverage LangChain, Pinecone and Flyte to develop a retrieval-based question-answering (QA) system. This will allow you to explore how Flyte can effectively orchestrate a LangChain experiment and understand the advantages it offers. Here's what you'll be doing:

  1. Building a data ingestion pipeline: You will learn how to construct a pipeline that can efficiently ingest data into a vector database in parallel while caching the outputs.
  2. Querying the vector database: You will run user-provided queries against the vector database to retrieve predictions.
  3. Logging LangChain experiment metrics: You will gain insights into how to log the metrics of the LangChain experiment.

After successfully completing this experiment, you will have the ability to:

  • Understand the limitations of LangChain in building production-grade pipelines
  • Recognize how Flyte addresses these limitations and enables running scalable and secure experiments
  • Utilize the current experiment as a starting template to develop advanced LangChain-based applications with the help of Flyte

Prerequisites

If you are new to Flyte, we recommend the Getting Started guide. You can also watch recordings of conference talks for further insights: Machine Learning for Production Workloads with Flyte and Run Your Data and Machine Learning Workflows on Kubernetes with Flyte.

If you are new to LangChain, Pinecone's introduction to LangChain serves as a helpful introduction.

To get started with the experiment, please follow the instructions below:

  • Install the Flytekit library by running the command `pip install flytekit`
  • Install the Flytekit-Envd plugin by running the command `pip install flytekitplugins-envd`
  • Install Docker on your system
  • Install flytectl by running either of the following commands:
       • `brew install flyteorg/homebrew-tap/flytectl`
       • `curl -sL https://ctl.flyte.org/install | sudo bash -s -- -b /usr/local/bin`
  • Spin up a Flyte cluster by running the command `flytectl demo start`
  • Export the environment variable `FLYTECTL_CONFIG=~/.flyte/config-sandbox.yaml`

You should now have a functional Flyte cluster running on your system. Let's proceed with creating a Flyte workflow that performs the following tasks:

  1. Transcribes a couple of Flyte YouTube videos in parallel using OpenAI Whisper.
  2. Splits the transcriptions into chunks to accommodate the context window of the LLM.
  3. Embeds the transcriptions.
  4. Stores the generated embeddings in the Pinecone vector database.
  5. Queries the vector database.

Data Ingestion: Transcribe, Split, Embed and Store

To start, configure a Flyte task that accepts `url` and `index_name` as input arguments. Enable caching by setting `cache` to `True`, which helps avoid re-execution of the data ingestion pipeline for the same URL.

In addition, ensure secure access to the Pinecone API key, environment and OpenAI API key by defining the necessary secrets.
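
For reference, the tasks below look up the credentials under a secret group and key; a minimal sketch of those constants and the expected secret payload is shown here (the names and JSON layout are assumptions — use whatever you configured in your Flyte secrets backend):

# Hypothetical secret group/key names; substitute the values configured in
# your Flyte secrets backend.
SECRET_GROUP = "llm-secrets"
SECRET_KEY = "credentials"

# The tasks expect the secret value to be a single JSON blob shaped like:
# {
#     "pinecone_api_key": "<your Pinecone API key>",
#     "pinecone_environment": "<your Pinecone environment>",
#     "openai_api_key": "<your OpenAI API key>"
# }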

To customize the task further, create a custom image using `ImageSpec` and assign it to the `container_image` parameter of the task decorator. This allows you to incorporate specific dependencies or configurations required for the task's execution.

Finally, increase the memory request allocated to the task so it has enough headroom for transcription and embedding.

import json
import os
from functools import partial

import flytekit
from flytekit import ImageSpec, Resources, Secret, map_task, task, workflow

embed_image = ImageSpec(
    name="langchain-flyte-pinecone",
    packages=[
        "langchain",
        "pinecone-client",
        "huggingface_hub",
        "sentence_transformers",
        "yt_dlp",
        "pydub",
        "openai",
    ],
    apt_packages=["ffmpeg"],
    registry="ghcr.io/flyteorg",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.7.0",
)


@task(
    cache=True,
    cache_version="1.0",
    secret_requests=[
        Secret(
            group=SECRET_GROUP,
            key=SECRET_KEY,
            mount_requirement=Secret.MountType.FILE,
        ),
    ],
    container_image=embed_image,
    requests=Resources(mem="5Gi"),
)
def embed_and_store(url: str, index_name: str) -> str:
    ...

Next, let's proceed with the following operations:

  1. Import the required libraries within the task to ensure all the necessary functionality is available.
  2. Initialize Pinecone to facilitate efficient storage and retrieval of embeddings.
  3. Leverage OpenAI Whisper to transcribe the YouTube videos.
  4. Generate embeddings from the transcriptions and store them in the Pinecone vector database. Embeddings are numerical representations of the transcribed text that capture its semantic meaning, enabling efficient similarity searches and information retrieval.
@task(...)
def embed_and_store(url: str, index_name: str) -> str:
    import openai
    import pinecone
    from langchain.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader
    from langchain.document_loaders.generic import GenericLoader
    from langchain.document_loaders.parsers import OpenAIWhisperParser
    from langchain.embeddings import HuggingFaceEmbeddings
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain.vectorstores import Pinecone

    pinecone.init(
        api_key=json.loads(
            flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_KEY)
        )["pinecone_api_key"],
        environment=json.loads(
            flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_KEY)
        )["pinecone_environment"],
    )
    openai.api_key = json.loads(
       flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_KEY)
    )["openai_api_key"]

    # Directory to save audio files
    save_dir = os.path.join(flytekit.current_context().working_directory, "youtube")

    # Transcribe the videos to text
    loader = GenericLoader(YoutubeAudioLoader([url], save_dir), OpenAIWhisperParser())
    docs = loader.load()

    combined_docs = [doc.page_content for doc in docs]
    text = " ".join(combined_docs)

    # Split them
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=150)
    splits = text_splitter.split_text(text)

    huggingface_embeddings = HuggingFaceEmbeddings(
        cache_folder=os.path.join(
            flytekit.current_context().working_directory, "embeddings-cache-folder"
        )
    )

    Pinecone.from_texts(
        texts=splits, embedding=huggingface_embeddings, index_name=index_name
    )
    return f"{url} data is stored in the vectordb."

Next, define a Flyte workflow that iterates through a set of YouTube URLs and invokes the `embed_and_store` task for each URL. With Flyte's `map_task` functionality (combined with `functools.partial` to fix the `index_name` argument), you can effortlessly execute tasks in parallel on a series of inputs, enabling efficient processing of multiple URLs simultaneously.

@workflow
def flyte_youtube_embed_wf(
    index_name: str = "flyte-youtube-data",
    urls: list[str] = [
        "https://youtu.be/CNmO1q3MamM",
        "https://youtu.be/8rLj_YVOpzE",
        "https://youtu.be/sGqS8PFQz6c",
        "https://youtu.be/1668vZczslw",
        "https://youtu.be/NrFOXQKrREA",
        "https://youtu.be/4ktHNeT8kq4",
        "https://youtu.be/gMyTz8gKWVc",
    ],
) -> list[str]:
    partial_embed_and_store = partial(embed_and_store, index_name=index_name)
    return map_task(partial_embed_and_store)(url=urls)

You can find the complete pipeline code here.

Breaking down the ingestion pipeline

If you desire fine-grained control over each operation and the ability to run them independently, you can split the transcribe-split-embed-store pipeline into multiple tasks. This approach allows for greater flexibility and reusability of individual operations.

To begin, define image specifications using `ImageSpec`. This step involves specifying the desired configurations and dependencies required for each task.

Here are the image specifications for the three tasks:

load_data_image = ImageSpec(
    name="langchain-flyte-load-data",
    packages=[
        "langchain",
        "yt_dlp",
        "pydub",
        "openai",
    ],
    apt_packages=["ffmpeg"],
    registry="ghcr.io/flyteorg",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.7.0",
)

split_data_image = ImageSpec(
    name="langchain-flyte-split-data",
    packages=["langchain"],
    registry="ghcr.io/flyteorg",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.7.0",
)

store_in_vectordb_image = ImageSpec(
    name="langchain-flyte-vectordb",
    packages=[
        "langchain",
        "pinecone-client",
        "huggingface_hub",
        "sentence_transformers",
    ],
    registry="ghcr.io/flyteorg",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.7.0",
)

After defining the image specifications, you can proceed to define the `load_data`, `split_data`, and `store_in_vectordb` tasks separately. For detailed code examples, you can refer to this gist.
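
To make the structure concrete, here is a minimal sketch of what the three task signatures might look like (bodies elided; the decorator arguments mirror the combined task above, and the exact parameter names are assumptions based on the workflow below — see the gist for the full implementations):

@task(
    cache=True,
    cache_version="1.0",
    container_image=load_data_image,
    requests=Resources(mem="5Gi"),
)
def load_data(url: str) -> str:
    # Download the YouTube audio and transcribe it with OpenAI Whisper,
    # returning the combined transcript for one video.
    # (In practice this task also requests the OpenAI API key secret.)
    ...


@task(container_image=split_data_image)
def split_data(text: str) -> list[str]:
    # Split a transcript into overlapping chunks sized for the LLM's context window.
    ...


@task(
    secret_requests=[
        Secret(
            group=SECRET_GROUP,
            key=SECRET_KEY,
            mount_requirement=Secret.MountType.FILE,
        ),
    ],
    container_image=store_in_vectordb_image,
    requests=Resources(mem="5Gi"),
)
def store_in_vectordb(splits: list[str], index_name: str) -> str:
    # Embed the chunks and upsert them into the Pinecone index.
    ...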

Next, create a Flyte workflow to map over all the tasks.

@workflow
def flyte_youtube_embed_wf(
    index_name: str = "flyte-youtube-data",
    urls: list[str] = [
        "https://youtu.be/CNmO1q3MamM",
        "https://youtu.be/8rLj_YVOpzE",
        "https://youtu.be/sGqS8PFQz6c",
        "https://youtu.be/1668vZczslw",
        "https://youtu.be/NrFOXQKrREA",
        "https://youtu.be/4ktHNeT8kq4",
        "https://youtu.be/gMyTz8gKWVc",
    ],
) -> list[str]:
    text = map_task(load_data)(url=urls)
    splits = map_task(split_data)(text=text)
    partial_store_in_vectordb = partial(store_in_vectordb, index_name=index_name)
    return map_task(partial_store_in_vectordb)(splits=splits)

To trigger either workflow on the Flyte backend, execute the following command:

pyflyte run --remote langchain_flyte_retrieval_qa.py flyte_youtube_embed_wf

Note: If you need to increase the memory limits of your demo cluster, you can update the task resource attributes of your cluster by following these steps:

  1. Create a config file `custom_resources.yaml` specifying the desired resource settings:
domain: development
project: flytesnacks
defaults:
  cpu: "3"
  memory: "8Gi"
limits:
  cpu: "5"
  memory: "12Gi"
  2. Run the following CLI command:
flytectl update task-resource-attribute --attrFile custom_resources.yaml

Retrieve and Query 

This step involves querying the vector database. Let's define a `query_vectordb` task that accepts `index_name` and `query` as inputs. In the task decorator, request the necessary secrets, specify a custom image for the task, and increase the memory request as before. Additionally, set `disable_deck` to `False` to enable rendering a Flyte Deck that captures the relevant metrics.
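
The task references a `query_image` container image; a minimal `ImageSpec` for it might look like the following (the package list is an assumption based on the imports the task uses):

# Hypothetical image for the query task; trim or extend the package list as needed.
query_image = ImageSpec(
    name="langchain-flyte-query",
    packages=[
        "langchain",
        "pinecone-client",
        "huggingface_hub",
        "sentence_transformers",
        "openai",
    ],
    registry="ghcr.io/flyteorg",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.7.0",
)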

@task(
    disable_deck=False,
    secret_requests=[
        Secret(
            group=SECRET_GROUP,
            key=SECRET_KEY,
            mount_requirement=Secret.MountType.FILE,
        ),
    ],
    container_image=query_image,
    requests=Resources(mem="5Gi"),
)
def query_vectordb(index_name: str, query: str) -> str:
    ...

Next, let's proceed with the following operations within the task:

  1. Import the necessary libraries within the task to ensure all the required functionality is available.
  2. Initialize Pinecone, which enables efficient storage and retrieval of embeddings.
  3. Retrieve the vector database and configure the search type to be `similarity`.
  4. Define a `RetrievalQA` chain and initialize the Flyte callback.
  5. Execute the chain by passing the user-provided query and return the result.
@task(
    disable_deck=False,
    secret_requests=[
        Secret(
            group=SECRET_GROUP,
            key=SECRET_KEY,
            mount_requirement=Secret.MountType.FILE,
        ),
    ],
    container_image=query_image,
    requests=Resources(mem="5Gi"),
)
def query_vectordb(index_name: str, query: str) -> str:
    import pinecone
    from langchain.callbacks import FlyteCallbackHandler
    from langchain.chains import RetrievalQA
    from langchain.chat_models import ChatOpenAI
    from langchain.embeddings import HuggingFaceEmbeddings
    from langchain.vectorstores import Pinecone

    pinecone.init(
        api_key=json.loads(
            flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_KEY)
        )["pinecone_api_key"],
        environment=json.loads(
            flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_KEY)
        )["pinecone_environment"],
    )

    huggingface_embeddings = HuggingFaceEmbeddings(
        cache_folder=os.path.join(
            flytekit.current_context().working_directory, "embeddings-cache-folder"
        )
    )

    vectordb = Pinecone.from_existing_index(index_name, huggingface_embeddings)
    retriever = vectordb.as_retriever(search_type="similarity", search_kwargs={"k": 2})

    qa_chain = RetrievalQA.from_chain_type(
        llm=ChatOpenAI(
            model_name="gpt-3.5-turbo",
            callbacks=[FlyteCallbackHandler()],
            temperature=0,
            openai_api_key=json.loads(
                flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_KEY)
            )["openai_api_key"],
        ),
        chain_type="stuff",
        retriever=retriever,
    )

    result = qa_chain.run(query)
    return result

To trigger this task on the Flyte backend, execute the following command:

pyflyte run \
--remote langchain_flyte_retrieval_qa.py query_vectordb \
--index_name "flyte-youtube-data" \
--query "What are some noteworthy features of the Flyte 1.7 release?"

The metrics will be displayed on the Flyte UI as follows:

These include the metrics captured at the start and end of the LLM run, along with text-complexity metrics and the dependency tree.

Monitor LangChain Experiments with the Flyte Callback

Triggering LangChain experiments within Flyte is a straightforward process. By passing `FlyteCallbackHandler` to a LangChain LLM, chain or agent, you can seamlessly integrate and coordinate your experiments with Flyte. To delve deeper into this integration, refer to the comprehensive Flyte x LangChain documentation available at: https://python.langchain.com/docs/ecosystem/integrations/flyte.
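
Here is a minimal standalone sketch of attaching the callback outside the retrieval pipeline (it assumes the `OPENAI_API_KEY` environment variable is available to the task; the task name and prompt handling are illustrative):

from flytekit import task
from langchain.callbacks import FlyteCallbackHandler
from langchain.llms import OpenAI


@task(disable_deck=False)
def ask(prompt: str) -> str:
    # Attaching the Flyte callback makes LangChain log its metrics to a Flyte Deck.
    llm = OpenAI(temperature=0, callbacks=[FlyteCallbackHandler()])
    return llm(prompt)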

Advantages of using Flyte for LangChain

  • Parallel data ingestion: Generate embeddings and store them in a vector database, leveraging Flyte to ingest the data in parallel
  • Declarative resource assignment: Assign resources declaratively to LangChain tasks for efficient utilization
  • Monitoring within Flyte UI: Monitor your LangChain experiments seamlessly within the Flyte user interface
  • Caching: Avoid redundant ingestion by caching steps for identical data inputs
  • Enhanced security: Benefit from increased security for your LangChain experiments within the Flyte cluster
  • Scalability for multiple teams: Seamlessly scale LangChain experiments to multiple teams with Flyte

Running the ingestion in parallel took approximately 6 minutes, while the pipeline broken into multiple ingestion tasks took about 11 minutes. By contrast, running the data ingestion sequentially on the Flyte cluster took about 19 minutes.

Next steps

Here are some exciting directions to explore further:

  • Orchestrate advanced LangChain experiments: Explore the possibilities of orchestrating advanced LangChain experiments using agents with Flyte
  • Productionize your pipeline: Take your pipeline to the next level by deploying Flyte on-premises or in the cloud. This allows you to run your pipeline on a schedule and achieve reliable and scalable production-grade workflows
  • Build a full-fledged data ingestion pipeline: If you're dealing with large volumes of data, consider constructing a comprehensive data ingestion pipeline in Flyte. Leverage tools like DuckDB, Spark, Snowflake, and others to efficiently process and manage your data at scale
  • Construct a batch inference pipeline: Seamlessly create a batch inference pipeline that processes a set of inputs. Refer to this detailed blog post at https://www.unionai.com/blog-post/parallel-audio-transcription-using-whisper-jax-and-flyte-map-tasks-for-streamlined-batch-inference for insights and guidance on leveraging Flyte's map tasks to streamline batch inference