
Conversation


@afkbluey afkbluey commented Oct 25, 2025

Community Contribution License

All community contributions in this pull request are licensed to the project maintainers
under the terms of the Apache 2 license.
By creating this pull request I represent that I have the right to license the
contributions to the project maintainers under the Apache 2 license.

Description

mc pipe already includes flags to make it fast. This PR adds mc cat support for --parallel and --buffer-size flags so it can be fast too.

Motivation and Context

I have a fast on-prem k8s cluster with a MinIO object store. I want to move a very large 15TB object from one bucket to another using the mc CLI. The most obvious choice, mc cp, is limited by the 5TB PUT limit, following the same behaviour as AWS: https://docs.min.io/enterprise/aistor-object-store/reference/aistor-server/thresholds/

The next best is mc pipe, which works great except that it is only as fast as the incoming stream. Enter mc cat. Other alternatives for copying across buckets on my k8s MinIO instance:

  • mc cat doesn't support --part-size, doesn't support concurrency, supports bucket-to-bucket
  • mc cp doesn't support --part-size, supports --max-workers, supports bucket-to-bucket
  • mc mirror doesn't support --part-size, supports --max-workers, supports bucket-to-bucket
  • mc put supports --part-size, supports --parallel, doesn't support bucket-to-bucket
  • mc pipe supports --part-size, supports --concurrent, only supports stdin-to-bucket
  • mc od supports size=, doesn't support concurrency, supports bucket-to-bucket

Rather than deviate from AWS behaviour (i.e. augmenting mc cp to handle files greater than 5TB), this PR proposes to make mc cat fast so that it does not limit the throughput when piping into mc pipe.
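
For illustration, the end-to-end usage this enables would look roughly like the following (the alias, bucket and object names, and the flag values are placeholders; --parallel and --buffer-size are the flags proposed here, while --concurrent and --part-size already exist on mc pipe):

mc cat --parallel 8 --buffer-size 64MiB src/bucket/huge-object | mc pipe --concurrent 8 --part-size 64MiB dst/bucket/huge-object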

How to test this PR?

Run the unit tests.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Optimization (provides speedup with no functional changes)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist:

  • Fixes a regression (If yes, please add commit-id or PR # here)
  • Unit tests added/updated
  • Internal documentation updated
  • Create a documentation update request here

- Test small and large data reads
- Test single and multi-threaded scenarios
- Test exact and uneven part boundaries
- Test small buffer reads and edge cases
- Test proper resource cleanup
- Add mock client for isolated testing
afkbluey and others added 2 commits October 27, 2025 20:42
* buffer size for user clarity
* context.CancelCauseFunc to avoid duplicated context
* buffer pools for re-using buffers
* worker queue depth * 2 for non-blocking parallelism
* 16MiB min buffer size
@afkbluey afkbluey requested a review from klauspost October 27, 2025 11:41
@afkbluey afkbluey changed the title from "Feature: mc cat with parallel and part size" to "Feature: mc cat with parallel and buffer size args" Oct 27, 2025
@afkbluey
Author

OK, so I didn't find much benefit and was scratching my head. Here are my findings and a proposal. Sorry, I've made a lot of changes, but I've tried to provide confidence with decent tests:

Original PR architecture

A chan-of-chans request/response pattern:

Read() call #1 ──┐
                 ▼
         Create responseCh1
                 │
                 ▼
         responseChs ────▶ Worker picks up responseCh1
                                    │
                                    ▼
                            Pull from requestCh (part 0)
                                    │
                                    ▼
                            Download part 0 (BLOCKING)
                                    │
                                    ▼
Read() WAITS ◀───────────── Send to responseCh1
         │
         ▼
     Returns data

Read() call #2 ──┐
                 ▼
         Create responseCh2
                 │
                 ▼
         responseChs ────▶ Worker picks up responseCh2
                                    │
                                    ▼
                            Pull from requestCh (part 1)
                                    │
                                    ▼
                            Download part 1 (BLOCKING)
                                    │
                                    ▼
Read() WAITS ◀───────────── Send to responseCh2

The chan-of-chans request/response pattern is good for dynamic responses, but our use case is predictable, fixed-size parts. It created a tight coupling of Read() → Worker → Download → Read() and was pretty much just a complicated single-threaded download (see the sketch after this list):

  • responseCh was sent to responseChs and then blocked, waiting for the download to finish and the data to be sent back
  • Workers couldn't start downloading part N+1 while part N was still downloading
  • Workers sat idle waiting for Read() to send a responseCh
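
For illustration only, here is a minimal, self-contained sketch of that chan-of-chans pattern (not the PR's actual code; the type names and the fake download function are made up). It shows why each Read() effectively serializes the downloads:

```go
package main

import "fmt"

// partResponse carries one downloaded part back to the Read() that asked for it.
type partResponse struct {
	data []byte
	err  error
}

// chanChanReader sketches the original flow: every Read() creates a fresh
// response channel, a worker serves it, and Read() blocks until that single
// part has been downloaded, so parts are fetched strictly one at a time.
type chanChanReader struct {
	responseChs chan chan partResponse
	requestCh   chan int
	download    func(part int) ([]byte, error)
}

func (r *chanChanReader) Read(p []byte) (int, error) {
	respCh := make(chan partResponse, 1)
	r.responseChs <- respCh // ask a worker to serve this Read()
	resp := <-respCh        // block until the worker finishes the download
	if resp.err != nil {
		return 0, resp.err
	}
	return copy(p, resp.data), nil
}

func (r *chanChanReader) worker() {
	for respCh := range r.responseChs {
		part := <-r.requestCh         // next part number to fetch
		data, err := r.download(part) // blocking GET in the real code
		respCh <- partResponse{data: data, err: err}
	}
}

func main() {
	const parts = 3
	r := &chanChanReader{
		responseChs: make(chan chan partResponse),
		requestCh:   make(chan int, parts),
		download: func(part int) ([]byte, error) {
			return []byte(fmt.Sprintf("part-%d ", part)), nil
		},
	}
	for i := 0; i < parts; i++ {
		r.requestCh <- i
	}
	go r.worker()
	buf := make([]byte, 64)
	for i := 0; i < parts; i++ {
		n, _ := r.Read(buf)
		fmt.Print(string(buf[:n]))
	}
	fmt.Println() // prints: part-0 part-1 part-2
}
```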

Proposed improved architecture

Producer-consumer pipeline architecture:

scheduleRequests() 
       │
       ▼
   requestCh: [0][1][2][3][4][5][6][7][8][9]...
       │
       ├─────────────────┬─────────────────┬─────────────────┐
    Worker 1          Worker 2          Worker 3          Worker 4
       │                 │                 │                 │
    HTTP GET          HTTP GET          HTTP GET          HTTP GET
   Range: 0-64K      Range: 64K-128K   Range: 128K-192K  Range: 192K-256K
       │                 │                 │                 │
    Download          Download          Download          Download  ← ALL PARALLEL
    TCP conn 1        TCP conn 2        TCP conn 3        TCP conn 4
      └──────────────────┴──────────────────┴────────────────┘
                 ▼
            resultCh: [2][0][3][1][4]...  ← Out of order!
                 │
                 ▼
         collectResults()
                 │
                 ▼
         partBuffer (map):
         {
           0: data,
           1: data,
           2: data,
           3: data
         }
                 │
                 ▼
            Read() ──┐
                     │
                     ├─ Needs part 0? ──▶ Found in buffer ──▶ Return immediately
                     │
                     ├─ Needs part 1? ──▶ Found in buffer ──▶ Return immediately
                     │
                     └─ Needs part 5? ──▶ Not in buffer ──▶ Wait (condition var)
                                                                     │
                                                  Worker finishes ──┘
                                                  part 5 arrives
                                                       │
                                                       ▼
                                              Broadcast signal
                                                       │
                                                       ▼
                                              Read() wakes up
                                                       │
                                                       ▼
                                              Return part 5

The producer-consumer pipeline pushes part numbers onto a queue and workers continuously pull from it in parallel (see the sketch after this list):

  • scheduleRequests() feeds work → workers download in parallel → collectResults() buffers → Read() consumes sequentially
  • Out-of-order downloaded parts are stored in a map for O(1) retrieval by Read(), which blocks only if the part is not yet downloaded
  • Read() only wakes up when the needed part arrives, rather than polling
  • Download speed ≈ min(parallelism, bandwidth_limit) × network_speed
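
A minimal, self-contained sketch of this producer/consumer pipeline, reusing the component names from the diagram (scheduleRequests, collectResults, requestCh, resultCh) and substituting a fake download function for the ranged HTTP GETs; it is illustrative only, not the PR's actual code:

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// downloadedPart is what a worker hands to collectResults().
type downloadedPart struct {
	num  int
	data []byte
}

// pipelineReader sketches the proposed flow: scheduleRequests() feeds part
// numbers, workers download in parallel, collectResults() buffers out-of-order
// parts in a map, and Read() waits on a condition variable for the next part.
type pipelineReader struct {
	mu         sync.Mutex
	cond       *sync.Cond
	partBuffer map[int][]byte
	nextPart   int
	totalParts int
}

func newPipelineReader(totalParts, workers int, download func(int) []byte) *pipelineReader {
	r := &pipelineReader{partBuffer: make(map[int][]byte), totalParts: totalParts}
	r.cond = sync.NewCond(&r.mu)

	requestCh := make(chan int, totalParts)
	resultCh := make(chan downloadedPart, workers*2) // queue depth 2x workers so workers rarely block

	// scheduleRequests: push every part number up front.
	go func() {
		for i := 0; i < totalParts; i++ {
			requestCh <- i
		}
		close(requestCh)
	}()

	// Workers: pull part numbers and download concurrently (ranged GETs in the real code).
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for part := range requestCh {
				resultCh <- downloadedPart{num: part, data: download(part)}
			}
		}()
	}
	go func() { wg.Wait(); close(resultCh) }()

	// collectResults: buffer out-of-order parts and wake up a waiting Read().
	go func() {
		for p := range resultCh {
			r.mu.Lock()
			r.partBuffer[p.num] = p.data
			r.cond.Broadcast()
			r.mu.Unlock()
		}
	}()
	return r
}

// Read returns parts strictly in order, blocking only when the next
// sequential part has not been downloaded yet.
func (r *pipelineReader) Read(p []byte) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for {
		if r.nextPart >= r.totalParts {
			return 0, io.EOF
		}
		if data, ok := r.partBuffer[r.nextPart]; ok {
			delete(r.partBuffer, r.nextPart) // free the slot once consumed
			r.nextPart++
			return copy(p, data), nil
		}
		r.cond.Wait() // sleep until collectResults() broadcasts
	}
}

func main() {
	r := newPipelineReader(4, 2, func(part int) []byte {
		return []byte(fmt.Sprintf("part-%d ", part))
	})
	buf := make([]byte, 64)
	for {
		n, err := r.Read(buf)
		if err == io.EOF {
			break
		}
		fmt.Print(string(buf[:n]))
	}
	fmt.Println() // prints: part-0 part-1 part-2 part-3
}
```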

I also had to add RangeEnd to the Get method in client-s3.go. I was surprised it wasn't there already, and I'm aware this method is used by a lot of other code. Is there a different client/method that already has RangeEnd which I should be using instead?
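
For reference, the closed byte range that a RangeEnd option enables is already expressible in the underlying minio-go v7 SDK via GetObjectOptions.SetRange(start, end); a RangeEnd on mc's GetOptions presumably just needs to be forwarded there. A standalone sketch against minio-go directly (endpoint, credentials, bucket and object names are placeholders, and this is not the PR's client-s3.go change):

```go
package main

import (
	"context"
	"io"
	"log"
	"os"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Placeholder endpoint and credentials; replace with your own.
	client, err := minio.New("play.min.io", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS-KEY", "SECRET-KEY", ""),
		Secure: true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// SetRange(start, end) issues an inclusive byte range, e.g.
	// SetRange(0, 65535) sends "Range: bytes=0-65535";
	// SetRange(start, 0) with start > 0 reads from start to the end of the object.
	opts := minio.GetObjectOptions{}
	if err := opts.SetRange(0, 64*1024-1); err != nil {
		log.Fatal(err)
	}

	obj, err := client.GetObject(context.Background(), "mybucket", "myobject", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer obj.Close()

	// Stream the requested range to stdout.
	if _, err := io.Copy(os.Stdout, obj); err != nil {
		log.Fatal(err)
	}
}
```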

@klauspost
Contributor

Error: cmd/accounting-reader.go:193:1: File is not properly formatted (gofumpt)
	return
^
Error: cmd/admin-config-reset.go:81:1: File is not properly formatted (gofumpt)
	return
^
Error: cmd/admin-config-set.go:76:1: File is not properly formatted (gofumpt)
	return
^
Error: cmd/parallel-reader.go:202:43: `cancelled` is a misspelling of `canceled` (misspell)
		// This prevents deadlock if context is cancelled while Read() is waiting
		                                        ^
Error: cmd/parallel-reader.go:218:25: `cancelled` is a misspelling of `canceled` (misspell)
	// Check if context is cancelled
	                       ^
Error: cmd/parallel-reader.go:317:39: `cancelled` is a misspelling of `canceled` (misspell)
	// Cancel the context if not already cancelled
	                                     ^
Error: cmd/cat-main.go:329:24: empty-block: this block is empty, you can remove it (revive)
			if partSize >= size {
				// Fall through to single-threaded reader
			} else {
Error: cmd/parallel-reader.go:75:122: unexported-return: exported func NewParallelReader returns unexported type *cmd.parallelReader, which can be annoying to use (revive)
func NewParallelReader(ctx context.Context, client Client, size int64, partSize int64, parallelism int, opts GetOptions) *parallelReader {
                                                                                                                         ^
Error: cmd/parallel_reader_test.go:47:26: unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
func (m *mockClient) Get(ctx context.Context, opts GetOptions) (io.ReadCloser, *ClientContent, *probe.Error) {
                         ^
Error: cmd/parallel_reader_test.go:66:27: unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
func (m *mockClient) Stat(ctx context.Context, opts StatOptions) (*ClientContent, *probe.Error) {
                          ^
Error: cmd/parallel_reader_test.go:69:27: unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
func (m *mockClient) List(ctx context.Context, opts ListOptions) <-chan *ClientContent { return nil }
                          ^
Error: cmd/parallel_reader_test.go:77:35: unused-parameter: parameter 'app' seems to be unused, consider removing or renaming it as _ (revive)
func (m *mockClient) AddUserAgent(app, version string) {}

You can run golangci-lint run -j8 in the root to check for these. go install github.com/golangci/golangci-lint/v2/cmd/[email protected] to install locally.

gofumpt doesn't want naked returns. The rest should hopefully be understandable.

@afkbluey afkbluey requested a review from klauspost October 29, 2025 05:58
Contributor

@klauspost klauspost left a comment

lgtm.

I sent in a PR with updates for the linter: #5256

You can either wait for that or fix the issues reported by the tests.

@afkbluey afkbluey requested a review from klauspost October 29, 2025 10:44

afkbluey commented Oct 29, 2025

> lgtm.
> I sent in a PR with updates for the linter: #5256
> You can either wait for that or fix the issues reported by the tests.

@klauspost I ran the CI commands in #5256:

go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest -fix -test ./...
gofumpt -w .

Tests and lints pass, but now many changes are included as part of this PR. Sorry if I misinterpreted your comment; in hindsight I should have merged that branch.

@klauspost
Contributor

Yeah, maybe just revert the latest commit.


afkbluey commented Oct 29, 2025

@klauspost Reverted now, but it includes the gofumpt fixes to pass the current CI.


afkbluey commented Oct 31, 2025

@klauspost I'm still not seeing the improvements I expected, and I think it's due to the fundamental bottleneck that stdout io.Read() needs to be synchronous. I'm going to close this PR and create a new one so it's easier to copy > 5TB files.

@afkbluey afkbluey closed this Oct 31, 2025

afkbluey commented Nov 1, 2025

#5257
