Samhita Alla

Analyzing COVID-19 Impact on NYC Taxis with DuckDB

Data analytics made easy with DuckDB integration: Run complex queries on your data effortlessly.

The world of data analytics is constantly evolving, with new tools and technologies emerging every day. One popular newcomer is DuckDB, an open-source database touted as a faster, more efficient alternative to traditional databases.

So, what exactly is DuckDB, and why is it gaining so much attention in the data analytics community? In simple terms, DuckDB is a SQL database management system designed for analytical workloads. Whereas traditional databases are optimized for transactional workloads, DuckDB is built for analytics, which makes it a natural fit for data analysts and data scientists.
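
If you have never used DuckDB, the snippet below is a minimal sketch of what an analytical query looks like with its Python API; the `trips.parquet` file and its columns are placeholders for illustration, not part of the pipeline we build below.

import duckdb

# DuckDB runs in-process: there is no server to set up, and queries execute
# directly against files such as Parquet or CSV.
con = duckdb.connect()
df = con.execute(
    "select passenger_count, count(*) as trips "
    "from read_parquet('trips.parquet') "
    "group by passenger_count"
).fetchdf()
print(df)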

DuckDB stands out for its speed and capacity to handle sizable datasets. In this article, we'll look at DuckDB's capabilities by running analytical queries on a few gigabytes of NYC taxi data, all within a Flyte workflow. In the process, we'll provide insight into how COVID-19 affected pickups, drop-offs, and peak and off-peak hours for NYC taxis, and how Flyte can expedite analytical queries.

Fetching pickup and drop-off hours over a span of 4 years

Start by retrieving the pickup and drop-off hours of NYC taxis from 2019 to 2022 from the publicly available Parquet files. Since each Parquet file corresponds to a specific year and month, execute queries on each data file in a loop. Because the number of queries depends on the inputs, use flytekit's `@dynamic` workflow.

Send the resulting list of pickup and drop-off structured datasets to a `coalesce_dfs` task. This task merges the structured datasets, grouping the pickup and drop-off counts by year, and returns one Vaex dataframe per year.

from typing import List

from flytekit import Resources, dynamic


@dynamic(requests=Resources(mem="1Gi", cpu="2"))
def fetch_trips_data(years: List[int], months: List[int]) -> vaex_dfs_list:
    result = []
    for year in years:
        for month in months:
            url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_{year}-{month:02d}.parquet"
            # both queries are launched for every year-month file
            pickup_sd = pickup_query(params=[url])
            dropoff_sd = dropoff_query(params=[url])
            result.append(
                {
                    "pickup_sd": pickup_sd,
                    "dropoff_sd": dropoff_sd,
                }
            )
    return coalesce_dfs(sds=result, years=years)

Define `peak_hour_output` and `vaex_dfs_list` types as follows:

peak_hour_output = List[Dict[str, StructuredDataset]]
vaex_dfs_list = List[vaex.dataframe.DataFrameLocal]

`pickup_query` and `dropoff_query` refer to the DuckDB queries that need to be executed on the Parquet files. In Flytekit, a DuckDB query can be defined with the `DuckDBQuery` task, which takes the task name, the query (or list of queries) to execute, and any input parameters the query needs.

To install the DuckDB plugin, run the following command:

pip install flytekitplugins-duckdb

To obtain the number of trips for every hour, along with the corresponding pickup and drop-off counts, we construct two queries. Both operate on a Parquet file, which serves as the input data source.

from typing import List

from flytekit import kwtypes
from flytekitplugins.duckdb import DuckDBQuery


pickup_query = DuckDBQuery(
    name="pickup_query",
    query=[
        "install httpfs",
        "load httpfs",
        """select hour(lpep_pickup_datetime) as hour,
        count(*) as count
        from read_parquet(?)
        group by hour""",
    ],
    inputs=kwtypes(params=List[str]),
)

dropoff_query = DuckDBQuery(
    name="dropoff_query",
    query=[
        "install httpfs",
        "load httpfs",
        """
        select hour(lpep_dropoff_datetime) as hour,
        count(*) as count
        from read_parquet(?)
        group by hour
        """,
    ],
    inputs=kwtypes(params=List[str]),
)

The `pickup_query` and `dropoff_query` tasks run concurrently within the `@dynamic` workflow, which speeds up execution!

Now, we define a `coalesce_dfs` task to merge the structured datasets for each year.

  1. For each pickup and drop-off structured dataset dictionary, add the corresponding year as a key-value pair to the dictionary.
  2. Sort the structured datasets by year.
  3. Iterate through the structured datasets, and for each one, load it into a Vaex dataframe. Concatenate each pickup dataframe with the previous pickup dataframe, and each drop-off dataframe with the previous drop-off dataframe, until all 12 months have been processed.
  4. Once the Vaex dataframes for a year are obtained, group the pickup and drop-off dataframes by hour, and join them together.
  5. Store the resulting dataframe in a list and repeat the process for the next year.

The result will be a list of Vaex dataframes.

import numpy as np
import vaex
from flytekit import Resources, task


@task(requests=Resources(mem="500Mi", cpu="2"))
def coalesce_dfs(sds: peak_hour_output, years: List[int]) -> vaex_dfs_list:
    # number of (pickup, drop-off) entries per year, i.e., one per month
    entries_per_year = len(sds) // len(years)

    # add year to every dictionary
    for index, item in enumerate(sds):
        item["year"] = years[index // entries_per_year]

    # sort by year
    sds.sort(key=lambda x: x["year"])

    result_list = []
    pickup_df, dropoff_df = None, None

    for index, item in enumerate(sds, start=1):
        materialized_pickup_df = (
            item["pickup_sd"].open(vaex.dataframe.DataFrameLocal).all()
        )
        materialized_dropoff_df = (
            item["dropoff_sd"].open(vaex.dataframe.DataFrameLocal).all()
        )
        pickup_df = (
            vaex.concat([pickup_df, materialized_pickup_df])
            if pickup_df is not None
            else materialized_pickup_df
        )
        dropoff_df = (
            vaex.concat([dropoff_df, materialized_dropoff_df])
            if dropoff_df is not None
            else materialized_dropoff_df
        )
        # for every year, store the number of pickups and drop-offs
        if index % entries_per_year == 0:
            year = item["year"]
            pickup_df = pickup_df.groupby(by="hour").agg({"count": "sum"})
            dropoff_df = dropoff_df.groupby(by="hour").agg({"count": "sum"})
            pickup_df.rename("count", "pickup_count")
            dropoff_df.rename("count", "dropoff_count")
            result_df = pickup_df.join(dropoff_df, on="hour")
            result_df["year"] = np.array([year] * len(result_df))
            result_list.append(result_df)
            pickup_df, dropoff_df = None, None

    return result_list

Note: Vaex is designed to handle and process large datasets. In our case, it helps speed up the dataframe aggregations and joins. Flyte provides native support for Vaex dataframes. You can install the library by running the following command:

pip install flytekitplugins-vaex
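
As a minimal sketch of that native support (the task names and columns below are made up for illustration), a Vaex dataframe can be passed between Flyte tasks like any other supported type once the plugin is installed:

import numpy as np
import vaex
from flytekit import task


@task
def make_counts() -> vaex.dataframe.DataFrameLocal:
    # flytekitplugins-vaex registers a type transformer, so a Vaex dataframe
    # can be returned from a task directly
    return vaex.from_arrays(hour=np.array([0, 1, 2]), count=np.array([10, 20, 30]))


@task
def total_count(df: vaex.dataframe.DataFrameLocal) -> int:
    return int(df["count"].sum())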

Visualizing data through plot rendering

To estimate the impact of COVID-19 on NYC taxi trips, we can create a visual representation of the pickups and drop-offs between 2019 and 2022. 

import flytekit
import matplotlib.pyplot as plt
import mpld3


@task(disable_deck=False)
def generate_plot(dfs: vaex_dfs_list):
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    for df in dfs:
        ax.plot(
            df.hour,
            df.pickup_count,
            label=f"{df.year.values[0]} Pickup Count",
            linestyle="--",
            marker="o",
        )
        ax.plot(
            df.hour,
            df.dropoff_count,
            label=f"{df.year.values[0]} Dropoff Count",
            linestyle="--",
            marker="o",
        )
    ax.set_xlabel("hour")
    ax.set_ylabel("count")
    ax.set_title("Number of pickups/dropoffs")
    ax.legend(loc="best")
    fig.savefig("plot.png")

    flytekit.Deck("matplotlib", mpld3.fig_to_html(fig))
Pickups and drop-offs between 2019 and 2022

Install the following libraries prior to rendering the plot:

pip install mpld3
pip install vaex-viz

The plot suggests that in 2019, there were significantly more pickups and drop-offs than in 2020, 2021 and 2022. At 18:00, for instance, pickups dropped from about 400,000 in 2019 to roughly 90,000 in 2020, and lower still, to around 50,000, in 2021 and 2022.

Retrieving the peak and off-peak hours

To retrieve the peak and off-peak hours for each year from 2019 to 2022, we can follow these steps:

  1. Define a `@dynamic` workflow to loop over the Vaex dataframes retrieved from the `coalesce_dfs` task.
  2. Generate an Arrow table from every Vaex dataframe.
  3. Execute two DuckDB queries to retrieve peak and off-peak hours by sending the Arrow tables to the queries.
  4. Define a post-processing task to retrieve the peak and off-peak hours from the dataframes.
  5. Return the result.
import pandas as pd
from flytekit import StructuredDataset


@task
def peak_offpeak_postprocess(
    df: vaex.dataframe.DataFrameLocal,
    peak_hour_sd: StructuredDataset,
    off_peak_hour_sd: StructuredDataset,
) -> PeakOffPeak:
    peak_hour = peak_hour_sd.open(pd.DataFrame).all().iloc[0]["hour"].item()
    off_peak_hour = off_peak_hour_sd.open(pd.DataFrame).all().iloc[0]["hour"].item()

    return PeakOffPeak(df.year.values[0].as_py(), peak_hour, off_peak_hour)


@dynamic(requests=Resources(mem="500Mi", cpu="2"))
def peak_offpeak_hours(dfs: vaex_dfs_list) -> List[PeakOffPeak]:
    result = []
    for df in dfs:
        arrow_table = df.to_arrow_table()
        peak_hour_sd = peak_hour_query(arrow_table=arrow_table)
        off_peak_hour_sd = off_peak_hour_query(arrow_table=arrow_table)
        df_result = peak_offpeak_postprocess(
            df=df,
            peak_hour_sd=peak_hour_sd,
            off_peak_hour_sd=off_peak_hour_sd,
        )
        result.append(df_result)
    return result
Peak and off-peak hours

`peak_hour_query` and `off_peak_hour_query` execute SQL queries on the Arrow tables and retrieve the peak and off-peak hours, respectively.

Peak hour refers to the hour with the highest number of pickups and drop-offs, while off-peak hour refers to the hour with the lowest number of pickups and drop-offs.

import pyarrow as pa


peak_hour_query = DuckDBQuery(
    name="peak_hour_query",
    query="""select hour
        from arrow_table
        where pickup_count + dropoff_count = (select max(pickup_count + dropoff_count)
                                               from arrow_table)""",
    inputs=kwtypes(arrow_table=pa.Table),
)

off_peak_hour_query = DuckDBQuery(
    name="off_peak_hour_query",
    query="""select hour
        from arrow_table
        where pickup_count + dropoff_count = (select min(pickup_count + dropoff_count)
                                               from arrow_table)""",
    inputs=kwtypes(arrow_table=pa.Table),
)

`PeakOffPeak` is a data class that specifies the format for storing the year, peak hour, and off-peak hour.

from dataclasses import dataclass

from dataclasses_json import dataclass_json


@dataclass_json
@dataclass
class PeakOffPeak:
    year: int
    peak_hour: int
    off_peak_hour: int

Running the workflow

You can execute the workflow either locally or on a Flyte cluster. To establish the dependencies between the tasks, wire them together in a Flyte workflow:

from flytekit import workflow


@workflow
def duckdb_wf(
    years: List[int] = [2019, 2020, 2021, 2022], months: List[int] = list(range(1, 13))
) -> List[PeakOffPeak]:
    result_dfs = fetch_trips_data(years=years, months=months)
    generate_plot(dfs=result_dfs)
    return peak_offpeak_hours(dfs=result_dfs)
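
Since a Flyte workflow is an ordinary Python callable, one quick way to test it locally is to invoke it directly; the reduced year/month range below is just an illustrative choice to keep the downloads manageable:

if __name__ == "__main__":
    # local run over a smaller input range
    print(duckdb_wf(years=[2019, 2020], months=[1, 2]))

To run the same workflow on a Flyte cluster, you can register and launch it with `pyflyte run --remote`, pointing at the module that defines `duckdb_wf`.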

Note: The source code is available on GitHub Gist.

An illustration of the end-to-end workflow presented on the Flyte UI
An illustration of the `peak_offpeak_hours` dynamic workflow

Note: You may come across access-denied errors when hitting multiple CloudFront URLs to retrieve the data. In that case, wait for about an hour and then recover the existing execution; Flyte's recoverability feature lets you avoid re-executing the tasks that already processed their Parquet files successfully.

Summary

In conclusion, leveraging the Flyte x DuckDB integration can simplify the execution of complex analytical workloads on Flyte. If you are interested in running DuckDB queries within Flyte, here are some valuable resources to explore:

  • Review our documentation to gain a better understanding of how to use the Flyte x DuckDB integration.
  • Take a look at our plugin code to get a better sense of how the integration works under the hood.
  • Join our Slack community if you have any questions or want to engage with other users who are using Flyte x DuckDB.
  • Give the Flyte repository a ⭐ on GitHub and follow us on Twitter to stay informed about the latest updates and enhancements.
  • Create a GitHub issue to let us know if you run into any bugs.

By leveraging the Flyte x DuckDB integration, you can optimize your analytical workflows for greater efficiency.