Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #56 from Kinto/20-index-existing-records
Browse files Browse the repository at this point in the history
Add command to reindex existing data (fixes #20)
  • Loading branch information
leplatrem authored Sep 12, 2017
2 parents 6e564ed + c583578 commit 9f344cd
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 19 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ tests-once: install-dev

tests: tox
$(VENV)/bin/tox

flake8: install-dev
$(VENV)/bin/flake8 kinto_elasticsearch tests
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
flake8
pytest
pytest-cache
pytest-cover
Expand Down
123 changes: 123 additions & 0 deletions kinto_elasticsearch/command_reindex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import argparse
import elasticsearch
import logging
import sys

from pyramid.paster import bootstrap

from kinto.core.storage.exceptions import RecordNotFoundError
from kinto.core.storage import Sort, Filter
from kinto.core.utils import COMPARISON


DEFAULT_CONFIG_FILE = 'config/kinto.ini'

logger = logging.getLogger(__package__)


def main(cli_args=None):
    """Entry point of the reindex command.

    Parses the command line, loads the application from the ini file,
    then drops, recreates and repopulates the index of one collection.

    :param cli_args: list of command-line arguments; when ``None`` the
        arguments are taken from ``sys.argv[1:]``.
    :returns: an exit code -- ``0`` on success, ``62`` when the registry
        has no ``indexer`` attribute, ``63`` when the collection metadata
        cannot be found in storage, ``64`` when the metadata has no
        ``index:schema`` attribute.
    """
    argv = sys.argv[1:] if cli_args is None else cli_args

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--ini',
        help='Application configuration file',
        dest='ini_file',
        required=False,
        default=DEFAULT_CONFIG_FILE,
    )
    arg_parser.add_argument(
        '-b', '--bucket',
        help='Bucket name.',
        type=str,
    )
    arg_parser.add_argument(
        '-c', '--collection',
        help='Collection name.',
        type=str,
    )
    options = arg_parser.parse_args(args=argv)

    print("Load config...")
    registry = bootstrap(options.ini_file)['registry']

    # The plugin is expected to expose ``indexer`` on the registry; if the
    # attribute is missing, kinto-elasticsearch is not configured.
    try:
        indexer = registry.indexer
    except AttributeError:
        logger.error("kinto-elasticsearch not available.")
        return 62

    bucket_id, collection_id = options.bucket, options.collection
    storage = registry.storage

    # The index mapping is read from the collection metadata.
    try:
        schema = get_index_schema(storage, bucket_id, collection_id)
    except RecordNotFoundError:
        logger.error("No collection '%s' in bucket '%s'" % (collection_id, bucket_id))
        return 63

    # Nothing to do for a collection without an index mapping.
    if schema is None:
        logger.error("No `index:schema` attribute found in collection metadata.")
        return 64

    # XXX: Are you sure? (the existing index is dropped without asking
    # for any confirmation)
    recreate_index(indexer, bucket_id, collection_id, schema)
    reindex_records(indexer, storage, bucket_id, collection_id)

    return 0


def get_index_schema(storage, bucket_id, collection_id):
    """Return the ``index:schema`` attribute of a collection's metadata.

    :param storage: storage backend; its ``get()`` method is used to
        fetch the collection object.
    :param bucket_id: id of the bucket that contains the collection.
    :param collection_id: id of the collection to look up.
    :returns: the value stored under ``index:schema``, or ``None`` when
        the metadata has no such key.

    Any exception raised by ``storage.get()`` is left to propagate.
    """
    # The collection object is stored under its bucket URI.
    # XXX: https://github.com/Kinto/kinto/issues/710
    bucket_uri = "/buckets/%s" % bucket_id
    collection = storage.get(
        parent_id=bucket_uri,
        collection_id="collection",
        object_id=collection_id,
    )
    return collection.get("index:schema")


def recreate_index(indexer, bucket_id, collection_id, schema):
    """Drop the index of a collection and create it again with ``schema``.

    :param indexer: object providing ``indexname()``, ``delete_index()``
        and ``create_index()``.
    :param bucket_id: id of the bucket.
    :param collection_id: id of the collection.
    :param schema: mapping passed to ``create_index()`` as ``schema``.

    A short message is printed after each of the two steps.
    """
    target = (bucket_id, collection_id)
    name = indexer.indexname(*target)

    # Remove whatever index currently exists for this collection...
    indexer.delete_index(*target)
    print("Old index '%s' deleted." % name)

    # ...and build a fresh one from the given schema.
    indexer.create_index(*target, schema=schema)
    print("New index '%s' created." % name)


def get_paginated_records(storage, bucket_id, collection_id, limit=5000):
    """Yield the records of a collection page by page.

    Records are requested sorted by descending ``last_modified``. After
    each page, the next request is restricted to records whose
    ``last_modified`` is strictly lower than that of the last record
    just returned.

    :param storage: storage backend; its ``get_all()`` method is called
        once per page.
    :param bucket_id: id of the bucket.
    :param collection_id: id of the collection.
    :param limit: maximum number of records requested per page.
    :returns: a generator of non-empty lists of records.
    """
    # A single request could hit the storage_fetch_limit, hence the paging.
    records_uri = "/buckets/%s/collections/%s" % (bucket_id, collection_id)
    newest_first = [Sort('last_modified', -1)]
    page_filters = []

    while True:
        page, _ = storage.get_all(
            parent_id=records_uri,
            collection_id="record",
            pagination_rules=page_filters,
            sorting=newest_first,
            limit=limit,
        )
        if not page:
            return  # Every page has been served.

        yield page

        # Resume right below the oldest timestamp seen so far.
        oldest_seen = page[-1]["last_modified"]
        page_filters = [
            [Filter("last_modified", oldest_seen, COMPARISON.LT)]
        ]


def reindex_records(indexer, storage, bucket_id, collection_id):
    """Index every record of a collection, one bulk block per page.

    Pages come from ``get_paginated_records()``. A dot is printed for
    each page whose records were all queued, and a summary line with the
    number of reindexed records is printed at the end.

    :param indexer: object whose ``bulk()`` context manager yields an
        object with ``index_record()`` and an ``operations`` list.
    :param storage: storage backend handed to ``get_paginated_records()``.
    :param bucket_id: id of the bucket.
    :param collection_id: id of the collection.

    An ``elasticsearch.ElasticsearchException`` raised while handling a
    page is logged and that page is skipped; the remaining pages are
    still processed and the skipped page is not counted in the total.
    """
    total = 0
    for records in get_paginated_records(storage, bucket_id, collection_id):
        try:
            with indexer.bulk() as bulk:
                for record in records:
                    bulk.index_record(bucket_id,
                                      collection_id,
                                      record=record)
                # The dots carry no newline, so they must be flushed
                # explicitly: a line-buffered terminal would otherwise
                # hold them back until the final summary, which defeats
                # the purpose of a progress indicator.
                print(".", end="", flush=True)
            # Only counted once the bulk block exited without error.
            total += len(bulk.operations)
        except elasticsearch.ElasticsearchException:
            logger.exception("Failed to index record")
    print("\n%s records reindexed." % total)
4 changes: 2 additions & 2 deletions kinto_elasticsearch/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(self, indexer):
self.indexer = indexer
self.operations = []

def index_record(self, bucket_id, collection_id, record, id_field):
def index_record(self, bucket_id, collection_id, record, id_field="id"):
indexname = self.indexer.indexname(bucket_id, collection_id)
record_id = record[id_field]
self.operations.append({
Expand All @@ -81,7 +81,7 @@ def index_record(self, bucket_id, collection_id, record, id_field):
'_source': record,
})

def unindex_record(self, bucket_id, collection_id, record, id_field):
def unindex_record(self, bucket_id, collection_id, record, id_field="id"):
indexname = self.indexer.indexname(bucket_id, collection_id)
record_id = record[id_field]
self.operations.append({
Expand Down
6 changes: 2 additions & 4 deletions kinto_elasticsearch/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@ def on_record_changed(event):
if action == ACTIONS.DELETE.value:
bulk.unindex_record(bucket_id,
collection_id,
record=change["old"],
id_field="id")
record=change["old"])
else:
bulk.index_record(bucket_id,
collection_id,
record=change["new"],
id_field="id")
record=change["new"])
except elasticsearch.ElasticsearchException:
logger.exception("Failed to index record")

Expand Down
15 changes: 11 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@
with open('CHANGELOG.rst') as history_file:
history = history_file.read()

requirements = [
REQUIREMENTS = [
'elasticsearch',
'kinto>=6.0.0'
]

test_requirements = [
TEST_REQUIREMENTS = [
'mock',
'unittest2',
'webtest',
]

ENTRY_POINTS = {
'console_scripts': [
'kinto-elasticsearch-reindex = kinto_elasticsearch.command_reindex:main'
],
}

setup(
name='kinto-elasticsearch',
version='0.3.0.dev0',
Expand All @@ -33,7 +39,7 @@
],
package_dir={'kinto_elasticsearch': 'kinto_elasticsearch'},
include_package_data=True,
install_requires=requirements,
install_requires=REQUIREMENTS,
license="Apache License (2.0)",
zip_safe=False,
keywords='kinto elasticsearch index',
Expand All @@ -48,5 +54,6 @@
'Programming Language :: Python :: 3.5',
],
test_suite='tests',
tests_require=test_requirements
tests_require=TEST_REQUIREMENTS,
entry_points=ENTRY_POINTS
)
6 changes: 6 additions & 0 deletions tests/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
use = egg:kinto
kinto.userid_hmac_secret = some-secret-string

kinto.storage_backend = kinto.core.storage.postgresql
kinto.storage_url = postgres://postgres:postgres@localhost:5432/postgres

kinto.permission_backend = kinto.core.permission.postgresql
kinto.permission_url = postgres://postgres:postgres@localhost:5432/postgres

kinto.includes = kinto_elasticsearch
kinto.plugins.flush

Expand Down
85 changes: 85 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import elasticsearch
import mock
import os
import unittest
from kinto_elasticsearch.command_reindex import main, reindex_records
from . import BaseWebTest

HERE = os.path.abspath(os.path.dirname(__file__))


class TestMain(BaseWebTest, unittest.TestCase):
    """Tests for the reindex command (``command_reindex.main``)."""

    # Index mapping stored as ``index:schema`` in the collection metadata
    # by the test that exercises a successful reindex.
    schema = {
        "properties": {
            "id": {"type": "keyword"},
            "last_modified": {"type": "long"},
            "build": {
                "properties": {
                    "date": {"type": "date", "format": "strict_date"},
                    "id": {"type": "keyword"}
                }
            }
        }
    }

    def test_cli_fail_if_elasticsearch_plugin_not_installed(self):
        # wrong_config.ini does not list kinto_elasticsearch in its
        # includes, so the command is expected to give up with code 62.
        with mock.patch('kinto_elasticsearch.command_reindex.logger') as logger:
            exit_code = main(['--ini', os.path.join(HERE, 'wrong_config.ini'),
                              '--bucket', 'bid', '--collection', 'cid'])
            assert exit_code == 62
            logger.error.assert_called_with('kinto-elasticsearch not available.')

    def test_cli_fail_if_collection_or_bucket_do_not_exists(self):
        # This test creates neither the bucket nor the collection, so the
        # metadata lookup is expected to fail with code 63.
        with mock.patch('kinto_elasticsearch.command_reindex.logger') as logger:
            exit_code = main(['--ini', os.path.join(HERE, 'config.ini'),
                              '--bucket', 'bid', '--collection', 'cid'])
            assert exit_code == 63
            logger.error.assert_called_with(
                "No collection 'cid' in bucket 'bid'")

    def test_cli_fail_if_collection_has_no_index_schema(self):
        # Create the bucket and the collection, without any index schema.
        self.app.put("/buckets/bid", headers=self.headers)
        self.app.put("/buckets/bid/collections/cid", headers=self.headers)

        with mock.patch('kinto_elasticsearch.command_reindex.logger') as logger:
            exit_code = main(['--ini', os.path.join(HERE, 'config.ini'),
                              '--bucket', 'bid', '--collection', 'cid'])
            assert exit_code == 64
            logger.error.assert_called_with(
                'No `index:schema` attribute found in collection metadata.')

    def test_cli_reindexes_if_collection_has_an_index_schema(self):
        # Create the bucket, a collection carrying an index schema, and
        # one record that matches that schema.
        self.app.put("/buckets/bid", headers=self.headers)
        body = {"data": {"index:schema": self.schema}}
        self.app.put_json("/buckets/bid/collections/cid", body, headers=self.headers)
        self.app.post_json("/buckets/bid/collections/cid/records",
                           {"data": {"build": {"id": "efg", "date": "2017-02-01"}}},
                           headers=self.headers)

        exit_code = main(['--ini', os.path.join(HERE, 'config.ini'),
                          '--bucket', 'bid', '--collection', 'cid'])
        assert exit_code == 0

    def test_cli_logs_elasticsearch_exceptions(self):
        # Make index_record() on the object yielded by ``indexer.bulk()``
        # raise an Elasticsearch error.
        indexer = mock.MagicMock()
        indexer.bulk().__enter__().index_record.side_effect = elasticsearch.ElasticsearchException

        with mock.patch('kinto_elasticsearch.command_reindex.logger') as logger:
            # Pagination is replaced by a single page of two empty records,
            # so no real storage is needed (sentinels are passed instead).
            with mock.patch('kinto_elasticsearch.command_reindex.get_paginated_records',
                            return_value=[[{}, {}]]) as get_paginated_records:
                reindex_records(indexer,
                                mock.sentinel.storage,
                                mock.sentinel.bucket_id,
                                mock.sentinel.collection_id)
                get_paginated_records.assert_called_with(mock.sentinel.storage,
                                                         mock.sentinel.bucket_id,
                                                         mock.sentinel.collection_id)
                # The error must be logged rather than propagated.
                logger.exception.assert_called_with('Failed to index record')

    def test_cli_default_to_sys_argv(self):
        # Called without arguments, main() reads them from sys.argv[1:].
        # wrong_config.ini makes it stop at the plugin check (code 62).
        with mock.patch('sys.argv', ['cli', '--ini', os.path.join(HERE, 'wrong_config.ini')]):
            exit_code = main()
            assert exit_code == 62
9 changes: 0 additions & 9 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ def test_returns_false_if_connection_fails(self):

class PostActivation(BaseWebTest, unittest.TestCase):

@classmethod
def get_app_settings(cls, extras=None):
settings = super().get_app_settings(extras)
settings['storage_backend'] = 'kinto.core.storage.postgresql'
settings['storage_url'] = 'postgres://postgres:postgres@localhost:5432/postgres'
settings['permission_backend'] = 'kinto.core.permission.postgresql'
settings['permission_url'] = settings['storage_url']
return settings

def setUp(self):
app = self.make_app(settings={"kinto.includes": ""})
capabilities = app.get("/").json["capabilities"]
Expand Down
5 changes: 5 additions & 0 deletions tests/wrong_config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[app:main]
use = egg:kinto
kinto.userid_hmac_secret = some-secret-string

kinto.includes = kinto.plugins.flush

0 comments on commit 9f344cd

Please sign in to comment.