Skip to content

Commit f4a8cc5

Browse files
psobot and Michel Davit authored
Fix RemoteFileUtil to download in parallel as expected. (#5515)
Co-authored-by: Michel Davit <[email protected]>
1 parent 489bd7a commit f4a8cc5

File tree

1 file changed

+20
-15
lines changed

1 file changed

+20
-15
lines changed

scio-core/src/main/java/com/spotify/scio/util/RemoteFileUtil.java

+20-15
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,10 @@
1717

1818
package com.spotify.scio.util;
1919

20+
import com.google.common.util.concurrent.Futures;
21+
import com.google.common.util.concurrent.ListenableFuture;
22+
import com.google.common.util.concurrent.ListeningExecutorService;
23+
import com.google.common.util.concurrent.MoreExecutors;
2024
import java.io.FileNotFoundException;
2125
import java.io.IOException;
2226
import java.io.Serializable;
@@ -28,14 +32,14 @@
2832
import java.nio.file.Path;
2933
import java.nio.file.Paths;
3034
import java.nio.file.StandardOpenOption;
31-
import java.util.ArrayList;
3235
import java.util.List;
33-
import java.util.Map;
34-
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.*;
37+
import java.util.stream.Collectors;
3538
import org.apache.beam.sdk.io.FileSystems;
3639
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
3740
import org.apache.beam.sdk.io.fs.ResourceId;
3841
import org.apache.beam.sdk.options.PipelineOptions;
42+
import org.apache.beam.sdk.transforms.DoFn;
3943
import org.apache.beam.sdk.util.MimeTypes;
4044
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
4145
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -46,17 +50,19 @@
4650
import org.slf4j.Logger;
4751
import org.slf4j.LoggerFactory;
4852

49-
/**
50-
* A utility class for handling remote file systems designed to be used in a {@link
51-
* org.apache.beam.sdk.transforms.DoFn}.
52-
*/
53+
/** A utility class for handling remote file systems designed to be used in a {@link DoFn}. */
5354
public class RemoteFileUtil implements Serializable {
5455

5556
private static final Logger LOG = LoggerFactory.getLogger(RemoteFileUtil.class);
5657

5758
private static final int CONCURRENCY_LEVEL = Runtime.getRuntime().availableProcessors() * 4;
5859
private static final int HASH_LENGTH = 8;
5960

61+
private static final ListeningExecutorService executor =
62+
MoreExecutors.listeningDecorator(
63+
MoreExecutors.getExitingExecutorService(
64+
(ThreadPoolExecutor) Executors.newFixedThreadPool(CONCURRENCY_LEVEL)));
65+
6066
// Mapping of remote sources to local destinations
6167
private static final LoadingCache<URI, Path> paths =
6268
CacheBuilder.newBuilder()
@@ -105,21 +111,20 @@ public Path download(URI src) {
105111
* @return {@link Path}s to the downloaded local files.
106112
*/
107113
public List<Path> download(List<URI> srcs) {
114+
List<ListenableFuture<Path>> futures =
115+
srcs.stream()
116+
.map(uri -> executor.submit(() -> paths.get(uri)))
117+
.collect(Collectors.toList());
108118
try {
109-
Map<URI, Path> results = paths.getAll(srcs);
110-
List<Path> paths = new ArrayList<>(srcs.size());
111-
for (URI src : srcs) {
112-
paths.add(results.get(src));
113-
}
114-
return paths;
115-
} catch (ExecutionException e) {
119+
return Futures.allAsList(futures).get();
120+
} catch (InterruptedException | ExecutionException e) {
116121
throw new RuntimeException(e);
117122
}
118123
}
119124

120125
/** Delete a single downloaded local file. */
121126
public void delete(URI src) {
122-
Path dst = null;
127+
Path dst;
123128
try {
124129
dst = paths.get(src);
125130
} catch (ExecutionException e) {

0 commit comments

Comments
 (0)