
Commit 6421c74

Use a part reader + max 10k parts (#52)
* Use a part reader to avoid reading the files into memory
* Fix log
* Fix more vars
* Max parts is 10000
1 parent 221c613 commit 6421c74

2 files changed, +49 -16 lines changed


pkg/amazon/s3.go

+22-15
@@ -1,10 +1,10 @@
 package amazon
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"log/slog"
 	"os"
 	"sync"
@@ -100,8 +100,9 @@ func MultiPartUpload(input MultiPartUploadInput) error {
 
 	// Get the total number of parts we will upload
 	numParts := getTotalNumberParts(fileSize, input.PartSize)
+	partSize := getPartSize(fileSize, numParts, input.PartSize)
 	if input.Logger != nil {
-		input.Logger.Debug("will upload file in parts", "file", input.Filepath, "parts", numParts)
+		input.Logger.Debug("will upload file in parts", "file", input.Filepath, "parts", numParts, "partSize", partSize, "fileSize", fileSize)
 	}
 
 	var (
@@ -114,8 +115,8 @@ func MultiPartUpload(input MultiPartUploadInput) error {
 	orderedParts := make([]*s3.CompletedPart, numParts)
 	for i := int64(0); i < numParts; i++ {
 		partNumber := i + 1
-		offset := i * input.PartSize
-		bytesToRead := min(input.PartSize, fileSize-offset)
+		offset := i * partSize
+		bytesToRead := min(partSize, fileSize-offset)
 
 		wg.Add(1)
 		go func(partNumber int64, bytesToRead int64, offset int64) {
@@ -125,23 +126,18 @@ func MultiPartUpload(input MultiPartUploadInput) error {
 			}()
 			defer wg.Done()
 
-			partBuffer := make([]byte, bytesToRead)
-			_, err := file.ReadAt(partBuffer, offset)
-			if err != nil {
-				ch <- err
-				return
-			}
+			partReader := io.NewSectionReader(file, offset, bytesToRead)
 
 			if input.Logger != nil {
-				input.Logger.Debug("uploading file part", "file", input.Filepath, "part", partNumber, "size", len(partBuffer))
+				input.Logger.Debug("uploading file part", "file", input.Filepath, "part", partNumber, "size", bytesToRead)
 			}
 
 			resp, err := input.Svc.UploadPart(&s3.UploadPartInput{
 				Bucket:     aws.String(input.DestinationBucket),
 				Key:        aws.String(input.DestinationKey),
 				UploadId:   &uploadID,
 				PartNumber: aws.Int64(partNumber),
-				Body:       bytes.NewReader(partBuffer),
+				Body:       partReader,
 			})
 			if err != nil {
 				ch <- fmt.Errorf("error uploading part %d : %w", partNumber, err)
@@ -155,7 +151,7 @@ func MultiPartUpload(input MultiPartUploadInput) error {
 			}
 
 			if input.Logger != nil {
-				input.Logger.Debug("finished uploading file part", "file", input.Filepath, "part", partNumber, "size", len(partBuffer))
+				input.Logger.Debug("finished uploading file part", "file", input.Filepath, "part", partNumber, "size", bytesToRead)
 			}
 		}(partNumber, bytesToRead, offset)
 	}
@@ -197,7 +193,18 @@ func min(a, b int64) int64 {
 
 func getTotalNumberParts(filesize int64, partsize int64) int64 {
 	if filesize%partsize == 0 {
-		return filesize / partsize
+		return min(10000, filesize/partsize)
+	}
+	return min(10000, filesize/partsize+1)
+}
+
+func getPartSize(filesize int64, numParts int64, defaultPartSize int64) int64 {
+	if numParts < 10000 {
+		return defaultPartSize
+	}
+	if filesize%numParts == 0 {
+		return filesize / numParts
 	}
-	return filesize/partsize + 1
+	// numParts-1 to account for any rounding (makes the parts slightly larger)
+	return filesize / (numParts - 1)
 }
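
The core of the change above is swapping the per-part byte buffer for an io.SectionReader: it wraps the shared *os.File and only reads bytes as the SDK streams them, so concurrent part uploads no longer hold whole parts in memory, and because a SectionReader also implements Seek it can be passed directly as the UploadPart Body in the diff. A minimal, self-contained sketch of that pattern (the file name and part size are hypothetical, not code from this repo):

package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	// Open the file once; io.SectionReader reads via ReadAt, so several
	// goroutines could safely share the same *os.File.
	f, err := os.Open("example.dat") // hypothetical input file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	const partSize = int64(8)
	info, err := f.Stat()
	if err != nil {
		panic(err)
	}

	for offset := int64(0); offset < info.Size(); offset += partSize {
		n := partSize
		if remaining := info.Size() - offset; remaining < n {
			n = remaining
		}
		// One lazy, seekable reader per part; nothing is buffered up front.
		part := io.NewSectionReader(f, offset, n)
		read, _ := io.Copy(io.Discard, part) // stand-in for the UploadPart call
		fmt.Printf("part at offset %d: %d bytes\n", offset, read)
	}
}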

pkg/amazon/s3_test.go

+27-1
@@ -12,7 +12,8 @@ func TestGetTotalNumberParts(t *testing.T) {
 		{2, 1, 2}, // Test quotient where division produced a whole number
 		// Test with a couple much larger numbers
 		{104857600, 5242880, 20}, // 100MB file, 5MB chunks, 20 total chunks
-		{132070244351, 8388200, 15745}, // ~123GB file, ~8MB chunks, 15745 total chunks
+		{132070244351, 8388200, 10000}, // ~123GB file, ~8MB chunks, 15745 total chunks - limit is 10,000 so expect 10k
+		{100658400000, 8388200, 10000}, // Testing a perfect division into parts
 	}
 
 	for _, test := range tests {
@@ -22,3 +23,28 @@ func TestGetTotalNumberParts(t *testing.T) {
 		}
 	}
 }
+
+func TestGetPartSize(t *testing.T) {
+	tests := []struct {
+		FileSize int64
+		PartSize int64
+		Expected int64
+	}{
+		{5, 2, 2}, // Test quotient where division produced a non-whole number
+		{2, 1, 1}, // Test quotient where division produced a whole number
+		// Test with a couple much larger numbers
+		{104857600, 5242880, 5242880}, // 100MB file, 5MB chunks, 20 total chunks
+		{132070244351, 8388200, 13208345}, // ~123GB file, ~8MB chunks, 15745 total chunks - limit is 10,000 so expect 10k
+		{132083450000, 8388200, 13208345}, // Should be an exact filesize to fit perfectly into the parts
+		{132083450001, 8388200, 13209665}, // One too big to fit perfectly into the parts, so expect larger part sizes to accommodate
+		{132096658344, 8388200, 13210986}, // -1 too big to fit perfectly into the parts, so expect larger part sizes to accommodate
+	}
+
+	for _, test := range tests {
+		numParts := getTotalNumberParts(test.FileSize, test.PartSize)
+		result := getPartSize(test.FileSize, numParts, test.PartSize)
+		if result != test.Expected {
+			t.Errorf("operation failed for %d / %d. Expected %d, got %d", test.FileSize, test.PartSize, test.Expected, result)
+		}
+	}
+}
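
As a sanity check on the larger expectations above: 132070244351 / 8388200 rounds up to 15745 parts, which getTotalNumberParts caps at 10,000 (the maximum number of parts S3 allows in a multipart upload). Since 132070244351 does not divide evenly by 10,000, getPartSize returns filesize / (numParts - 1) = 132070244351 / 9999 = 13,208,345 bytes per part, so 9,999 full parts plus a final part of 2,696 bytes cover the whole file within the 10,000-part limit. The 132083450000 case is exactly 10,000 × 13,208,345, so it exercises the filesize % numParts == 0 branch and returns filesize / numParts directly.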
