Skip to content

Commit 3bfc8df

Browse files
authored
fix: prevent S3 path conflicts using tempfile (#569)
## Problem S3 downloads were sometimes failing with `NotADirectoryError` and `FileExistsError` when S3 buckets contained objects with conflicting naming patterns that cannot be represented in traditional filesystem hierarchies. **Example conflict:** - S3 object: `foo` (file) - S3 object: `foo/documents` (file requiring foo to be a directory) This created a race condition where download order determined success/failure ## Solution Used tempfile to create unique download paths for each S3 object: **Before:** ``` S3: "foo" → Local: /downloads/foo S3: "foo/documents" → Local: /downloads/foo/documents Conflict: foo cannot be both file and directory ``` **After:** ``` S3: "foo" → Local: /downloads/a1b2c3d4e5f6/foo S3: "foo/documents" → Local: /downloads/9g8h7i6j5k4l/documents No conflicts: Each file gets unique directory ``` ## Future Work This PR targets only the s3 downloads. I think it would make sense to use tempfiles for all downloads (as in [PR #571](#571)), but that requires more extensive changes to implement cleanly. This fix provides immediate relief from the path conflict issues while we work on the more comprehensive tempfile solution.
1 parent 93121f1 commit 3bfc8df

File tree

8 files changed

+47
-9
lines changed

8 files changed

+47
-9
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 1.2.2
2+
3+
* **Fix**: prevent S3 path conflicts using tempfile for directory isolation
4+
15
## 1.2.1
26

37
* **Fix**: Embeddings are properly assigned when embedding in batches
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"directory_structure": [
2+
"s3_keys": [
33
"wiki_movie_plots_small.csv"
44
]
55
}

test/integration/connectors/expected_results/s3-specialchar/directory_structure.json renamed to test/integration/connectors/expected_results/s3-specialchar/expected_s3_keys.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"directory_structure": [
2+
"s3_keys": [
33
"Why_is_the_sky_blue?.txt",
44
"[test]?*.txt"
55
]

test/integration/connectors/expected_results/s3/directory_structure.json renamed to test/integration/connectors/expected_results/s3/expected_s3_keys.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"directory_structure": [
2+
"s3_keys": [
33
"2023-Jan-economic-outlook.pdf",
44
"Silent-Giant-(1).pdf",
55
"page-with-formula.pdf",

test/integration/connectors/utils/validation/source.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,24 @@ def run_expected_download_files_validation(
167167

168168

169169
def run_directory_structure_validation(expected_output_dir: Path, download_files: list[str]):
170-
directory_record = expected_output_dir / "directory_structure.json"
171-
with directory_record.open("r") as directory_file:
172-
directory_file_contents = json.load(directory_file)
173-
directory_structure = directory_file_contents["directory_structure"]
174-
assert directory_structure == download_files
170+
s3_keys_file = expected_output_dir / "expected_s3_keys.json"
171+
172+
if s3_keys_file.exists():
173+
with s3_keys_file.open("r") as f:
174+
s3_keys = json.load(f)["s3_keys"]
175+
176+
expected_filenames = {Path(s3_key).name for s3_key in s3_keys}
177+
actual_filenames = {Path(download_file).name for download_file in download_files}
178+
179+
assert expected_filenames == actual_filenames, (
180+
f"Expected filenames: {sorted(expected_filenames)}, "
181+
f"Got filenames: {sorted(actual_filenames)}"
182+
)
183+
else:
184+
directory_record = expected_output_dir / "directory_structure.json"
185+
with directory_record.open("r") as f:
186+
directory_structure = json.load(f)["directory_structure"]
187+
assert directory_structure == download_files
175188

176189

177190
def update_fixtures(

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.2.1" # pragma: no cover
1+
__version__ = "1.2.2" # pragma: no cover

unstructured_ingest/interfaces/downloader.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ class Downloader(BaseProcess, BaseConnector, ABC):
3636
def get_download_path(self, file_data: FileData) -> Optional[Path]:
3737
if not file_data.source_identifiers:
3838
return None
39+
3940
rel_path = file_data.source_identifiers.relative_path
4041
if not rel_path:
4142
return None
43+
4244
rel_path = rel_path[1:] if rel_path.startswith("/") else rel_path
4345
return self.download_dir / Path(rel_path)
4446

unstructured_ingest/processes/connectors/fsspec/fsspec.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,12 +264,31 @@ class FsspecDownloaderConfig(DownloaderConfig):
264264

265265
@dataclass
266266
class FsspecDownloader(Downloader):
267+
TEMP_DIR_PREFIX = "unstructured_"
268+
267269
protocol: str
268270
connection_config: FsspecConnectionConfigT
269271
connector_type: str = CONNECTOR_TYPE
270272
download_config: Optional[FsspecDownloaderConfigT] = field(
271273
default_factory=lambda: FsspecDownloaderConfig()
272274
)
275+
276+
def get_download_path(self, file_data: FileData) -> Optional[Path]:
277+
has_source_identifiers = file_data.source_identifiers is not None
278+
has_filename = has_source_identifiers and file_data.source_identifiers.filename
279+
280+
if not (has_source_identifiers and has_filename):
281+
return None
282+
283+
filename = file_data.source_identifiers.filename
284+
285+
mkdir_concurrent_safe(self.download_dir)
286+
287+
temp_dir = tempfile.mkdtemp(
288+
prefix=self.TEMP_DIR_PREFIX,
289+
dir=self.download_dir
290+
)
291+
return Path(temp_dir) / filename
273292

274293
def is_async(self) -> bool:
275294
with self.connection_config.get_client(protocol=self.protocol) as client:

0 commit comments

Comments
 (0)