Skip to content

Commit 9411812

Browse files
committed
Core: JSONL/NDJSON support, and invoke single resource on project file
1 parent b9e405b commit 9411812

File tree

9 files changed

+131
-36
lines changed

9 files changed

+131
-36
lines changed

CHANGES.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
## Unreleased
44
- CLI: Added CLI interface, per `tikray` program
5-
- Core: Started using `orjson` package
5+
- Core: Started using `orjson` and `orjsonl` packages, for performance
6+
reasons and JSONL/NDJSON support
7+
- Core: Optionally invoke single resource on project file
68

79
## 2025/02/05 v0.0.23
810
- Renamed package to `loko`, then `tikray`

docs/backlog.md

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
# Backlog
22

33
## Iteration +1
4+
- Support for JSONL
5+
- Read and write compressed files. orjson uses `xopen` already, which can do it transparently
6+
- Allow processing larger-than-memory files
7+
8+
## Iteration +2
49
- [ ] Documentation: jqlang stdlib's `to_object` function for substructure management
510
- [ ] Documentation: Type casting
611
`echo '{"a": 42, "b": {}, "c": []}' | jq -c '.|= (.b |= objects | .c |= objects)'`
@@ -36,8 +41,7 @@ and ._id != "55d71c8ce4b02210dc47b10f"
3641
#and (.value.urls | type) != "object"
3742
```
3843

39-
40-
## Iteration +2
44+
## Iteration +3
4145
- [ ] CLI interface
4246
- [ ] Documentation: Add Python example to "Synopsis" section on /index.html
4347
- [ ] Documentation: Compare with Seatunnel
@@ -55,7 +59,7 @@ Demonstrate more use cases, like...
5559
- https://github.com/MeltanoLabs/meltano-map-transform/issues/252
5660
- [ ] Use JSONPath, see https://sdk.meltano.com/en/v0.39.1/code_samples.html#use-a-jsonpath-expression-to-extract-the-next-page-url-from-a-hateoas-response
5761

58-
## Iteration +3
62+
## Iteration +4
5963
- [ ] Moksha transformations on Buckets
6064
- [ ] Fluent API interface
6165
```python

docs/cli.md

+25-2
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,41 @@ tikray \
2020

2121
### Single collection
2222

23-
Process a single collection / file.
23+
Process a single resource (collection, file) using a Tikray collection transformation.
2424
```shell
2525
tikray -t transformation-collection.yaml -i eai-warehouse.json
2626
```
2727

2828
### Multiple collections
2929

30-
Process multiple collections / files from a directory.
30+
Process multiple resources (collections, files) from a directory.
31+
The Tikray project file enumerates multiple transformation rules per resource.
3132
```shell
3233
tikray -t examples/transformation-project.yaml -i examples/acme -o tmp/acme
3334
```
3435

36+
If you are using a Tikray project file, but would like to only invoke a
37+
single-resource transformation on it, you need to explicitly specify the
38+
resource address using `--address`/`-a`, so the engine will only select
39+
this particular collection.
40+
```shell
41+
tikray -t examples/transformation-project.yaml -i examples/acme/conversation.json -a acme.conversation
42+
```
43+
44+
### JSONL support
45+
46+
Tikray supports reading and writing the JSONL/NDJSON format, i.e. newline-
47+
delimited JSON. Tikray will automatically use this mode if it receives
48+
input files suffixed with `.jsonl` or `.ndjson`, or if you explicitly
49+
toggle the mode per `--jsonl` option flag.
50+
```shell
51+
tikray -t transformation.yaml -i input.jsonl
52+
```
53+
```shell
54+
tikray -t transformation.yaml -i input.json -a acme.conversation --jsonl -o output.jsonl
55+
```
56+
57+
3558
## Example
3659

3760
`$ cat eai-warehouse.json`

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ dependencies = [
101101
"jq<1.9",
102102
"jsonpointer<4",
103103
"orjson<4",
104+
"orjsonl<2",
104105
"python-dateutil<2.10",
105106
"pyyaml<7",
106107
"toolz<0.13",

src/tikray/cli.py

+8-5
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,26 @@
1414
@click.option("--transformation", "-t", type=Path, required=True, help="Transformation YAML file")
1515
@click.option("--input", "-i", "input_", type=Path, required=True, help="Input file or directory")
1616
@click.option("--output", "-o", type=Path, required=False, help="Output file or directory")
17+
@click.option("--address", "-a", type=str, required=False, help="Select specific collection address")
18+
@click.option("--jsonl", type=bool, is_flag=True, required=False, help="Select JSONL/NDJSON processing")
1719
@click.version_option()
1820
@click.pass_context
19-
def cli(ctx: click.Context, transformation: Path, input_: Path, output: Path) -> None:
21+
def cli(ctx: click.Context, transformation: Path, input_: Path, output: Path, address: str, jsonl: bool) -> None:
2022
setup_logging()
2123
tdata = yaml.safe_load(transformation.read_text())
2224
type_ = tdata["meta"]["type"]
2325

24-
if type_ in ["tikray-project", "zyp-project"]:
26+
if type_ in ["tikray-collection", "zyp-collection"] or address is not None: # noqa: RET506
27+
return process_collection(transformation, input_, output, address, jsonl)
28+
29+
elif type_ in ["tikray-project", "zyp-project"]:
2530
if not input_.is_dir():
2631
raise click.ClickException(f"Input is not a directory: {input_}")
2732
if output is None:
2833
raise click.ClickException("Processing multiple collections requires an output directory")
2934
if not output.is_dir():
3035
raise click.ClickException(f"Output is not a directory: {output}")
31-
return process_project(transformation, input_, output)
36+
return process_project(transformation, input_, output, jsonl)
3237

33-
elif type_ in ["tikray-collection", "zyp-collection"]: # noqa: RET506
34-
return process_collection(transformation, input_, output)
3538
else:
3639
raise NotImplementedError(f"Unknown transformation type: {type_}")

src/tikray/core.py

+19-15
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
import logging
2-
import sys
32
import typing as t
43
from pathlib import Path
54

6-
import orjson as json
7-
85
from tikray.model.collection import CollectionAddress, CollectionTransformation
96
from tikray.model.project import TransformationProject
10-
from tikray.util.data import jd
7+
from tikray.util.data import load_json, save_json
118

129
logger = logging.getLogger(__name__)
1310

1411

15-
def process_project(transformation: Path, input_: Path, output: Path):
12+
def process_project(transformation: Path, input_: Path, output: Path, use_jsonl: bool = False):
1613
logger.info(f"Using transformation '{transformation}' on multi-collection input '{input_}'")
1714

1815
project = TransformationProject.from_yaml(transformation.read_text())
@@ -24,22 +21,29 @@ def process_project(transformation: Path, input_: Path, output: Path):
2421
except KeyError as ex:
2522
logger.warning(f"Could not find transformation definition for collection: {ex}")
2623
continue
27-
data = json.loads(Path(item).read_text())
24+
data = load_json(Path(item), use_jsonl=use_jsonl)
2825
output_path = output / item.name
29-
with open(output_path, "wb") as output_stream:
30-
output_stream.write(jd(tikray_transformation.apply(data)))
31-
logger.info(f"Processed output: {output_path}")
26+
save_json(tikray_transformation.apply(data), output_path, use_jsonl=use_jsonl)
27+
logger.info(f"Processed output: {output_path}")
3228

3329

34-
def process_collection(transformation: Path, input_: Path, output: t.Optional[Path] = None):
30+
def process_collection(
31+
transformation: Path,
32+
input_: Path,
33+
output: t.Optional[Path] = None,
34+
address: t.Optional[str] = None,
35+
use_jsonl: bool = False,
36+
):
3537
logger.info(f"Using transformation '{transformation}' on single-collection input '{input_}'")
36-
data = json.loads(input_.read_text())
3738
ct = CollectionTransformation.from_yaml(transformation.read_text())
39+
if address is not None:
40+
pt = TransformationProject.from_yaml(transformation.read_text())
41+
ct = pt.get(CollectionAddress(*address.split(".")))
42+
logger.info(f"Processing input: {input_}")
43+
data = load_json(input_, use_jsonl=use_jsonl)
3844
result = ct.apply(data)
39-
output_stream = sys.stdout.buffer
4045
if output is not None:
4146
if output.is_dir():
4247
output = output / input_.name
43-
output_stream = open(output, "wb")
44-
output_stream.write(jd(result))
45-
output_stream.flush()
48+
save_json(result, output, use_jsonl=use_jsonl)
49+
logger.info(f"Processed output: {output or 'stdout'}")

src/tikray/util/data.py

+27-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import sys
12
import typing as t
3+
from pathlib import Path
24

3-
import orjson as json
5+
import orjson
6+
import orjsonl
47

58

69
def no_privates_no_nulls_no_empties(key, value) -> bool:
@@ -22,5 +25,26 @@ def no_disabled_false(key, value):
2225
return not (key.name == "disabled" and value is False)
2326

2427

25-
def jd(thing: t.Any):
26-
return json.dumps(thing)
28+
def load_json(path: Path, use_jsonl: bool = False) -> t.Any:
29+
if path.suffix in [".jsonl", ".ndjson"] or use_jsonl:
30+
return orjsonl.load(path)
31+
else:
32+
return orjson.loads(path.read_text())
33+
34+
35+
def save_json(data: t.Any, path: t.Optional[Path] = None, use_jsonl: bool = False) -> None:
36+
# Sanity checks.
37+
if use_jsonl and not path:
38+
raise NotImplementedError("JSONL not supported on STDOUT yet, please raise an issue")
39+
40+
# Output JSONL.
41+
if path and (path.suffix in [".jsonl", ".ndjson"] or use_jsonl):
42+
orjsonl.save(path, data)
43+
44+
# Output JSON.
45+
elif path:
46+
with open(path, "wb") as stream:
47+
stream.write(orjson.dumps(data))
48+
49+
else:
50+
sys.stdout.buffer.write(orjson.dumps(data))

tests/test_cli.py

+23-7
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
{"id": 34, "meta": {"name": "bar", "location": "BY"}, "data": {"value": -84.01}},
99
]
1010

11+
acme_conversation_reference = json.loads((Path("tests") / "examples" / "conversation.json").read_text())
12+
1113

1214
def test_cli_collection_stdout_success(cli_runner):
1315
"""
14-
CLI test: Invoke `tikray` with example data.
16+
CLI test: Single resource to STDOUT.
1517
"""
1618

1719
result = cli_runner.invoke(
@@ -26,7 +28,7 @@ def test_cli_collection_stdout_success(cli_runner):
2628

2729
def test_cli_collection_file_output_success(cli_runner, tmp_path):
2830
"""
29-
CLI test: Invoke `tikray` with example data.
31+
CLI test: Single resource to file.
3032
"""
3133

3234
output_path = tmp_path / "output.json"
@@ -43,7 +45,7 @@ def test_cli_collection_file_output_success(cli_runner, tmp_path):
4345

4446
def test_cli_collection_directory_output_success(cli_runner, tmp_path):
4547
"""
46-
CLI test: Invoke `tikray` with example data.
48+
CLI test: Single resource to directory.
4749
"""
4850

4951
result = cli_runner.invoke(
@@ -60,7 +62,7 @@ def test_cli_collection_directory_output_success(cli_runner, tmp_path):
6062

6163
def test_cli_project_success(cli_runner, tmp_path):
6264
"""
63-
CLI test: Invoke `tikray` with example data.
65+
CLI test: Multiple resources (project) to directory.
6466
"""
6567

6668
result = cli_runner.invoke(
@@ -69,14 +71,28 @@ def test_cli_project_success(cli_runner, tmp_path):
6971
catch_exceptions=False,
7072
)
7173
assert result.exit_code == 0
72-
reference = json.loads((Path("tests") / "examples" / "conversation.json").read_text())
7374
output = json.loads(Path(tmp_path / "conversation.json").read_text())
74-
assert output == reference
75+
assert output == acme_conversation_reference
76+
77+
78+
def test_cli_collection_from_project_file_success(cli_runner, tmp_path):
79+
"""
80+
CLI test: Single resource from Tikray project file.
81+
"""
82+
83+
result = cli_runner.invoke(
84+
cli,
85+
args="-t examples/transformation-project.yaml -i examples/acme/conversation.json -a acme.conversation",
86+
catch_exceptions=False,
87+
)
88+
assert result.exit_code == 0
89+
data = json.loads(result.output)
90+
assert data == acme_conversation_reference
7591

7692

7793
def test_cli_project_warning_no_transformation(cli_runner, tmp_path, caplog):
7894
"""
79-
CLI test: Invoke `tikray` with example data.
95+
CLI test: Verify processing multiple resources emits warnings on missing ones.
8096
"""
8197

8298
project = tmp_path / "project"

tests/test_util.py

+18
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
from pathlib import Path
2+
13
import jmespath
24
import jq
35
import jsonpointer
6+
import orjsonl
47
import pytest
58
import transon
9+
from tikray.util.data import load_json, save_json
610
from tikray.util.expression import compile_expression
711
from tikray.util.locator import to_pointer
812

@@ -48,3 +52,17 @@ def test_compile_expression_unknown():
4852
with pytest.raises(TypeError) as ex:
4953
compile_expression(type="foobar", expression=None)
5054
assert ex.match("Compilation failed. Type must be either jmes or jq or transon: foobar")
55+
56+
57+
def test_load_jsonl_by_suffix(tmp_path: Path):
58+
data = load_json(Path("examples/eai-warehouse.json"))
59+
tmp_path = tmp_path / "testdrive.jsonl"
60+
orjsonl.save(tmp_path, [data])
61+
assert load_json(tmp_path) == [data]
62+
63+
64+
def test_load_jsonl_by_flag(tmp_path: Path):
65+
data = load_json(Path("examples/eai-warehouse.json"))
66+
tmp_path = tmp_path / "testdrive.json"
67+
save_json([data], tmp_path, use_jsonl=True)
68+
assert load_json(tmp_path, use_jsonl=True) == [data]

0 commit comments

Comments
 (0)