11 changes: 6 additions & 5 deletions .github/workflows/python-integration.yml
@@ -32,7 +32,8 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        runner: [ubuntu-22.04, macos-13, macos-14]
+        # macos-15-intel support is planned to end in August 2027
+        runner: [ubuntu-22.04, macos-15-intel, macos-14]
         python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
         test-run:
           - name: "S3"
@@ -50,8 +51,8 @@ jobs:
             profile-role: ${{ vars.PROFILE_IAM_ROLE }}
             profile-bucket: ${{ vars.S3_EXPRESS_PROFILE_BUCKET }}
         exclude:
-          # For Python 3.13, PyTorch does not support macos-13/x86_64, only macos-14/arm64.
-          - runner: macos-13
+          # For Python 3.13, PyTorch does not support macos-15-intel/x86_64, only macos-14/arm64.
+          - runner: macos-15-intel
             python-version: "3.13"
           - runner: macos-14
             python-version: "3.8"
@@ -136,11 +137,11 @@ jobs:
           pytest s3torchconnector/tst/e2e/test_e2e_s3_lightning_checkpoint.py -n auto

       - name: Install DCP dependencies
-        if: matrix.runner != 'macos-13'
+        if: matrix.runner != 'macos-15-intel'
         run: |
           python -m pip install './s3torchconnector[dcp-test]'
       - name: Run s3torchconnector DCP e2e tests
-        if: matrix.runner != 'macos-13'
+        if: matrix.runner != 'macos-15-intel'
         run: |
           CI_REGION=${{ matrix.test-run.region }} \
           CI_BUCKET=${{ matrix.test-run.bucket }} \
3 changes: 2 additions & 1 deletion .github/workflows/rust-checks.yml
@@ -70,7 +70,8 @@ jobs:
     name: Rust tests
     strategy:
       matrix:
-        runner: [ubuntu-22.04, macos-13]
+        # macos-15-intel support is planned to end in August 2027
+        runner: [ubuntu-22.04, macos-15-intel]
     steps:
       - name: Checkout code
         uses: actions/checkout@v5
7 changes: 4 additions & 3 deletions .github/workflows/wheels.yml
@@ -45,17 +45,18 @@ jobs:
           # - runner: ubuntu-20.04
           #   kind: manylinux
           #   arch: x86_64
-          - runner: macos-13
+          # macos-15-intel support is planned to end in August 2027
+          - runner: macos-15-intel
             kind: macosx
             arch: x86_64
           - runner: macos-14
             kind: macosx
             arch: arm64
-        # cp313 macos-13 (x86_64) is not supported by PyTorch
+        # cp313 macos-15-intel (x86_64) is not supported by PyTorch
         exclude:
           - python: cp313
             builder:
-              runner: macos-13
+              runner: macos-15-intel
               kind: macosx
               arch: x86_64
     permissions:
13 changes: 11 additions & 2 deletions s3torchconnector/tst/e2e/test_distributed_training.py
@@ -79,7 +79,11 @@ def dataloader_for_map(
     )
     sampler = DistributedSampler(dataset)
     dataloader = DataLoader(
-        dataset, batch_size=batch_size, num_workers=num_workers, sampler=sampler
+        dataset,
+        batch_size=batch_size,
+        num_workers=num_workers,
+        sampler=sampler,
+        multiprocessing_context=mp.get_context(),
     )
     return dataloader

@@ -93,7 +97,12 @@ def dataloader_for_iterable(
         enable_sharding=True,
         reader_constructor=reader_constructor,
     )
-    dataloader = DataLoader(dataset, batch_size=batch_size, num_workers=num_workers)
+    dataloader = DataLoader(
+        dataset,
+        batch_size=batch_size,
+        num_workers=num_workers,
+        multiprocessing_context=mp.get_context(),
+    )
     return dataloader

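Each `DataLoader` touched in these tests now receives an explicit `multiprocessing_context`. A minimal, self-contained sketch of the same pattern in isolation (the toy dataset, batch size, and worker count are illustrative, not taken from this PR):

```python
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, Dataset


class ToyDataset(Dataset):
    """Illustrative stand-in for the S3-backed datasets in these tests."""

    def __len__(self):
        return 8

    def __getitem__(self, idx):
        return idx


if __name__ == "__main__":
    # Passing mp.get_context() pins the DataLoader workers to an explicit
    # multiprocessing context (the platform default here) rather than
    # leaving the start-method choice implicit.
    loader = DataLoader(
        ToyDataset(),
        batch_size=2,
        num_workers=2,
        multiprocessing_context=mp.get_context(),
    )
    for batch in loader:
        print(batch)
```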
21 changes: 16 additions & 5 deletions s3torchconnector/tst/e2e/test_e2e_s3_lightning_checkpoint.py
@@ -12,6 +12,7 @@
 from lightning.pytorch.demos import WikiText2
 from lightning.pytorch.plugins import AsyncCheckpointIO
 from torch.utils.data import DataLoader
+import torch.multiprocessing as mp

 from s3torchconnector import S3Checkpoint
 from s3torchconnector._s3client import S3Client
@@ -79,7 +80,9 @@ def test_delete_checkpoint(checkpoint_directory):
 def test_load_trained_checkpoint(checkpoint_directory):
     nonce = random.randrange(2**64)
     dataset = WikiText2(data_dir=Path(f"/tmp/data/{nonce}"))
-    dataloader = DataLoader(dataset, num_workers=3)
+    dataloader = DataLoader(
+        dataset, num_workers=3, multiprocessing_context=mp.get_context()
+    )
     model = LightningTransformer(vocab_size=dataset.vocab_size)
     trainer = L.Trainer(accelerator=LIGHTNING_ACCELERATOR, fast_dev_run=2)
     trainer.fit(model=model, train_dataloaders=dataloader)
@@ -95,7 +98,9 @@ def test_load_trained_checkpoint(checkpoint_directory):
 def test_compatibility_with_trainer_plugins(checkpoint_directory):
     nonce = random.randrange(2**64)
     dataset = WikiText2(data_dir=Path(f"/tmp/data/{nonce}"))
-    dataloader = DataLoader(dataset, num_workers=3)
+    dataloader = DataLoader(
+        dataset, num_workers=3, multiprocessing_context=mp.get_context()
+    )
     model = LightningTransformer(vocab_size=dataset.vocab_size)
     s3_lightning_checkpoint = S3LightningCheckpoint(region=checkpoint_directory.region)
     _verify_user_agent(s3_lightning_checkpoint)
@@ -121,7 +126,9 @@ def test_compatibility_with_trainer_plugins(checkpoint_directory):
 def test_compatibility_with_checkpoint_callback(checkpoint_directory):
     nonce = random.randrange(2**64)
     dataset = WikiText2(data_dir=Path(f"/tmp/data/{nonce}"))
-    dataloader = DataLoader(dataset, num_workers=3)
+    dataloader = DataLoader(
+        dataset, num_workers=3, multiprocessing_context=mp.get_context()
+    )

     model = LightningTransformer(vocab_size=dataset.vocab_size)
     s3_lightning_checkpoint = S3LightningCheckpoint(checkpoint_directory.region)
@@ -161,7 +168,9 @@ def test_compatibility_with_checkpoint_callback(checkpoint_directory):
 def test_compatibility_with_async_checkpoint_io(checkpoint_directory):
     nonce = random.randrange(2**64)
     dataset = WikiText2(data_dir=Path(f"/tmp/data/{nonce}"))
-    dataloader = DataLoader(dataset, num_workers=3)
+    dataloader = DataLoader(
+        dataset, num_workers=3, multiprocessing_context=mp.get_context()
+    )

     model = LightningTransformer(vocab_size=dataset.vocab_size)
     s3_lightning_checkpoint = S3LightningCheckpoint(checkpoint_directory.region)
@@ -192,7 +201,9 @@ def test_compatibility_with_async_checkpoint_io(checkpoint_directory):
 def test_compatibility_with_lightning_checkpoint_load(checkpoint_directory):
     nonce = random.randrange(2**64)
     dataset = WikiText2(data_dir=Path(f"/tmp/data/{nonce}"))
-    dataloader = DataLoader(dataset, num_workers=3)
+    dataloader = DataLoader(
+        dataset, num_workers=3, multiprocessing_context=mp.get_context()
+    )
     model = LightningTransformer(vocab_size=dataset.vocab_size)
     s3_lightning_checkpoint = S3LightningCheckpoint(region=checkpoint_directory.region)
     trainer = L.Trainer(
16 changes: 13 additions & 3 deletions s3torchconnector/tst/e2e/test_multiprocess_dataloading.py
@@ -10,6 +10,7 @@
 import pytest
 from torch.utils.data import DataLoader, get_worker_info
 from torchdata.datapipes.iter import IterableWrapper
+import torch.multiprocessing as mp

 from s3torchconnector import (
     S3IterableDataset,
@@ -85,7 +86,12 @@ def test_s3iterable_dataset_multiprocess_torchdata(
     batch_size = 2
     num_workers = 3

-    dataloader = DataLoader(dataset, batch_size=batch_size, num_workers=num_workers)
+    dataloader = DataLoader(
+        dataset,
+        batch_size=batch_size,
+        num_workers=num_workers,
+        multiprocessing_context=mp.get_context(),
+    )

     total_objects = 0
     uris_seen = Counter()
@@ -123,7 +129,9 @@ def test_s3iterable_dataset_multiprocess(
     num_epochs = 2
     num_images = len(image_directory.contents)

-    dataloader = DataLoader(dataset, num_workers=num_workers)
+    dataloader = DataLoader(
+        dataset, num_workers=num_workers, multiprocessing_context=mp.get_context()
+    )
     counter = 0
     for epoch in range(num_epochs):
         s3keys = Counter()
@@ -160,7 +168,9 @@ def test_s3mapdataset_multiprocess(
     num_epochs = 2
     num_images = len(image_directory.contents)

-    dataloader = DataLoader(dataset, num_workers=num_workers)
+    dataloader = DataLoader(
+        dataset, num_workers=num_workers, multiprocessing_context=mp.get_context()
+    )

     for epoch in range(num_epochs):
         s3keys = Counter()
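Worth noting for all three test files above: `mp.get_context()` with no argument returns the platform's default multiprocessing context, so these changes make the worker start method explicit without altering it. A quick illustrative check (not part of the PR):

```python
import torch.multiprocessing as mp

# torch.multiprocessing mirrors the standard library's multiprocessing API.
# The default start method is "spawn" on macOS and Windows and "fork" on
# Linux, for the Python versions in this matrix (3.8-3.13).
print(mp.get_context().get_start_method())
```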
@@ -180,7 +180,7 @@ def _start_subprocess(
     debug_logs_config: str = "",
     logs_directory: str = "",
 ):
-    process = subprocess.Popen(
+    result = subprocess.run(
         [
             sys.executable,
             "-c",
@@ -190,11 +190,10 @@
             debug_logs_config,
             logs_directory,
         ],
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
+        capture_output=True,
         text=True,
     )
-    return process.communicate()
+    return result.stdout, result.stderr


 def _read_log_file(log_file: str):
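This last change replaces a manual `Popen` plus `communicate()` pair with `subprocess.run`, which starts the child process, waits for it, and captures its output in a single call. A minimal before/after sketch (the child command here is illustrative):

```python
import subprocess
import sys

# Before: manual pipe setup and an explicit communicate() call.
process = subprocess.Popen(
    [sys.executable, "-c", "print('hello')"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)
stdout, stderr = process.communicate()

# After: subprocess.run waits for completion, and capture_output=True is
# shorthand for wiring both stdout and stderr to subprocess.PIPE.
result = subprocess.run(
    [sys.executable, "-c", "print('hello')"],
    capture_output=True,
    text=True,
)
assert (result.stdout, result.stderr) == (stdout, stderr)
```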