Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[poc] implement record_stream when using CUDA streams during group offloading #11081

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

sayakpaul
Copy link
Member

@sayakpaul sayakpaul commented Mar 17, 2025

What does this PR do?

With group offloading, we shipped the possibility to overlap computation with communication (data transfers). This PR shows (thanks to the work of @gau-nernst) that when using CUDA streams, the speed can further be improved with record_stream.

Record Stream Memory Time
False 1.6688 32.521
True 1.6688 31.825
Code
from diffusers import DiffusionPipeline
import torch.utils.benchmark as benchmark
from diffusers.hooks import apply_group_offloading
import torch 
import argparse
import json


def benchmark_fn(f, *args, **kwargs):
    t0 = benchmark.Timer(
        stmt="f(*args, **kwargs)",
        globals={"args": args, "kwargs": kwargs, "f": f},
        num_threads=torch.get_num_threads(),
    )
    return f"{(t0.blocked_autorange().mean):.3f}"


def run_inference(pipe, prompt):
    _ = pipe(
        prompt=prompt,
        num_images_per_prompt=1,
        width=576,
        height=1024,
        num_inference_steps=30,
        guidance_scale=3.5,
        max_sequence_length=512
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--record_stream", action="store_true")
    args = parser.parse_args()


    model_id = "black-forest-labs/FLUX.1-dev"
    dtype = torch.bfloat16
    pipe = DiffusionPipeline.from_pretrained(model_id, torch_dtype=dtype)

    apply_group_offloading(
        pipe.transformer,
        offload_type="leaf_level",
        offload_device=torch.device("cpu"),
        onload_device=torch.device("cuda"),
        use_stream=True,
        record_stream=args.record_stream
    )
    apply_group_offloading(
        pipe.text_encoder, 
        offload_device=torch.device("cpu"),
        onload_device=torch.device("cuda"),
        offload_type="leaf_level",
        use_stream=True,
        record_stream=args.record_stream
    )
    apply_group_offloading(
        pipe.text_encoder_2, 
        offload_device=torch.device("cpu"),
        onload_device=torch.device("cuda"),
        offload_type="leaf_level",
        use_stream=True,
        record_stream=args.record_stream
    )
    apply_group_offloading(
        pipe.vae, 
        offload_device=torch.device("cpu"),
        onload_device=torch.device("cuda"),
        offload_type="leaf_level",
        use_stream=True,
        record_stream=args.record_stream
    )

    pipe.set_progress_bar_config(disable=True)

    prompt="A cat wearing sunglasses and working as a lifeguard at pool."
    for _ in range(2):
        image = pipe(
            prompt=prompt,
            num_images_per_prompt=1,
            width=576,
            height=1024,
            num_inference_steps=30,
            guidance_scale=3.5,
            max_sequence_length=512,
            generator=torch.manual_seed(0)
        ).images[0]
    time  = benchmark_fn(run_inference, pipe, prompt)
    inference_memory = torch.cuda.max_memory_allocated() / (1024 ** 3)

    image = pipe(
        prompt=prompt,
        num_images_per_prompt=1,
        width=576,
        height=1024,
        num_inference_steps=30,
        guidance_scale=3.5,
        max_sequence_length=512,
        generator=torch.manual_seed(0)
    ).images[0]

    filename = f"record_stream@{args.record_stream}"
    image.save(f"{filename}.png")
    with open(f"{filename}.json", "w") as f:
        json.dump(
            {"record_stream": args.record_stream, "memory": f"{inference_memory:.4f}", "time": time}, f
        )

Doesn't impact the quality.

Please note that this is just a PoC PR wherein I wanted to show the benefits of using record streams (it was easier with a PoC PR for me than opening a feature request). If this is good, I won't mind buttoning it up further myself for making it merge-ready or if the other maintainers directly commit to the PR branch for doing so. If the improvements are not good enough, I am happy to close the PR without asking anything.

@sayakpaul sayakpaul requested review from DN6 and a-r-r-o-w March 17, 2025 06:52
@HuggingFaceDocBuilderDev

The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.

Copy link
Member

@a-r-r-o-w a-r-r-o-w left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I think we could consider adding. I did think about it during initial addition of group offloading, but like you reported, the time difference seemed very low, and I attributed it to randomness because it wasn't a ~20%-ish difference (as shown in gau-nerst's gist) and because I didn't spend time actually looking at the stream profiles.

Could you also run the benchmarks again with a warmup and the changes from #11094 and #11097?

@sayakpaul
Copy link
Member Author

Even though the benchmark module accounts for the warmup (reference one, reference two), I added it to my snippet (updated snippet is included in the OP).

I merged the PR branches into this PR branch and obtained the following numbers on the DGX:

{"record_stream": false, "memory": "1.3514", "time": "32.065"}
{"record_stream": true, "memory": "1.3514", "time": "30.901"}

On audace, I got:

{"record_stream": false, "memory": "1.3514", "time": "430.398"}
{"record_stream": true, "memory": "1.3514", "time": "429.695"}

for param in group_module.parameters():
param.data = param.data.to(self.onload_device, non_blocking=self.non_blocking)
if self.record_stream:
param.data.record_stream(current_stream)
Copy link
Member

@a-r-r-o-w a-r-r-o-w Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current_stream should be self.stream here I think. We need to tell pytorch that the param.data and buffer.data here is owned by the non-default stream. Currently, we're telling it that it is owned by the default stream, which seems incorrect to me

Sorry for the back and forth but I think we will have to run the benchmark once more with the change 😅

Apart from this, everything else looks good. We can button up the docs and merge after @DN6 gives a look. Let's make sure to mention that this may use more memory in comparison to record_stream=False in certain cases and link to the torch::Tensor::record_stream docs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nevermind, ignore my comment.

This method is most suitable for use cases where you are providing a function that created a tensor on a side stream, and want users to be able to make use of the tensor without having to think carefully about stream safety when making use of them.

We don't create anything on the non-default stream, so torch.cuda.current_stream is correct

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@a-r-r-o-w no problem. Better to be rigorous and double-check everything.

Just ran the benchmark after merging main into this branch (on DGX on a single 80GB A100):

{"record_stream": false, "memory": "1.3514", "time": "32.792"}
{"record_stream": true, "memory": "1.3514", "time": "30.944"}

Feel free to run it yourself if you want.

Let's make sure to mention that this may use more memory in comparison to record_stream=False in certain cases and link to the torch::Tensor::record_stream docs

Absolutely. I will mention in the docstrings. Is there any other place you wanted me to mention it?

@sayakpaul
Copy link
Member Author

Will let @DN6 review and will propagate the Doc changes after that.

@sayakpaul
Copy link
Member Author

@a-r-r-o-w @DN6 I have pulled in the latest improvements done to group offloading in this PR and resolved the conflicts. Have also run the benchmarks:

{"record_stream": true, "memory": "1.3514", "time": "31.047"}
{"record_stream": false, "memory": "1.3514", "time": "32.213"}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants