
Commit 79955e2

Add data operations for reading from zip, iterating over csv and json records, and writing to parquet (#6)
* Add data operations: reading from zip file, json and csv record parsing, writing to parquet
* Update version to 0.1
* Fix linting; update to run multiple dependency versions
* Run dependency version checks with specific python versions
1 parent a52122d commit 79955e2

File tree

19 files changed: +773 −11 lines changed

.github/workflows/publish.yml

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ jobs:
       - run: curl -sSL https://install.python-poetry.org | python - -y
       - run: poetry config virtualenvs.in-project true
       - run: make test
+      - run: make test-dep-versions

   build:
     needs:

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
@@ -68,3 +68,6 @@ jobs:

       - name: Run tests
         run: make test
+
+      - name: Run tests on different dependency versions
+        run: make test-dep-versions

Makefile

Lines changed: 24 additions & 2 deletions
@@ -4,7 +4,7 @@ sources = src tests

 .PHONY: prepare
 prepare:
-    poetry install
+    poetry install --with ops


 .PHONY: lintable
@@ -21,13 +21,35 @@ lint: prepare
     poetry run mypy $(sources)


-
 .PHONY: test
 test: prepare
     poetry run coverage run -m pytest
     poetry run coverage report

+.PHONY: test-python-versions
+test-python-versions:
+    poetry env use python3.8
+    make test
+
+    poetry env use python3.9
+    make test
+
+    poetry env use python3.10
+    make test
+
+    poetry env use python3.11
+    make test
+
+    poetry env use python3.12
+    make test
+
+
+.PHONY: test-dep-versions
+test-dep-versions: prepare
+    ./scripts/test_dependency_versions.sh
+
 .PHONY: clean
 clean:
     rm -rf `find . -name __pycache__`

README.md

Lines changed: 62 additions & 1 deletion
@@ -8,7 +8,68 @@ Chained operations in Python, applied to data processing.
 pip install pipedata
 ```

-## An Example
+## Examples
+
+### Chaining Data Operations
+
+pipedata.ops provides some operations for streaming data through memory.
+
+```py
+import json
+import zipfile
+
+import pyarrow.parquet as pq
+
+from pipedata.core import StreamStart
+from pipedata.ops import json_records, parquet_writer, zipped_files
+
+
+data1 = [
+    {"col1": 1, "col2": "Hello"},
+    {"col1": 2, "col2": "world"},
+]
+data2 = [
+    {"col1": 3, "col2": "!"},
+]
+
+with zipfile.ZipFile("test_input.json.zip", "w") as zipped:
+    zipped.writestr("file1.json", json.dumps(data1))
+    zipped.writestr("file2.json", json.dumps(data2))
+
+result = (
+    StreamStart(["test_input.json.zip"])
+    .flat_map(zipped_files)
+    .flat_map(json_records())
+    .flat_map(parquet_writer("test_output.parquet"))
+    .to_list()
+)
+
+table = pq.read_table("test_output.parquet")
+print(table.to_pydict())
+#> {'col1': [1, 2, 3], 'col2': ['Hello', 'world', '!']}
+```
+
+Alternatively, you can construct the pipeline as a chain:
+
+```py
+import pyarrow.parquet as pq
+
+from pipedata.core import ChainStart, StreamStart
+from pipedata.ops import json_records, parquet_writer, zipped_files
+
+# Running this after input file created in above example
+chain = (
+    ChainStart()
+    .flat_map(zipped_files)
+    .flat_map(json_records())
+    .flat_map(parquet_writer("test_output_2.parquet"))
+)
+result = StreamStart(["test_input.json.zip"]).flat_map(chain).to_list()
+table = pq.read_table("test_output_2.parquet")
+print(table.to_pydict())
+#> {'col1': [1, 2, 3], 'col2': ['Hello', 'world', '!']}
+```

 ### Core Framework

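The README examples above only exercise the JSON path; the same commit also adds csv_records, exported from pipedata.ops (see src/pipedata/ops/__init__.py below). Below is a minimal sketch of the equivalent CSV pipeline, assuming csv_records() is a no-argument factory used exactly like json_records() — that call signature is an assumption, not shown in this diff.

```py
# Hypothetical sketch: csv_records() is assumed to mirror json_records().
import zipfile

import pyarrow.parquet as pq

from pipedata.core import StreamStart
from pipedata.ops import csv_records, parquet_writer, zipped_files

# Build a small zipped CSV input to stream through the pipeline.
with zipfile.ZipFile("test_input.csv.zip", "w") as zipped:
    zipped.writestr("file1.csv", "col1,col2\n1,Hello\n2,world\n")
    zipped.writestr("file2.csv", "col1,col2\n3,!\n")

result = (
    StreamStart(["test_input.csv.zip"])
    .flat_map(zipped_files)        # open each file inside the zip
    .flat_map(csv_records())       # yield one record per CSV row
    .flat_map(parquet_writer("test_output_csv.parquet"))
    .to_list()
)
print(pq.read_table("test_output_csv.parquet").to_pydict())
```
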
poetry.lock

Lines changed: 260 additions & 1 deletion
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 24 additions & 3 deletions
@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"

 [tool.poetry]
 name = "pipedata"
-version = "0.0.1"
+version = "0.1"
 description = "Framework for building pipelines for data processing"
 authors = ["Simon Wicks <[email protected]>"]
 readme = "README.md"
@@ -35,17 +35,36 @@ packages = [{include = "pipedata", from = "src"}]
 [tool.poetry.dependencies]
 python = "^3.8"

+[tool.poetry.group.ops.dependencies]
+fsspec = [
+    { version = ">=0.9.0", python = "<3.12" },
+    { version = ">=2022.1.0", python = ">=3.12,<3.13"},
+]
+ijson = "^3.0.0"
+pyarrow = [
+    { version = ">=9.0.0", python = "<3.11" },
+    { version = ">=11.0.0", python = ">=3.11,<3.12" },
+    { version = ">=14.0.0", python = ">=3.12,<=3.13" },
+]
+# We don't have a direct numpy dependency, but pyarrow depends on numpy
+# and numpy has python version constraints with python 3.12
+numpy = [
+    { version = "<1.25.0", python = "<3.9" },
+    { version = "^1.26.0", python = ">=3.12,<3.13" }
+]

 [tool.poetry.group.lint.dependencies]
 black = "^23.9.1"
 ruff = "^0.1.3"
 mypy = "^1.6.0"

-
 [tool.poetry.group.test.dependencies]
 pytest = "^7.4.2"
 coverage = "^7.3.2"

+[tool.poetry.group.ops]
+optional = true
+
 [tool.mypy]
 strict = true
@@ -103,7 +122,9 @@ keep-runtime-typing = true
 testpaths = "tests"
 xfail_strict = true
 filterwarnings = [
-    "error"
+    "error",
+    "ignore:distutils Version classes:DeprecationWarning",
+    "ignore:SelectableGroups dict:DeprecationWarning",
 ]


scripts/test_dependency_versions.sh

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+
+set -o errexit   # Abort on non-zero exit status
+set -o nounset   # Abort on unbound variable
+set -o pipefail  # Abort on non-zero exit in pipeline
+
+main() {
+    PYTHON_MINOR_VERSION=$(poetry run python -c 'import sys; version=sys.version_info[:3]; print("{1}".format(*version))')
+    echo "Python minor version: $PYTHON_MINOR_VERSION"
+
+    # The errors are mostly / all installation errors,
+    # about building from source. Could lower
+    # the requirements if able to build from source.
+    if (( $PYTHON_MINOR_VERSION < "11" )); then
+        poetry run pip install pyarrow==9.0.0
+        poetry run python -m pytest
+
+        poetry run pip install pyarrow==10.0.0
+        poetry run python -m pytest
+    fi
+
+    if (( $PYTHON_MINOR_VERSION < "12" )); then
+        poetry run pip install pyarrow==11.0.0
+        poetry run python -m pytest
+
+        poetry run pip install pyarrow==13.0.0
+        poetry run python -m pytest
+
+        poetry run pip install fsspec==0.9.0
+        poetry run python -m pytest
+    fi
+
+    poetry run pip install pyarrow==14.0.0
+    poetry run python -m pytest
+
+    poetry run pip install ijson==3.0.0
+    poetry run python -m pytest
+
+    poetry run pip install fsspec==2022.1.0
+    poetry run python -m pytest
+}
+
+main

src/pipedata/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-__version__ = "0.0.1"
+__version__ = "0.1"

 __all__ = [
     "__version__",

src/pipedata/core/chain.py

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@
 TOther = TypeVar("TOther")


-def _batched(iterable: Iterator[TEnd], n: Optional[int]) -> Iterator[Tuple[TEnd, ...]]:
+def batched(iterable: Iterator[TEnd], n: Optional[int]) -> Iterator[Tuple[TEnd, ...]]:
     """Can be replaced by itertools.batched once using Python 3.12+."""
     while (elements := tuple(itertools.islice(iterable, n))) != ():
         yield elements
@@ -160,7 +160,7 @@ def batched_map(

     @functools.wraps(func)
     def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]:
-        for elements in _batched(previous_step, n):
+        for elements in batched(previous_step, n):
             yield func(elements)

     return self.flat_map(new_action)

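This change renames the private _batched helper to batched, making it part of the module's public surface. A minimal sketch of what it does, based on the implementation shown in the diff above; importing it directly from pipedata.core.chain is an assumption about where it is exposed.

```py
# Sketch of the batched() helper shown in the diff above.
# Assumption: it is importable directly from pipedata.core.chain.
from pipedata.core.chain import batched

# Groups an iterator into tuples of at most n elements,
# like itertools.batched on Python 3.12+.
chunks = list(batched(iter([1, 2, 3, 4, 5]), 2))
print(chunks)
#> [(1, 2), (3, 4), (5,)]
```
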
src/pipedata/ops/__init__.py

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+from .files import zipped_files
+from .records import csv_records, json_records
+from .storage import parquet_writer
+
+__all__ = [
+    "zipped_files",
+    "csv_records",
+    "json_records",
+    "parquet_writer",
+]
