Niels Bantilan

Reproducing Liger Kernel Benchmarks on Phi3 Mini

Reproducibility is the cornerstone of science and engineering: Without it, we can’t reliably build on top of each other’s findings and work. The “reproducibility crisis” is well-known in many scientific fields, and machine learning is no exception: It’s important to independently reproduce the benchmarks that accompany scientific and engineering advances so that we can verify their correctness, generalizability, and reliability.

The ML team at LinkedIn, one of the most active users of Flyte, recently moved all of their LLM training infrastructure to Flyte. They have also published the Liger Kernel Python package, a collection of Triton kernels that accelerate LLM training by increasing token throughput and reducing the memory footprint of models.

Union helps ML engineers build production-grade AI/ML applications, and we strive to support and amplify open source projects that make LLM training more accessible. The Liger Kernel project does this by allowing ML practitioners to train models more efficiently in both single-node and multi-node setups. In this blog post, we reproduce the LinkedIn team’s findings with a scaled-down training setup: the Phi-3 Mini model (3.8B parameters) on a single A100 GPU. We find similar throughput and memory efficiency gains under these training conditions.

Creating a reproducible experiment workflow

To reproduce the training conditions as closely as possible, we’ve adapted the benchmarking example in the official repo into a Flyte workflow. The full codebase can be found here. We used Union Serverless to power this test. Learn more about our Serverless offering here.

Benchmarking Metrics

First, we use the same metrics of interest as defined by the official repo example: token throughput and peak reserved memory.

from dataclasses import dataclass


@dataclass
class Memory:
    """
    Memory is a dataclass to store the memory-related metrics.
    """

    step_peak_memory_allocated_MB: float = 0.0
    step_peak_memory_reserved_MB: float = 0.0
    total_peak_memory_allocated_MB: float = 0.0
    total_peak_memory_reserved_MB: float = 0.0


@dataclass
class TPS:
    """
    TPS is a dataclass to store the tokens-per-second metrics.
    """

    step_tokens_per_second: float = 0.0
    avg_tokens_per_second: float = 0.0
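
For context, here is a minimal sketch of how these metrics can be populated during training, assuming a hypothetical `EfficiencyCallback` built on `transformers.TrainerCallback` and `torch.cuda` memory statistics (the actual benchmark reuses the callback from the official repo example):

import time

import torch
from transformers import TrainerCallback

MB = 1024 ** 2


class EfficiencyCallback(TrainerCallback):
    """Hypothetical callback that records peak GPU memory and token throughput per step."""

    def __init__(self, tokens_per_step: int):
        # tokens_per_step is assumed to be batch size * sequence length for this run.
        self.tokens_per_step = tokens_per_step
        self.memory = Memory()
        self.tps = TPS()
        self._step_start = 0.0
        self._total_tokens = 0
        self._total_time = 0.0

    def on_step_begin(self, args, state, control, **kwargs):
        torch.cuda.reset_peak_memory_stats()
        self._step_start = time.perf_counter()

    def on_step_end(self, args, state, control, **kwargs):
        step_time = time.perf_counter() - self._step_start
        self._total_tokens += self.tokens_per_step
        self._total_time += step_time

        # Per-step peaks since the last reset, converted to MB.
        self.memory.step_peak_memory_allocated_MB = torch.cuda.max_memory_allocated() / MB
        self.memory.step_peak_memory_reserved_MB = torch.cuda.max_memory_reserved() / MB
        self.memory.total_peak_memory_allocated_MB = max(
            self.memory.total_peak_memory_allocated_MB,
            self.memory.step_peak_memory_allocated_MB,
        )
        self.memory.total_peak_memory_reserved_MB = max(
            self.memory.total_peak_memory_reserved_MB,
            self.memory.step_peak_memory_reserved_MB,
        )

        # Token throughput for this step and averaged across the run so far.
        self.tps.step_tokens_per_second = self.tokens_per_step / step_time
        self.tps.avg_tokens_per_second = self._total_tokens / self._total_time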

Supervised Fine-tuning with `trl` and `wandb`

Then, we define a task that performs the training loop. In this training run, we’re going to use:

  • The trl library’s `SFTTrainer` to do instruction fine-tuning on the Alpaca dataset.
  • The Weights and Biases flytekit plugin to automatically save training logs across experiment runs.
  • An `A100` GPU for each training run in our experiment.
from flytekit import Resources, Secret, task
from flytekit.extras.accelerators import A100
from flytekit.types.directory import FlyteDirectory
from flytekitplugins.wandb import wandb_init

# TrainingArguments, TrainingResult, image, WANDB_PROJECT, and WANDB_ENTITY
# are defined elsewhere in the codebase.
WANDB_SECRET = Secret(key="wandb_api_key")

@task(
    container_image=image,
    limits=Resources(mem="24Gi", cpu="12", gpu="1"),
    accelerator=A100,
    cache=True,
    cache_version="v1",
    secret_requests=[WANDB_SECRET],
)
@wandb_init(project=WANDB_PROJECT, entity=WANDB_ENTITY, secret=WANDB_SECRET)
def train_model(
    training_args: TrainingArguments,
    dataset_cache_dir: FlyteDirectory,
    model_cache_dir: FlyteDirectory,
) -> TrainingResult:
    from trl import SFTTrainer

    ...
    trainer = SFTTrainer(
        model=model,
        args=hf_training_args,
        data_collator=collator,
        tokenizer=tokenizer,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        ...
    )
    trainer.train()
    ...
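
Inside the elided setup code, the `use_liger` experiment flag determines how the model is loaded. Below is a minimal sketch of what that toggle might look like, assuming the `AutoLigerKernelForCausalLM` convenience class from the `liger_kernel` package (the exact loading logic lives in the full codebase linked above):

import torch
from transformers import AutoModelForCausalLM


def load_model(model_name: str, use_liger: bool):
    """Load the base model, optionally patched with Liger's fused Triton kernels."""
    if use_liger:
        # AutoLigerKernelForCausalLM applies Liger's Triton kernels to supported
        # architectures (including Phi-3) and then loads the pretrained weights.
        from liger_kernel.transformers import AutoLigerKernelForCausalLM

        return AutoLigerKernelForCausalLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16
        )
    # Baseline runs use the vanilla transformers layers.
    return AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16)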

The `@wandb_init` decorator automatically creates runs under the specified `project` and `entity` (typically your username) in your Weights and Biases account. This lets you track training metrics as the experiments run on Union.

Parallelizing training runs with map tasks

To parallelize the experiment, we use a `map_task` wrapped inside a `@dynamic` workflow so that we can cache the results of the entire experiment. In the code below, we:

  • Use `partial` to pre-bind the `dataset_cache_dir` and `model_cache_dir` inputs to the `train_model` task.
  • Wrap the partial task in a `map_task` to parallelize the experiment conditions specified in `training_args_list`.
from functools import partial
from typing import Optional

from flytekit import dynamic, map_task

@dynamic(
    container_image=image,
    cache=True,
    cache_version="1",
)
def run_cached_training_benchmark(
    training_args_list: list[TrainingArguments],
    dataset_cache_dir: FlyteDirectory,
    model_cache_dir: FlyteDirectory,
) -> list[Optional[TrainingResult]]:
    partial_train_model = partial(
        train_model,
        dataset_cache_dir=dataset_cache_dir,
        model_cache_dir=model_cache_dir,
    )
    results = map_task(
        partial_train_model,
        max_concurrency=4,
        min_success_ratio=0.1,
    )(training_args=training_args_list)
    return results

An important thing to note about the `map_task` is that we set `max_concurrency` to four so that we can control the number of `A100`s provisioned at any given time. We also expect some of the experiment conditions to cause `OOMKilled` (out-of-memory) errors, so we set `min_success_ratio` to `0.1`, which lets the map task succeed even if up to 90% of the subtasks fail; the failed subtasks surface as `None` entries in the output, which is why the output type is `list[Optional[TrainingResult]]`. As you can see in the image of an example run below, six runs failed while executing the map task.

Visualizing a Report with Flyte Decks

To report the results of our experiment, we create a task to aggregate and analyze the log history of all of the training runs and produce a Flyte Deck.

import pandas as pd
from flytekit import Deck, Resources, task

@task(
    container_image=image,
    enable_deck=True,
    limits=Resources(mem="2Gi", cpu="2"),
)
def analyze_results(results: list[Optional[TrainingResult]]) -> pd.DataFrame:
    import matplotlib.pyplot as plt
    import seaborn as sns

    ...

    fig, ax = plt.subplots(2, 1, figsize=(12, 10))
    sns.barplot(..., data=avg_tokens_per_second)
    sns.barplot(..., data=step_peak_memory_reserved_mb)

    benchmark_deck = Deck("Benchmarking Results")
    benchmark_deck.append(_convert_fig_into_html(fig))
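
The `_convert_fig_into_html` helper is elided above; here is a minimal sketch of one way to implement it, assuming the figure is base64-encoded into an inline `<img>` tag (the actual helper lives in the full codebase):

import base64
import io

import matplotlib.pyplot as plt


def _convert_fig_into_html(fig: plt.Figure) -> str:
    """Render a matplotlib figure as an inline HTML <img> tag for a Flyte Deck."""
    buffer = io.BytesIO()
    fig.savefig(buffer, format="png", bbox_inches="tight")
    encoded = base64.b64encode(buffer.getvalue()).decode("utf-8")
    return f'<img src="data:image/png;base64,{encoded}" alt="Benchmarking results">'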

Composing a benchmarking workflow

Finally, we encapsulate the different components of the pipeline into a Flyte workflow with the following inputs:

  • `experiment_args` specifies a mapping of all the different conditions we want to map over in our `map_task`.
  • `training_args` contains the default training arguments that the `train_model` task uses to set up and run fine-tuning.
  • `n_runs` specifies the number of repeats for each experimental condition so that we can get aggregate metrics.
from typing import Optional

import pandas as pd
from flytekit import workflow

@workflow
def benchmarking_experiment(
    experiment_args: ExperimentArguments = ExperimentArguments(),
    training_args: TrainingArguments = TrainingArguments(),
    n_runs: int = 3,
) -> tuple[list[Optional[TrainingResult]], pd.DataFrame]:
    dataset_cache_dir = download_dataset(training_args.dataset_name)
    model_cache_dir = download_model(training_args.model_name)
    training_args_list = prepare_experiment_args(experiment_args, training_args, n_runs)
    results = run_cached_training_benchmark(
        training_args_list, dataset_cache_dir, model_cache_dir
    )
    analysis = analyze_results(results=results)
    return results, analysis
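
The `prepare_experiment_args` task expands the experiment grid into the flat list of `TrainingArguments` that the map task consumes. Here is a hypothetical sketch of that expansion, assuming `ExperimentArguments` holds lists of values for `use_liger` and `per_device_train_batch_size` and that `TrainingArguments` is a dataclass (the actual implementation is in the full codebase):

from dataclasses import replace

from flytekit import task


@task(container_image=image)
def prepare_experiment_args(
    experiment_args: ExperimentArguments,
    training_args: TrainingArguments,
    n_runs: int,
) -> list[TrainingArguments]:
    """Build one TrainingArguments object per (use_liger, batch_size, repeat) combination."""
    training_args_list = []
    for use_liger in experiment_args.use_liger:
        for batch_size in experiment_args.per_device_train_batch_size:
            for _ in range(n_runs):
                training_args_list.append(
                    replace(
                        training_args,
                        use_liger=use_liger,
                        per_device_train_batch_size=batch_size,
                    )
                )
    return training_args_list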

To run the experiment, we specify the inputs to the `benchmarking_experiment` workflow in a YAML file:

experiment_args:
  use_liger: [true, false]
  per_device_train_batch_size: [16, 24, 32, 40, 48, 56]
training_args:
  model_name: "microsoft/Phi-3-mini-4k-instruct"
  dataset_name: "tatsu-lab/alpaca"
  ...
n_runs: 3

We then pass this YAML file to the `union run` command via the `--inputs-file` argument to run the benchmarking experiment:

union run --remote liger_kernel_finetuning.py benchmarking_experiment --inputs-file phi3_inputs.yaml

Analyzing the results

As we can see from the plot below, using Liger Kernel during training prevents out-of-memory errors at the larger batch sizes we tested. We also found that token throughput was 20-30% higher and peak reserved memory was 6-13% lower with Liger Kernel than with the vanilla `transformers` layers, which reproduces the findings reported in the official Liger Kernel repo.

Conclusion

In this benchmark experiment, we were able to reproduce the throughput and memory efficiency improvements reported by the original benchmark, but with a smaller model and a single A100 GPU. In addition, Union allowed us to:

  • Cache the outputs of intermediate steps in our pipeline so that we didn’t have to re-run certain steps, like downloading the dataset and model from HuggingFace Hub.
  • Leverage any Python framework we wanted to perform model training.
  • Easily integrate with existing monitoring tools like Weights and Biases.
  • Parallelize the runs of the experiment with map tasks, allowing us to control how many tasks to run concurrently and how many subtasks need to succeed to consider the map task run as successful.
  • Produce rich visualizations and reports using Flyte Decks to report on the final experiment analysis.

If you’re training AI models, you can access GPUs on Union Serverless!

Run this benchmark on Union Serverless ↗: https://docs.union.ai/serverless/tutorials/language-models/liger-kernel-finetuning