Niels Bantilan

Building a Weather Forecasting Application with Flyte, Pandera, and Streamlit

This is the first in a series of flyte.org projects that demonstrate an end-to-end application using Flyte as one of the components. All of the source code for this project is available in the flyteorg/flytelab repo.

The MLOps ecosystem is maturing at a rapid pace. It's now at the point where data scientists and machine learning engineers have the tools they need to ship model-driven products that would otherwise have required considerable DevOps and frontend engineering effort. Online machine learning is particularly challenging from an MLOps perspective, and in this post we'll dive into the process of building robust, reliable, and interactive applications with Flyte, Pandera, and Streamlit.

Motivation

It's a well-established fact in the data science (DS) and machine learning (ML) communities that only a small component of an ML system involves the model-training code: there are a lot of other components that make it robust, reliable, and maintainable in production.

Screen_Shot_2021-11-09_at_4.51.55_PM.png
source: https://proceedings.neurips.cc/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf

In practice, the feedback loop between R&D and production can be long and arduous: DS/ML practitioners formulate appropriate applications (i.e. "should we build this in the first place?"), prototype, and validate models in the lab, and at least in theory, go through a more stringent holistic evaluation of whether a model should be released into the wild. Only if the answer is "yes" might they deploy these models into production where end-users consume the model's outputs.

This whole process is mediated by social forces within the broader organization, including how much trust the people within it place in these kinds of ML systems. Those people include the builders of the systems as well as internal stakeholders who have an interest in the organization's long-term success. Part of this trust-building process is the degree to which a data team can demonstrate the value, risks, and limitations of ML models before committing to a production deployment. This is why, as part of this case study, we set out to address the following question:

How far can a (potentially small) data team get building and shipping production-grade model-driven data products?

To begin answering this question, we set ourselves the challenge of building an online learning system that updates a model daily and displays hourly forecasts on a web UI.

Why? Because it forces the practitioner to think about:

  1. Incremental data acquisition and validation
  2. Model updates in production as a first-class citizen
  3. Evaluation metrics in the deployment setting

It's tempting to treat these three facets of production ML systems as afterthoughts in the offline learning setting. However, by going through the exercise of implementing an online learning ML system, we can extract useful design patterns that we could apply back to an offline learning setting.

Offline vs Online Learning

Offline learning is a setting where a model learns from a static dataset and needs to be updated at some pre-determined cadence, while online learning is a setting where a model learns from examples that present themselves to the model only once, in some temporal order.

In principle, you might want to consider online learning when:

  • Your data isn't static, e.g. data is generated as a function of time
  • Your data might be too large to practically store/train a model on all at once
  • You can't assume I.I.D. data, e.g. today's data depends on yesterday's, etc.
  • Your model needs to be as up-to-date as possible due to the dynamic nature of the underlying phenomenon that produces the data.

Use Case: Weather Forecasting

For this case study we picked hourly temperature forecasting as an appropriate application for building an online learning model, as it fulfills many of the criteria outlined above.

Our objective is to train a model on data managed by NOAA.gov, which offers a web API service for fetching weather data from the global hourly integrated surface database (ISD). This database contains weather-related data from around the world at approximately an hourly resolution:

Screen_Shot_2021-11-09_at_6.01.53_PM.png

As you can see from the raw sample above, the <span class=“code-inline”>TMP</span> column contains the temperature data that we're interested in, and we'll need to parse it in order to extract hourly temperatures in degrees Celsius.
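As a rough illustration of that parsing step, the sketch below assumes the raw <span class=“code-inline”>TMP</span> field has the form <span class=“code-inline”>+0123,1</span>: a signed temperature in tenths of a degree Celsius, a comma, and a quality code, with <span class=“code-inline”>+9999</span> marking a missing reading. The function name <span class=“code-inline”>parse_tmp</span> is hypothetical and is not taken from the project's code.

```python
from typing import Optional

MISSING = "+9999"  # assumed sentinel for a missing temperature reading


def parse_tmp(raw: str) -> Optional[float]:
    """Parse a raw TMP value such as '+0123,1' into degrees Celsius."""
    value, _quality_code = raw.split(",")
    if value == MISSING:
        return None
    # the value is assumed to be stored in tenths of a degree Celsius
    return int(value) / 10.0


print(parse_tmp("+0123,1"))  # 12.3
print(parse_tmp("-0022,1"))  # -2.2
print(parse_tmp("+9999,9"))  # None
```

Returning <span class=“code-inline”>None</span> for missing readings keeps the decision about how to handle gaps (dropping, interpolating) out of the parser itself.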

The trained model should produce hourly temperature forecasts and update itself daily, which is roughly how often the ISD is updated. For the purpose of the case study, we'll use an autoregressive model, which is one of the simplest models one can use for time series data.

Screen_Shot_2021-11-09_at_5.51.55_PM.png
source: https://en.wikipedia.org/wiki/Autoregressive_model

Again, the model-training component of this ML system can always be swapped out for something fancier: the main purpose of this case study is to demonstrate how to put together the various pieces required to build a reliable online learning system.
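To make the autoregressive idea concrete, here is a minimal sketch of an AR(p) forecast, where the next value is an intercept plus a weighted sum of the previous p observations. The coefficients below are hand-picked for illustration rather than fitted, and the function names <span class=“code-inline”>ar_predict</span> and <span class=“code-inline”>ar_forecast</span> are hypothetical.

```python
from typing import List, Sequence


def ar_predict(history: Sequence[float], coefs: Sequence[float], intercept: float = 0.0) -> float:
    """One-step AR(p) forecast: intercept + sum_i coefs[i] * x[t - 1 - i]."""
    p = len(coefs)
    if len(history) < p:
        raise ValueError(f"need at least {p} past values, got {len(history)}")
    # most recent observation first, so coefs[0] multiplies x[t - 1]
    lags = list(reversed(list(history)[-p:]))
    return intercept + sum(c * x for c, x in zip(coefs, lags))


def ar_forecast(history: Sequence[float], coefs: Sequence[float],
                intercept: float = 0.0, steps: int = 3) -> List[float]:
    """Multi-step forecast: feed each prediction back in as the newest lag."""
    values = list(history)
    forecasts = []
    for _ in range(steps):
        y_hat = ar_predict(values, coefs, intercept)
        forecasts.append(y_hat)
        values.append(y_hat)
    return forecasts


temps = [10.0, 12.0, 14.0]
print(ar_predict(temps, coefs=[0.5, 0.5]))            # 13.0
print(ar_forecast(temps, coefs=[0.5, 0.5], steps=2))  # [13.0, 13.5]
```

In a real system the coefficients would be learned from data; the point here is only that hourly forecasts can be rolled forward from the most recent temperatures.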

Finally, as an evaluation metric, we'll implement an exponentially-weighted mean of the absolute error when comparing the ground truth temperature to the model's prediction. Roughly speaking, this would look like:

eps = 0.1
ewmae = 0

for X, y in training_data:
    y_pred = model(X)
    ewmae = eps * abs(y - y_pred) + (1 - eps) * ewmae
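For a concrete feel of how this metric evolves, the following sketch runs the same update rule over a few made-up (ground truth, prediction) pairs. The data and the helper name <span class=“code-inline”>ewmae_stream</span> are illustrative assumptions, and <span class=“code-inline”>eps=0.5</span> is chosen only to keep the arithmetic easy to follow.

```python
def ewmae_stream(pairs, eps=0.1):
    """Yield the running exponentially-weighted MAE after each (y_true, y_pred) pair."""
    ewmae = 0.0
    for y_true, y_pred in pairs:
        ewmae = eps * abs(y_true - y_pred) + (1 - eps) * ewmae
        yield ewmae


# absolute errors are 2.0, 0.0, and 4.0
pairs = [(10.0, 12.0), (11.0, 11.0), (9.0, 13.0)]
for step, score in enumerate(ewmae_stream(pairs, eps=0.5), start=1):
    print(step, score)
# 1 1.0
# 2 0.5
# 3 2.25
```

A larger <span class=“code-inline”>eps</span> makes the metric react faster to recent errors, while a smaller one smooths over more history. Because the running value starts at zero, the first few scores are pulled toward zero.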

Pipeline Architecture

Now that we have a good handle on the problem set and high-level specifications of the model, we can dig into some of the details of the ML system more broadly. The purpose of this project is to use Flyte, Pandera, and Streamlit to deliver weather forecasts to the end user. Specifically, each library will serve a specific purpose:

  • Flyte: task orchestration and execution of the data processing and model-training workload in the cloud.
  • Pandera: validation of dataframes as they pass through the pipeline.
  • Streamlit: building and deploying a simple interactive UI that displays forecasts.

At a high level, this is the architecture of the application:

Requirements

The main requirements we focused on in this project are to:

  1. Support incremental model updates with optional pre-training
  2. Implement evaluation metrics in an online fashion
  3. Validate raw and clean data from the NOAA API
  4. Display the latest hourly forecasts in an interactive UI

In the next four sections we'll dive deeper into how these tools address each of these requirements.

Requirement 1: Incremental Model Updates with Recursive, Dynamic Workflows

Flyte is a powerful orchestration platform that enables you to write data processing logic locally and scale it to the resource requirements that you need to go to production. The primary unit of work in Flyte is the <span class=“code-inline”>task</span>, which can be composed together into <span class=“code-inline”>workflow</span>s.

import pandas as pd
from flytekit import task, workflow

# Model and Predictions are placeholder types for this illustration

@task
def process_data(data: pd.DataFrame) -> pd.DataFrame:
    ...

@task
def train_model(data: pd.DataFrame) -> Model:
    ...

@task
def generate_predictions(model: Model, new_data: pd.DataFrame) -> Predictions:
    ...

@workflow
def pipeline(data: pd.DataFrame, new_data: pd.DataFrame) -> Predictions:
    model = train_model(data=process_data(data=data))
    return generate_predictions(model=model, new_data=new_data)

However, this isn't the only thing Flyte is capable of. Because it supports dynamism in addition to simply constructing a static computation graph, we can perform recursion, which is a key piece that was needed to realize requirement #1: to support incremental model updates with optional pre-training.

The <span class=“code-inline”>@dynamic</span> function decorator, along with Flyte's native caching functionality, enables us to create a workflow that calls itself, which unlocks a pretty neat construct: the ability to efficiently obtain the model from the previous timestep in order to update the model for the current timestep.

from datetime import datetime

from flytekit import dynamic

@dynamic(cache=True)
def get_latest_model(
    target_datetime: datetime,
    genesis_datetime: datetime,
    n_days_pretraining: int,
) -> ModelUpdate:
    if target_datetime <= genesis_datetime:
        # implement optional pre-training here
        return init_model(n_days_pretraining=n_days_pretraining)
    else:
        previous_datetime = get_previous_target_datetime(target_datetime=target_datetime)

        # recursion here 🤯
        previous_model = get_latest_model(
            target_datetime=previous_datetime,
            genesis_datetime=genesis_datetime,
            n_days_pretraining=n_days_pretraining,
        )

        return update_model(model=previous_model, ...)

Basically we're defining a dynamic workflow called <span class=“code-inline”>get_latest_model</span> which takes three parameters:

  • <span class=“code-inline”>target_datetime</span>: the current time-step for which to get data and update the model.
  • <span class=“code-inline”>genesis_datetime</span>: a datetime <span class=“code-inline”><= target_datetime</span> that indicates when the model was instantiated.
  • <span class=“code-inline”>n_days_pretraining</span>: the number of days looking back from <span class=“code-inline”>genesis_datetime</span> with which to pre-train the initial model.

Within the <span class=“code-inline”>get_latest_model</span> function body, we first check if the <span class=“code-inline”>target_datetime</span> is less than or equal to <span class=“code-inline”>genesis_datetime</span>. If so, we call <span class=“code-inline”>init_model</span>, which itself is a Flyte task that contains the logic for initializing the model with some amount of pre-training. This execution path of the workflow is executed pretty much only the first time the pipeline runs.

If the <span class=“code-inline”>target_datetime</span> falls at some point after <span class=“code-inline”>genesis_datetime</span>, we first get the <span class=“code-inline”>previous_datetime</span>, which would be <span class=“code-inline”>genesis_datetime</span> when the pipeline runs for the second time. We then invoke <span class=“code-inline”>get_latest_model</span> with the <span class=“code-inline”>previous_datetime</span> to obtain the <span class=“code-inline”>previous_model</span>, and since this dynamic workflow is itself cached, we immediately get the initialized model from the previous time-step without having to re-initialize it.

Because we're caching the results of <span class=“code-inline”>get_latest_model</span>, when we move forward in time this recursive dynamic workflow will only have to look back one time-step to fetch the most up-to-date version of the model, then update it for the current <span class=“code-inline”>target_datetime</span>.

Pretty powerful 🦾 !!!
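The recursion-plus-caching pattern can be mimicked in plain Python to see why each new run only does one step of work. This is only an analogy under stated assumptions: <span class=“code-inline”>functools.lru_cache</span> stands in for Flyte's caching, an integer stands in for the model, and the <span class=“code-inline”>update_calls</span> list is a hypothetical bookkeeping device for showing which dates triggered computation.

```python
from datetime import date, timedelta
from functools import lru_cache

update_calls = []  # records which target dates actually triggered work


@lru_cache(maxsize=None)
def get_latest_model(target: date, genesis: date) -> int:
    """Return a stand-in 'model': the number of updates applied so far."""
    update_calls.append(target)
    if target <= genesis:
        return 0  # stand-in for init_model(...)
    # recursion: fetch (or reuse the cached) model from the previous day
    previous_model = get_latest_model(target - timedelta(days=1), genesis)
    return previous_model + 1  # stand-in for update_model(...)


genesis = date(2021, 11, 1)
get_latest_model(date(2021, 11, 3), genesis)  # first run: computes Nov 1, 2, and 3
update_calls.clear()

latest = get_latest_model(date(2021, 11, 4), genesis)  # next day: Nov 3 is a cache hit
print(update_calls)  # [datetime.date(2021, 11, 4)]
print(latest)        # 3
```

The second call recurses into the previous day exactly once and gets a cached result back, which is the behavior described above for the daily pipeline runs.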

Requirement 2: Online Evaluation with Strong Types

Because we're updating the model in an incremental fashion with potentially a single training instance on a given update, we also want to evaluate the training and validation performance of our model in an incremental fashion. Since Flyte is strongly typed, we can explicitly encode all the state required for a model update:

from dataclasses import dataclass
from datetime import datetime
from typing import NamedTuple

from dataclasses_json import dataclass_json
from flytekit.types.file import JoblibSerializedFile

@dataclass_json
@dataclass
class Scores:
    # keep track of mean absolute error
    train_exp_mae: float = 0.0
    valid_exp_mae: float = 0.0

@dataclass_json
@dataclass
class TrainingInstance:
    # definition of the contents of a single training instance
    target_datetime: datetime
    features: ...
    target: ...

ModelUpdate = NamedTuple(
    "ModelUpdate",
    model_file=JoblibSerializedFile,
    scores=Scores,
    training_instance=TrainingInstance,
)

In this code snippet we're using a Flyte type called <span class=“code-inline”>JoblibSerializedFile</span> to represent a pointer to the model artifact, in this case an sklearn model, along with <span class=“code-inline”>dataclass</span> and <span class=“code-inline”>dataclass_json</span> to encode the types for the evaluation <span class=“code-inline”>Scores</span> and the <span class=“code-inline”>TrainingInstance</span>.

We can then define a Flyte task that performs a single model update: it computes the exponentially-weighted validation metric before updating the model with <span class=“code-inline”>partial_fit</span>, and then computes the training metric after the update:

import numpy as np
from flytekit import task
from sklearn.exceptions import NotFittedError

def exp_weighted_mae(mae, exp_mae, epsilon=0.1):
    """Exponentially-weighted mean absolute error."""
    return epsilon * mae + (1 - epsilon) * exp_mae

@task
def update_model(
    model: JoblibSerializedFile,
    scores: Scores,
    training_instance: TrainingInstance,
) -> ModelUpdate:
    # note: loading the sklearn model from the serialized file is omitted here
    features = training_instance.features
    target = training_instance.target

    try:
        y_pred = model.predict(features)
    except NotFittedError:
        # fallback for when the model hasn't been fit yet, i.e. at initialization,
        # using the mean of air temperature features in the past time steps
        # to predict the target air temperature.
        y_pred = np.mean(features.air_temp_features)

    # compute running validation mean absolute error before updating the model
    valid_exp_mae = exp_weighted_mae(
        abs(y_pred - target.air_temp), scores.valid_exp_mae
    )

    # update the model
    model.partial_fit(features, target)

    # compute running training mean absolute error after the update
    train_exp_mae = exp_weighted_mae(
        abs(model.predict(features) - target.air_temp),
        scores.train_exp_mae,
    )
    return model, Scores(train_exp_mae, valid_exp_mae), training_instance

With the <span class=“code-inline”>ModelUpdate</span> type, we can be confident that, for each update, we have the model artifact along with the metrics and training instance associated with it. What's even better is that we're also guaranteed that the data types have been validated with Flyte's internal type system.
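To see the "validate before the update, train-score after" ordering end to end without any dependencies, here is a minimal sketch that swaps in a toy model. The <span class=“code-inline”>RunningMeanModel</span> class is a hypothetical stand-in for the sklearn model and simply predicts the mean of all targets seen so far, and this simplified <span class=“code-inline”>ModelUpdate</span> omits the training instance.

```python
from dataclasses import dataclass
from typing import NamedTuple


@dataclass
class Scores:
    train_exp_mae: float = 0.0
    valid_exp_mae: float = 0.0


class RunningMeanModel:
    """Toy online model: predicts the mean of all targets seen so far."""

    def __init__(self) -> None:
        self.n = 0
        self.mean = 0.0

    def predict(self) -> float:
        return self.mean

    def partial_fit(self, target: float) -> None:
        self.n += 1
        self.mean += (target - self.mean) / self.n


class ModelUpdate(NamedTuple):
    model: RunningMeanModel
    scores: Scores


def exp_weighted_mae(mae: float, exp_mae: float, epsilon: float = 0.1) -> float:
    return epsilon * mae + (1 - epsilon) * exp_mae


def update_model(model: RunningMeanModel, scores: Scores, target: float) -> ModelUpdate:
    # validation error: score the prediction made before seeing the target
    valid = exp_weighted_mae(abs(model.predict() - target), scores.valid_exp_mae)
    model.partial_fit(target)
    # training error: score the prediction made after the update
    train = exp_weighted_mae(abs(model.predict() - target), scores.train_exp_mae)
    return ModelUpdate(model, Scores(train_exp_mae=train, valid_exp_mae=valid))


update = ModelUpdate(RunningMeanModel(), Scores())
for temperature in [10.0, 12.0, 14.0]:
    update = update_model(update.model, update.scores, temperature)

print(update.model.mean)  # 12.0
print(update.scores)
```

Because the validation score is always computed on an instance the model has not yet seen, it behaves like a held-out metric even though every instance is eventually used for training.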

Requirement 3: Data Validation with Pandera

Speaking of validation, validating dataframes (e.g. <span class=“code-inline”>pandas.DataFrame</span>s) can be a challenge since they are mutable and dynamic data structures with no guaranteed types or properties. Flyte offers the FlyteSchema type, which works with a bunch of different tabular formats like parquet, pyarrow, spark, and pandas. However, if you want to validate the contents of a dataframe beyond the types of each column, Flyte also integrates well with Pandera, which is a statistical typing and data testing tool for validating the properties of a dataframe's contents at runtime.

To use pandera with flytekit, we need to install the plugin with:

pip install flytekitplugins-pandera

Then we can define the pandera schemas that we need to ensure that the data passing through our workflow is what we expect:

import pandera as pa
from pandera.typing import Index, Series

class GlobalHourlyDataRaw(pa.SchemaModel):
    DATE: Series[pa.typing.DateTime]
    TMP: Series[str]  # raw temperature field

    class Config:
        coerce = True

class GlobalHourlyData(pa.SchemaModel):
    # validate the min and max temperature range in degrees Celsius
    air_temp: Series[float] = pa.Field(ge=-273.15, le=459.67)
    dew_temp: Series[float] = pa.Field(ge=-273.15, le=459.67)

    # pandera supports validating pandas.Index
    date: Index[pa.typing.DateTime] = pa.Field(unique=True)

    class Config:
        coerce = True

As you can see, we've defined two schemas: one called <span class=“code-inline”>GlobalHourlyDataRaw</span> for the raw data coming in from the NOAA API, and another called <span class=“code-inline”>GlobalHourlyData</span> for the expected type of the dataframe once we've cleaned it and it is model-ready.

We can then define helper functions that are called by a Flyte task to fetch and clean the data:

import flytekitplugins.pandera
import pandera as pa
from flytekit import task
from pandera.typing import DataFrame

@pa.check_types
def get_raw_data(...) -> DataFrame[GlobalHourlyDataRaw]:
    ...

def process_raw_training_data(
    raw_data: DataFrame[GlobalHourlyDataRaw]
) -> DataFrame[GlobalHourlyData]:
    ...

@task(cache=True)
def get_weather_data(...) -> DataFrame[GlobalHourlyData]:
    return process_raw_training_data(get_raw_data(...))

Functions decorated with <span class=“code-inline”>@task</span> automatically check the validity of inputs and outputs annotated with the pandera <span class=“code-inline”>DataFrame[Schema]</span> type, and for plain Python functions, <span class=“code-inline”>@pa.check_types</span> makes sure that dataframe types are valid.

You can continue to iterate and refine the schemas over time, and with that you can be more confident that the dataframes in your workflow are properly type-checked 🐼 ✅ !

Requirement 4: Display the latest hourly forecasts in an interactive UI

Last but not least, we need a way of displaying our model's forecasts to end users, and for this we'll use Streamlit. To connect Flyte and Streamlit together, all we need to do is use the FlyteRemote client to give us access to the remote execution outputs (i.e. the forecasts), which we can then format and display in the Streamlit script in exactly the way we need.

import pandera as pa
import pandas as pd
import streamlit as st

from flytekit.remote import FlyteRemote
from app.workflows import Forecast

remote = FlyteRemote()

st.title("Flytelab: Weather Forecasts ⛈☀️☔️")

class ForecastSchema(pa.SchemaModel):
    air_temp: pa.typing.Series[float]
    dew_temp: pa.typing.Series[float]
    index: pa.typing.Index[pa.typing.DateTime]

@pa.check_types
def get_forecasts() -> pa.typing.DataFrame[ForecastSchema]:
    # fetch the most recent execution and sync its outputs
    [latest_execution, *_], _ = remote.client.list_executions_paginated(...)
    execution = remote.fetch_workflow_execution(name=latest_execution.id.name)
    remote.sync(execution, sync_nodes=False)
    forecast = execution.outputs["forecast"]
    return create_forecast_dataframe(forecast)

st.line_chart(get_forecasts())

You can check out the live demo of the app ✨ here ✨.

Takeaways

To summarize all the components that we've used to realize this online learning weather forecasting app:

Summary

So returning to the question we posed earlier:

How far can a (potentially small) data team get building and shipping production-grade model-driven data products?

Hopefully this post convinces you that we can go almost all the way! What's still missing from this application are model monitoring, dataset shift assessment, and app authentication, which might be subjects for another post.

What these three tools give us is the power to create robust and reliable model-driven applications with interactive UIs. If this sounds like something you'll need, give Flyte, Pandera, and Streamlit a try and a GitHub ⭐️, and let us know what you think in the comments below!

Resources