Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ commands:
openjdk-11-jdk \
postgresql-${PG_VERSION} \
ruby2.3 \
ruby2.3-dev
ruby2.3-dev \
redis-server
sudo chown -R circleci /etc/elasticsearch
sed -i "1s;^;export PATH=${ES_BIN}:${PG_BIN}:$PATH\n;" $BASH_ENV
sudo apt-get install -y python3.7-dev python3-pip
Expand Down
7 changes: 5 additions & 2 deletions base.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[app:app]
use = egg:snovault#snowflakes
config_name = app
sqlalchemy.url = postgresql:///snowflakes
retry.attempts = 3
file_upload_bucket = snowflakes-files-dev
Expand Down Expand Up @@ -43,7 +44,8 @@ embed_cache.capacity = 5000
use = egg:snovault#indexer
app = app
path = /index
timeout = 60
set timeout = 60
set config_name = indexer
set embed_cache.capacity = 5000
set indexer = true
set queue_type = Simple
Expand All @@ -57,7 +59,8 @@ set queue_worker_batch_size = 2000000
use = egg:snovault#indexer
app = app
path = /index_file
timeout = 60
set timeout = 60
set config_name = regionindexer
set embed_cache.capacity = 5000
set regionindexer = true

Expand Down
9 changes: 9 additions & 0 deletions development.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,27 @@ use = config:base.ini#app
sqlalchemy.url = postgresql://postgres@:5432/postgres?host=/tmp/snovault/pgdata
snp_search.server = localhost:9200
load_test_only = true
local_tz = US/Pacific
create_tables = true
testing = true
postgresql.statement_timeout = 20
indexer.processes =


pyramid.reload_templates = true
pyramid.debug_authorization = false
pyramid.debug_notfound = true
pyramid.debug_routematch = false
pyramid.default_locale_name = en

snovault.load_test_data = snowflakes.loadxl:load_test_data
# Local Storage: Settings must exist in...
# snovault/tests/[testappsettings.py, test_key.py]
# snowflakes/tests/conftest.py
local_storage_host = localhost
local_storage_port = 6378
local_storage_redis_index = 1
local_storage_timeout = 5

[pipeline:debug]
pipeline =
Expand Down
2 changes: 2 additions & 0 deletions requirements.osx.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pip==20.0.2
psycopg2==2.8.4
redis==3.5.3
redis-server==5.0.7
setuptools==45.1.0
zc.buildout==2.13.2
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pip==20.0.2
psycopg2==2.8.4
redis==3.5.3
redis-server==5.0.7
setuptools==45.1.0
zc.buildout==2.13.2
1 change: 1 addition & 0 deletions src/snovault/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def includeme(config):
config.include('.crud_views')
config.include('.indexing_views')
config.include('.resource_views')
config.include('.elasticsearch.local_indexer_store')


def main(global_config, **local_config):
Expand Down
18 changes: 14 additions & 4 deletions src/snovault/dev_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

"""
from pkg_resources import resource_filename
from pyramid.paster import get_app
from pyramid.paster import get_app, get_appsettings
from multiprocessing import Process

import atexit
Expand Down Expand Up @@ -62,17 +62,25 @@ def main():
parser.add_argument('--datadir', default='/tmp/snovault', help="path to datadir")
args = parser.parse_args()

appsettings = get_appsettings(args.config_uri, name='app')
# Required settings in config
local_storage_host = appsettings['local_storage_host']
local_storage_port = appsettings['local_storage_port']
local_storage_redis_index = appsettings['local_storage_redis_index']
local_storage_timeout = appsettings['local_storage_timeout']

logging.basicConfig()
# Loading app will have configured from config file. Reconfigure here:
logging.getLogger('snovault').setLevel(logging.INFO)

from snovault.tests import elasticsearch_fixture, postgresql_fixture
from snovault.tests import elasticsearch_fixture, postgresql_fixture, redis_storage_fixture
from snovault.elasticsearch import create_mapping
datadir = os.path.abspath(args.datadir)
pgdata = os.path.join(datadir, 'pgdata')
esdata = os.path.join(datadir, 'esdata')
redisdata = os.path.join(datadir, 'redisdata')
if args.clear:
for dirname in [pgdata, esdata]:
for dirname in [pgdata, esdata, redisdata]:
if os.path.exists(dirname):
shutil.rmtree(dirname)
if args.init:
Expand All @@ -81,7 +89,9 @@ def main():
postgres = postgresql_fixture.server_process(pgdata, echo=True)
elasticsearch = elasticsearch_fixture.server_process(esdata, echo=True)
nginx = nginx_server_process(echo=True)
processes = [postgres, elasticsearch, nginx]
redis_config_path = redis_storage_fixture.initdb(redisdata, local_storage_port, echo=True)
redis = redis_storage_fixture.server_process(redis_config_path, local_storage_port, local_storage_redis_index, echo=True)
processes = [postgres, elasticsearch, nginx, redis]

print_processes = []

Expand Down
1 change: 1 addition & 0 deletions src/snovault/elasticsearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def includeme(config):

config.include('.indexer')
config.include('.indexer_state')
config.include('.local_indexer_store')
if asbool(settings.get('indexer')) and not PY2:
config.include('.mpindexer')

Expand Down
23 changes: 22 additions & 1 deletion src/snovault/elasticsearch/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
from snovault.storage import (
TransactionRecord,
)
from snovault.elasticsearch.local_indexer_store import IndexerStore

from urllib3.exceptions import ReadTimeoutError
from .interfaces import (
ELASTIC_SEARCH,
INDEXER,
INDEXER_STORE,
RESOURCES_INDEX,
)
from .indexer_state import (
Expand Down Expand Up @@ -90,6 +93,9 @@ def includeme(config):
_update_for_uuid_queues(registry)
if not processes:
registry[INDEXER] = Indexer(registry)
if not registry.get(INDEXER_STORE):
registry[INDEXER_STORE] = IndexerStore(config.registry.settings)



def get_related_uuids(request, es, updated, renamed):
Expand Down Expand Up @@ -336,6 +342,8 @@ def _get_nodes(request, indexer_state):

@view_config(route_name='index', request_method='POST', permission="index")
def index(request):
indexer_store = request.registry[INDEXER_STORE]
indexer_store.set_state(indexer_store.state_endpoint_start)
# Setting request.datastore here only works because routed views are not traversed.
request.datastore = 'database'
session = request.registry[DBSESSION]()
Expand Down Expand Up @@ -366,9 +374,11 @@ def index(request):
return {'this_node': this_node, 'other_node': other_node}

def _load_indexing(request, session, connection, indexer_state):
indexer_store.set_state(indexer_store.state_load_indexing)
first_txn = None
snapshot_id = None
(xmin, invalidated, restart) = indexer_state.priority_cycle(request)
indexer_store.set_state(indexer_store.state_load_indexing)
indexer_state.log_reindex_init_state()
# OPTIONAL: restart support
if restart: # Currently not bothering with restart!!!
Expand Down Expand Up @@ -493,14 +503,23 @@ def _run_indexing(
result = indexer_state.start_cycle(invalidated, result)

# Do the work...

local_state, event_tag = indexer_store.set_state(
indexer_store.state_run_indexing,
# 'invalidated' indicates the start of indexing
invalidated_cnt=len(invalidated)
)
indexing_update_infos, errors, err_msg = request.registry[INDEXER].serve_objects(
request,
invalidated,
xmin,
snapshot_id=snapshot_id,
restart=restart,
)
indexer_store.set_state(
indexer_store.state_run_indexing,
errors_cnt=len(errors),
event_tag=event_tag,
)
if err_msg:
log.warning('Could not start indexing: %s', err_msg)
result = indexer_state.finish_cycle(result,errors)
Expand Down Expand Up @@ -544,6 +563,7 @@ def _run_indexing(
output_tuple = _load_indexing(request, session, connection, indexer_state)
result, invalidated, flush, first_txn, snapshot_id, restart, xmin, return_now = output_tuple
if return_now:
indexer_store.set_state(indexer_store.state_waiting)
return result
indexing_update_infos = []
dry_run = request.json.get('dry_run', False)
Expand Down Expand Up @@ -625,6 +645,7 @@ def _run_indexing(
# opposed to in the indexer or just after serve_objects,
# so a crash in logging does not interupt indexing complietion
request.registry[INDEXER].check_log_indexing_times(indexing_update_infos)
indexer_store.set_state(indexer_store.state_waiting)
return result


Expand Down
4 changes: 4 additions & 0 deletions src/snovault/elasticsearch/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
ELASTIC_SEARCH = 'elasticsearch'
SNP_SEARCH_ES = 'snp_search'
INDEXER = 'indexer'
INDEXER_STORE = f"{INDEXER}_store"
INDEXER_STATE_TAG = f"{INDEXER}_state:hash"
INDEXER_EVENTS_TAG = f"{INDEXER}_event"
INDEXER_EVENTS_LIST = f"{INDEXER_EVENTS_TAG}:list"
RESOURCES_INDEX = 'snovault-resources'


Expand Down
Loading