@@ -82,6 +82,171 @@ func getSourceModTimeKey(metadata map[string]string) string {
8282 return ""
8383}
8484
85+ // layerDifference performs a breadth-first search (BFS) comparison between source and target.
86+ // Unlike the standard recursive listing approach, this function traverses the object hierarchy
87+ // layer by layer (directory by directory), which prevents overwhelming the server with
88+ // large recursive listing operations that could cause timeouts or connection failures.
89+ //
90+ // This approach is especially useful for buckets containing millions of objects where
91+ // a standard recursive listing might cause server-side resource exhaustion. By exploring
92+ // the hierarchy level by level and comparing objects at each layer, this function provides
93+ // a more scalable solution for large object stores.
94+ //
95+ // The BFS approach:
96+ // 1. Starts with the root prefix ("") for both source and target
97+ // 2. Lists objects at the current level/prefix (non-recursively)
98+ // 3. Compares objects found at this level
99+ // 4. Queues any directories found for exploration in the next iteration
100+ // 5. Continues until all directories in both source and target are explored
101+ //
102+ // This is enabled with the --bfs parameter to avoid the limitations of recursive listing.
103+ func layerDifference (ctx context.Context , sourceClnt , targetClnt Client , opts mirrorOptions ) chan diffMessage {
104+ diffCh := make (chan diffMessage , 10000 )
105+
106+ go func () {
107+ defer close (diffCh )
108+
109+ // Channels to feed items found by BFS into the difference engine
110+ srcClientCh := make (chan * ClientContent , 1000 )
111+ tgtClientCh := make (chan * ClientContent , 1000 )
112+
113+ // Goroutine to perform BFS on the source
114+ go func () {
115+ defer close (srcClientCh )
116+ // Queue for *relative object prefixes* to explore
117+ queue := []string {"" } // "" represents the root prefix
118+
119+ for len (queue ) > 0 {
120+ // Dequeue the next relative prefix
121+ prefix := queue [0 ]
122+ queue = queue [1 :]
123+
124+ // List items at the current prefix level using the relative prefix
125+ listCtx , listCancel := context .WithCancel (ctx )
126+ contentsCh := sourceClnt .List (listCtx , ListOptions {
127+ Recursive : false , // List only the current level
128+ WithMetadata : opts .isMetadata ,
129+ ShowDir : DirLast , // Ensure directories are listed
130+ Prefix : prefix , // Pass the relative prefix
131+ })
132+
133+ for content := range contentsCh {
134+ select {
135+ case <- ctx .Done ():
136+ listCancel ()
137+ return
138+ default :
139+ if content != nil && content .Err != nil {
140+ srcClientCh <- content
141+ listCancel ()
142+ continue
143+ }
144+ if content == nil {
145+ continue
146+ }
147+
148+ // Send the valid content (file or directory) for comparison
149+ srcClientCh <- content
150+
151+ // If it's a directory, queue its *relative object key* for the next level
152+ if content .Type .IsDir () {
153+ relativeKey := content .ObjectKey // Get the relative key
154+ // Prevent infinite loops: don't re-queue the prefix we just listed,
155+ // especially the root ("") which might list itself as "/" depending on backend.
156+ // Also check if ObjectKey is populated.
157+ if relativeKey != "" && relativeKey != prefix {
158+ // Ensure the key ends with a separator if it's a directory prefix
159+ // The S3 ListObjects usually returns directory keys ending with '/'
160+ if ! strings .HasSuffix (relativeKey , string (content .URL .Separator )) {
161+ // This case might indicate a non-standard directory representation, handle cautiously
162+ // For standard S3, common prefixes already end in '/'
163+ // If needed, append separator: relativeKey += string(content.URL.Separator)
164+ }
165+ // Add the relative key (prefix) to the queue
166+ queue = append (queue , relativeKey )
167+ }
168+ }
169+ }
170+ }
171+ listCancel ()
172+ }
173+ }()
174+
175+ // Goroutine to perform BFS on the target (symmetric to the source)
176+ go func () {
177+ defer close (tgtClientCh )
178+ // Queue for *relative object prefixes*
179+ queue := []string {"" }
180+
181+ for len (queue ) > 0 {
182+ prefix := queue [0 ]
183+ queue = queue [1 :]
184+
185+ listCtx , listCancel := context .WithCancel (ctx )
186+ contentsCh := targetClnt .List (listCtx , ListOptions {
187+ Recursive : false ,
188+ WithMetadata : opts .isMetadata ,
189+ ShowDir : DirLast ,
190+ Prefix : prefix , // Pass the relative prefix
191+ })
192+
193+ for content := range contentsCh {
194+ select {
195+ case <- ctx .Done ():
196+ listCancel ()
197+ return
198+ default :
199+ if content != nil && content .Err != nil {
200+ tgtClientCh <- content
201+ listCancel ()
202+ continue
203+ }
204+ if content == nil {
205+ continue
206+ }
207+
208+ tgtClientCh <- content
209+
210+ // If it's a directory, queue its *relative object key*
211+ if content .Type .IsDir () {
212+ relativeKey := content .ObjectKey
213+ if relativeKey != "" && relativeKey != prefix {
214+ // Ensure trailing slash if needed (usually present from S3 List)
215+ if ! strings .HasSuffix (relativeKey , string (content .URL .Separator )) {
216+ // Handle non-standard directory representation if necessary
217+ }
218+ queue = append (queue , relativeKey )
219+ }
220+ }
221+ }
222+ }
223+ listCancel ()
224+ }
225+ }()
226+
227+ // Comparison logic remains the same
228+ err := differenceInternal (
229+ sourceClnt .GetURL ().String (),
230+ srcClientCh ,
231+ targetClnt .GetURL ().String (),
232+ tgtClientCh ,
233+ opts ,
234+ false , // returnSimilar is false
235+ diffCh ,
236+ )
237+
238+ if err != nil {
239+ select {
240+ case <- ctx .Done ():
241+ default :
242+ diffCh <- diffMessage {Error : err }
243+ }
244+ }
245+ }()
246+
247+ return diffCh
248+ }
249+
85250// activeActiveModTimeUpdated tries to calculate if the object copy in the target
86251// is older than the one in the source by comparing the modtime of the data.
87252func activeActiveModTimeUpdated (src , dst * ClientContent ) bool {
@@ -167,7 +332,12 @@ func bucketObjectDifference(ctx context.Context, sourceClnt, targetClnt Client)
167332 })
168333}
169334
170- func objectDifference (ctx context.Context , sourceClnt , targetClnt Client , opts mirrorOptions ) (diffCh chan diffMessage ) {
335+ func objectDifference (ctx context.Context , sourceClnt , targetClnt Client , opts mirrorOptions ) chan diffMessage {
336+ if opts .bfs {
337+ // Use layer-by-layer difference for regular objects
338+ return layerDifference (ctx , sourceClnt , targetClnt , opts )
339+ }
340+
171341 sourceURL := sourceClnt .GetURL ().String ()
172342 sourceCh := sourceClnt .List (ctx , ListOptions {Recursive : true , WithMetadata : opts .isMetadata , ShowDir : DirNone })
173343
0 commit comments