|
| 1 | +import json |
| 2 | +import types |
| 3 | +import unittest |
| 4 | +from collections.abc import Iterator |
| 5 | +from pathlib import Path |
| 6 | +from tempfile import NamedTemporaryFile |
| 7 | + |
| 8 | +from hyperbase.hyperedge import hedge |
| 9 | +from hyperbase.load_edges import load_edges |
| 10 | +from hyperbase.parsers.parse_result import ParseResult |
| 11 | + |
| 12 | + |
| 13 | +def _make_parse_result() -> ParseResult: |
| 14 | + edge = hedge("(is a/1 b/1)") |
| 15 | + tok_pos = hedge("(1 0 2)") |
| 16 | + return ParseResult( |
| 17 | + edge=edge, |
| 18 | + text="a is b", |
| 19 | + tokens=["a", "is", "b"], |
| 20 | + tok_pos=tok_pos, |
| 21 | + ) |
| 22 | + |
| 23 | + |
| 24 | +class TestLoadEdgesFromSequence(unittest.TestCase): |
| 25 | + def test_list_of_strings(self): |
| 26 | + edges = load_edges(["(is a/1 b/1)", "(src x/1 y/1)"]) |
| 27 | + assert isinstance(edges, list) |
| 28 | + assert len(edges) == 2 |
| 29 | + assert str(edges[0]) == "(is a/1 b/1)" |
| 30 | + assert str(edges[1]) == "(src x/1 y/1)" |
| 31 | + |
| 32 | + def test_list_of_hyperedges(self): |
| 33 | + h1 = hedge("(is a/1 b/1)") |
| 34 | + h2 = hedge("(src x/1 y/1)") |
| 35 | + edges = load_edges([h1, h2]) |
| 36 | + assert edges[0] == h1 |
| 37 | + assert edges[1] == h2 |
| 38 | + |
| 39 | + def test_list_of_dicts(self): |
| 40 | + pr = _make_parse_result() |
| 41 | + edges = load_edges([pr.to_dict()]) |
| 42 | + assert len(edges) == 1 |
| 43 | + assert str(edges[0][0]) == "is" |
| 44 | + |
| 45 | + def test_mixed_types(self): |
| 46 | + pr = _make_parse_result() |
| 47 | + edges = load_edges(["(src x/1 y/1)", pr.to_dict(), hedge("hello/1")]) |
| 48 | + assert len(edges) == 3 |
| 49 | + |
| 50 | + def test_generator_input(self): |
| 51 | + def gen() -> Iterator[str]: |
| 52 | + yield "(is a/1 b/1)" |
| 53 | + yield "(src x/1 y/1)" |
| 54 | + |
| 55 | + edges = load_edges(gen()) |
| 56 | + assert len(edges) == 2 |
| 57 | + |
| 58 | + def test_parse_result_items(self): |
| 59 | + pr = _make_parse_result() |
| 60 | + edges = load_edges([pr]) |
| 61 | + assert len(edges) == 1 |
| 62 | + |
| 63 | + |
| 64 | +class TestLoadEdgesLazy(unittest.TestCase): |
| 65 | + def test_lazy_returns_generator(self): |
| 66 | + result = load_edges(["(is a/1 b/1)"], lazy=True) |
| 67 | + assert isinstance(result, types.GeneratorType) |
| 68 | + edges = list(result) |
| 69 | + assert len(edges) == 1 |
| 70 | + assert str(edges[0]) == "(is a/1 b/1)" |
| 71 | + |
| 72 | + def test_eager_returns_list(self): |
| 73 | + result = load_edges(["(is a/1 b/1)"], lazy=False) |
| 74 | + assert isinstance(result, list) |
| 75 | + |
| 76 | + |
| 77 | +class TestLoadEdgesFromTextFile(unittest.TestCase): |
| 78 | + def test_text_file(self): |
| 79 | + with NamedTemporaryFile( |
| 80 | + mode="w", suffix=".txt", delete=False, encoding="utf-8" |
| 81 | + ) as f: |
| 82 | + f.write("(is a/1 b/1)\n") |
| 83 | + f.write("\n") |
| 84 | + f.write("(src x/1 y/1)\n") |
| 85 | + path = f.name |
| 86 | + |
| 87 | + edges = load_edges(path) |
| 88 | + assert len(edges) == 2 |
| 89 | + assert str(edges[0]) == "(is a/1 b/1)" |
| 90 | + assert str(edges[1]) == "(src x/1 y/1)" |
| 91 | + Path(path).unlink() |
| 92 | + |
| 93 | + def test_text_file_with_path_object(self): |
| 94 | + with NamedTemporaryFile( |
| 95 | + mode="w", suffix=".txt", delete=False, encoding="utf-8" |
| 96 | + ) as f: |
| 97 | + f.write("hello/1\n") |
| 98 | + path = Path(f.name) |
| 99 | + |
| 100 | + edges = load_edges(path) |
| 101 | + assert len(edges) == 1 |
| 102 | + assert str(edges[0]) == "hello/1" |
| 103 | + path.unlink() |
| 104 | + |
| 105 | + |
| 106 | +class TestLoadEdgesFromJsonlFile(unittest.TestCase): |
| 107 | + def test_jsonl_file(self): |
| 108 | + pr1 = _make_parse_result() |
| 109 | + pr2 = _make_parse_result() |
| 110 | + |
| 111 | + with NamedTemporaryFile( |
| 112 | + mode="w", suffix=".jsonl", delete=False, encoding="utf-8" |
| 113 | + ) as f: |
| 114 | + f.write(pr1.to_json() + "\n") |
| 115 | + f.write("\n") |
| 116 | + f.write(pr2.to_json() + "\n") |
| 117 | + path = f.name |
| 118 | + |
| 119 | + edges = load_edges(path) |
| 120 | + assert len(edges) == 2 |
| 121 | + Path(path).unlink() |
| 122 | + |
| 123 | + |
| 124 | +class TestLoadEdgesFromJsonFile(unittest.TestCase): |
| 125 | + def test_json_file_with_strings(self): |
| 126 | + data = ["(is a/1 b/1)", "(src x/1 y/1)"] |
| 127 | + with NamedTemporaryFile( |
| 128 | + mode="w", suffix=".json", delete=False, encoding="utf-8" |
| 129 | + ) as f: |
| 130 | + json.dump(data, f) |
| 131 | + path = f.name |
| 132 | + |
| 133 | + edges = load_edges(path) |
| 134 | + assert len(edges) == 2 |
| 135 | + assert str(edges[0]) == "(is a/1 b/1)" |
| 136 | + Path(path).unlink() |
| 137 | + |
| 138 | + def test_json_file_with_dicts(self): |
| 139 | + pr = _make_parse_result() |
| 140 | + data = [pr.to_dict()] |
| 141 | + with NamedTemporaryFile( |
| 142 | + mode="w", suffix=".json", delete=False, encoding="utf-8" |
| 143 | + ) as f: |
| 144 | + json.dump(data, f) |
| 145 | + path = f.name |
| 146 | + |
| 147 | + edges = load_edges(path) |
| 148 | + assert len(edges) == 1 |
| 149 | + Path(path).unlink() |
| 150 | + |
| 151 | + |
| 152 | +class TestLoadEdgesStringNotFile(unittest.TestCase): |
| 153 | + def test_string_not_existing_file_iterates_chars(self): |
| 154 | + """A string that is not an existing file path is treated as an |
| 155 | + iterable of characters, each fed to hedge as an atom.""" |
| 156 | + edges = load_edges("ab") |
| 157 | + assert len(edges) == 2 |
| 158 | + assert str(edges[0]) == "a" |
| 159 | + assert str(edges[1]) == "b" |
0 commit comments