From e8d838f6fb1968d4ec9ed63144284d53e1386691 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Thu, 25 May 2017 17:07:01 +0200 Subject: [PATCH 1/5] Create mappings using index:schema from collection metadata --- kinto_elasticsearch/indexer.py | 8 ++- kinto_elasticsearch/listener.py | 3 +- tests/test_elasticsearch.py | 86 +++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/kinto_elasticsearch/indexer.py b/kinto_elasticsearch/indexer.py index 646be7c..1a07177 100644 --- a/kinto_elasticsearch/indexer.py +++ b/kinto_elasticsearch/indexer.py @@ -18,11 +18,15 @@ def __init__(self, hosts, prefix="kinto", force_refresh=False): def indexname(self, bucket_id, collection_id): return "{}-{}-{}".format(self.prefix, bucket_id, collection_id) - def create_index(self, bucket_id, collection_id): + def create_index(self, bucket_id, collection_id, schema=None): indexname = self.indexname(bucket_id, collection_id) # Only if necessary. if not self.client.indices.exists(index=indexname): - self.client.indices.create(index=indexname) + if schema: + body = {"mappings": {indexname: schema}} + else: + body = None + self.client.indices.create(index=indexname, body=body) def delete_index(self, bucket_id, collection_id=None): if collection_id is None: diff --git a/kinto_elasticsearch/listener.py b/kinto_elasticsearch/listener.py index 06929e3..750dcb3 100644 --- a/kinto_elasticsearch/listener.py +++ b/kinto_elasticsearch/listener.py @@ -12,7 +12,8 @@ def on_collection_created(event): bucket_id = event.payload["bucket_id"] for created in event.impacted_records: collection_id = created["new"]["id"] - indexer.create_index(bucket_id, collection_id) + schema = created["new"].get("index:schema") + indexer.create_index(bucket_id, collection_id, schema=schema) def on_collection_deleted(event): diff --git a/tests/test_elasticsearch.py b/tests/test_elasticsearch.py index 955cfb9..2e94339 100644 --- a/tests/test_elasticsearch.py +++ 
b/tests/test_elasticsearch.py @@ -183,3 +183,89 @@ def test_search_is_not_allowed_if_only_read_on_certain_records(self): headers=headers) self.app.post("/buckets/bid/collections/cid/search", status=403, headers=headers) + + +class SchemaSupport(BaseWebTest, unittest.TestCase): + + schema = { + "properties": { + "id": {"type": "keyword"}, + "last_modified": {"type": "long"}, + "build": { + "properties": { + "date": {"type": "date"}, + "id": {"type": "keyword"} + } + } + } + } + + def setUp(self): + self.app.put("/buckets/bid", headers=self.headers) + body = {"data": {"index:schema": self.schema}} + self.app.put_json("/buckets/bid/collections/cid", body, headers=self.headers) + self.app.post_json("/buckets/bid/collections/cid/records", + {"data": {"build": {"id": "abc", "date": "2017-05-24"}}}, + headers=self.headers) + self.app.post_json("/buckets/bid/collections/cid/records", + {"data": {"build": {"id": "efg", "date": "2017-02-01"}}}, + headers=self.headers) + + def get_mapping(self, bucket_id, collection_id): + indexer = self.app.app.registry.indexer + indexname = indexer.indexname(bucket_id, collection_id) + index_mapping = indexer.client.indices.get_mapping(indexname) + return index_mapping[indexname]["mappings"][indexname] + + def test_index_has_mapping_if_collection_has_schema(self): + mapping = self.get_mapping("bid", "cid") + assert mapping == { + "properties": { + "id": {"type": "keyword"}, + "last_modified": {"type": "long"}, + "build": { + "properties": { + "date": {"type": "date"}, + "id": {"type": "keyword"} + } + } + } + } + + def test_can_search_for_subproperties(self): + body = { + "query": { + "bool" : { + "must" : { + "term" : { "build.id" : "abc" } + } + } + } + } + resp = self.app.post_json("/buckets/bid/collections/cid/search", body, + headers=self.headers) + result = resp.json + assert len(result["hits"]["hits"]) == 1 + assert result["hits"]["hits"][0]["_source"]["build"]["id"] == "abc" + + def test_can_aggregate_values(self): + body = { + 
"aggs" : { + "build_dates" : { + "terms": { + "field" : "build.id", + "size" : 1000 + } + } + } + } + resp = self.app.post_json("/buckets/bid/collections/cid/search", body, + headers=self.headers) + result = resp.json + assert result["aggregations"]["build_dates"]["buckets"] == [ + {"key": "abc", "doc_count": 1}, + {"key": "efg", "doc_count": 1}, + ] + + def test_mapping_is_updated_on_collection_update(self): + pass From 8d729f4746c85e620f5237571e8b9c7f8fa55793 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Fri, 26 May 2017 09:52:51 +0200 Subject: [PATCH 2/5] Force date format in tests --- tests/test_elasticsearch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_elasticsearch.py b/tests/test_elasticsearch.py index 2e94339..e47db3a 100644 --- a/tests/test_elasticsearch.py +++ b/tests/test_elasticsearch.py @@ -193,7 +193,7 @@ class SchemaSupport(BaseWebTest, unittest.TestCase): "last_modified": {"type": "long"}, "build": { "properties": { - "date": {"type": "date"}, + "date": {"type": "date", "format": "strict_date"}, "id": {"type": "keyword"} } } @@ -225,7 +225,7 @@ def test_index_has_mapping_if_collection_has_schema(self): "last_modified": {"type": "long"}, "build": { "properties": { - "date": {"type": "date"}, + "date": {"type": "date", "format": "strict_date"}, "id": {"type": "keyword"} } } From 5d52d58091dbd5cb3afc6b3fb4c36a5e5388f0d4 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Fri, 26 May 2017 10:43:35 +0200 Subject: [PATCH 3/5] Manage collection metadata update --- kinto_elasticsearch/__init__.py | 2 ++ kinto_elasticsearch/indexer.py | 8 ++++++++ kinto_elasticsearch/listener.py | 14 +++++++++++++ tests/test_elasticsearch.py | 36 +++++++++++++++++++-------------- 4 files changed, 45 insertions(+), 15 deletions(-) diff --git a/kinto_elasticsearch/__init__.py b/kinto_elasticsearch/__init__.py index 2eda70f..f5c9f2f 100644 --- a/kinto_elasticsearch/__init__.py +++ b/kinto_elasticsearch/__init__.py @@ -26,6 +26,8 
@@ def includeme(config): config.add_subscriber(listener.on_server_flushed, ServerFlushed) config.add_subscriber(listener.on_collection_created, AfterResourceChanged, for_resources=("collection",), for_actions=("create",)) + config.add_subscriber(listener.on_collection_updated, AfterResourceChanged, + for_resources=("collection",), for_actions=("update",)) config.add_subscriber(listener.on_collection_deleted, AfterResourceChanged, for_resources=("collection",), for_actions=("delete",)) config.add_subscriber(listener.on_bucket_deleted, AfterResourceChanged, diff --git a/kinto_elasticsearch/indexer.py b/kinto_elasticsearch/indexer.py index 1a07177..68e0244 100644 --- a/kinto_elasticsearch/indexer.py +++ b/kinto_elasticsearch/indexer.py @@ -28,6 +28,14 @@ def create_index(self, bucket_id, collection_id, schema=None): body = None self.client.indices.create(index=indexname, body=body) + def update_index(self, bucket_id, collection_id, schema=None): + indexname = self.indexname(bucket_id, collection_id) + if schema is None: + schema = {"properties": {}} + self.client.indices.put_mapping(index=indexname, + doc_type=indexname, + body=schema) + def delete_index(self, bucket_id, collection_id=None): if collection_id is None: collection_id = "*" diff --git a/kinto_elasticsearch/listener.py b/kinto_elasticsearch/listener.py index 750dcb3..e9b8265 100644 --- a/kinto_elasticsearch/listener.py +++ b/kinto_elasticsearch/listener.py @@ -16,6 +16,20 @@ def on_collection_created(event): indexer.create_index(bucket_id, collection_id, schema=schema) +def on_collection_updated(event): + indexer = event.request.registry.indexer + bucket_id = event.payload["bucket_id"] + for updated in event.impacted_records: + collection_id = updated["new"]["id"] + old_schema = updated["old"].get("index:schema") + new_schema = updated["new"].get("index:schema") + # Create if there was no index before. 
+ if old_schema is None and new_schema is not None: + indexer.create_index(bucket_id, collection_id, schema=new_schema) + elif old_schema != new_schema: + indexer.update_index(bucket_id, collection_id, schema=new_schema) + + def on_collection_deleted(event): indexer = event.request.registry.indexer bucket_id = event.payload["bucket_id"] diff --git a/tests/test_elasticsearch.py b/tests/test_elasticsearch.py index e47db3a..0a8a3e7 100644 --- a/tests/test_elasticsearch.py +++ b/tests/test_elasticsearch.py @@ -1,3 +1,4 @@ +import copy import mock import time import unittest @@ -219,18 +220,26 @@ def get_mapping(self, bucket_id, collection_id): def test_index_has_mapping_if_collection_has_schema(self): mapping = self.get_mapping("bid", "cid") - assert mapping == { - "properties": { - "id": {"type": "keyword"}, - "last_modified": {"type": "long"}, - "build": { - "properties": { - "date": {"type": "date", "format": "strict_date"}, - "id": {"type": "keyword"} - } - } - } - } + assert sorted(mapping["properties"].keys()) == ["build", "id", "last_modified"] + + def test_mapping_is_updated_on_collection_update(self): + new_schema = copy.deepcopy(self.schema) + new_schema["properties"]["build"]["properties"]["id"]["ignore_above"] = 12 + + self.app.patch_json("/buckets/bid/collections/cid", + {"data": {"index:schema": new_schema}}, + headers=self.headers) + + mapping = self.get_mapping("bid", "cid") + assert mapping["properties"]["build"]["properties"]["id"]["ignore_above"] == 12 + + def test_mapping_is_preserved_when_index_metadata_is_removed(self): + self.app.put_json("/buckets/bid/collections/cid", + {"data": {}}, + headers=self.headers) + + mapping = self.get_mapping("bid", "cid") + assert "build" in mapping["properties"] def test_can_search_for_subproperties(self): body = { @@ -266,6 +275,3 @@ def test_can_aggregate_values(self): {"key": "abc", "doc_count": 1}, {"key": "efg", "doc_count": 1}, ] - - def test_mapping_is_updated_on_collection_update(self): - pass From 
2f32c1f38a0344e7b7161a8de3251313e6c854f3 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Fri, 26 May 2017 11:51:28 +0200 Subject: [PATCH 4/5] Fix edge case --- kinto_elasticsearch/indexer.py | 2 ++ tests/test_elasticsearch.py | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/kinto_elasticsearch/indexer.py b/kinto_elasticsearch/indexer.py index 68e0244..1291579 100644 --- a/kinto_elasticsearch/indexer.py +++ b/kinto_elasticsearch/indexer.py @@ -27,6 +27,8 @@ def create_index(self, bucket_id, collection_id, schema=None): else: body = None self.client.indices.create(index=indexname, body=body) + else: + self.update_index(bucket_id, collection_id, schema) def update_index(self, bucket_id, collection_id, schema=None): indexname = self.indexname(bucket_id, collection_id) diff --git a/tests/test_elasticsearch.py b/tests/test_elasticsearch.py index 0a8a3e7..71d7336 100644 --- a/tests/test_elasticsearch.py +++ b/tests/test_elasticsearch.py @@ -216,7 +216,8 @@ def get_mapping(self, bucket_id, collection_id): indexer = self.app.app.registry.indexer indexname = indexer.indexname(bucket_id, collection_id) index_mapping = indexer.client.indices.get_mapping(indexname) - return index_mapping[indexname]["mappings"][indexname] + mappings = index_mapping[indexname]["mappings"] + return mappings.get(indexname, {}) def test_index_has_mapping_if_collection_has_schema(self): mapping = self.get_mapping("bid", "cid") @@ -233,6 +234,18 @@ def test_mapping_is_updated_on_collection_update(self): mapping = self.get_mapping("bid", "cid") assert mapping["properties"]["build"]["properties"]["id"]["ignore_above"] == 12 + def test_mapping_is_created_when_index_metadata_is_added(self): + self.app.put("/buckets/bid/collections/cid2", headers=self.headers) + mapping = self.get_mapping("bid", "cid2") + assert "build" not in mapping.get("properties", {}) + + self.app.patch_json("/buckets/bid/collections/cid2", + {"data": {"index:schema": self.schema}}, + 
headers=self.headers) + + mapping = self.get_mapping("bid", "cid2") + assert "build" in mapping["properties"] + def test_mapping_is_preserved_when_index_metadata_is_removed(self): self.app.put_json("/buckets/bid/collections/cid", {"data": {}}, From 7963ed24c9cf0df2298dd6574f2d6d70b08345ab Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Fri, 26 May 2017 12:07:22 +0200 Subject: [PATCH 5/5] Update docs --- CHANGELOG.rst | 1 + README.rst | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5d01fc7..08c2aeb 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -13,6 +13,7 @@ Changelog - Add heartbeat (fixes #3) - Delete indices when buckets and collections are deleted (fixes #21) - Support quick search from querystring (fixes #34) +- Support defining mapping from the ``index:schema`` property in the collection metadata (ref #8) **Bug fixes** diff --git a/README.rst b/README.rst index c3909f9..591ac98 100644 --- a/README.rst +++ b/README.rst @@ -126,6 +126,38 @@ Or an advanced search using request body: } +Custom index mapping +-------------------- + +By default, ElasticSearch infers the data types from the indexed records. + +But it's possible to define the index mappings (i.e. schema) from the collection metadata, +in the ``index:schema`` property: + +.. code-block:: bash + + $ echo '{ + "data": { + "index:schema": { + "properties": { + "id": {"type": "keyword"}, + "last_modified": {"type": "long"}, + "build": { + "properties": { + "date": {"type": "date", "format": "strict_date"}, + "id": {"type": "keyword"} + } + } + } + } + } + }' | http PATCH "http://localhost:8888/v1/buckets/blog/collections/builds" --auth token:admin-token --verbose + +Refer to the official ElasticSearch documentation for more information about mappings. + +See also ``domapping``, a CLI tool to convert JSON schemas to ElasticSearch mappings. + + Running the tests =================