Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/azul/field_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
)

from more_itertools import (
first,
one,
)

Expand Down Expand Up @@ -431,13 +430,23 @@ def from_index(self, value: str) -> str | None:

class Nested(PassThrough[JSON]):
properties: Mapping[str, FieldType]
agg_property: str

def __init__(self, **properties):
    """
    Declare a nested field. Each keyword argument names a sub-field of the
    nested field and maps it to the ``FieldType`` used to convert its
    values to and from the index representation.
    """
    super().__init__(JSON, es_type='nested')
    # NOTE(review): `first` raises on an empty iterable, so this implicitly
    # requires at least one property — confirm all callers pass one. The
    # first-declared property is singled out as the aggregation property.
    self.agg_property = first(properties.keys())
    self.properties = properties

def to_index(self, value: JSON) -> JSON:
    """
    Convert *value*, a mapping containing one entry per declared
    sub-field, to its index representation by applying each sub-field's
    own ``to_index`` conversion. A ``KeyError`` propagates if *value*
    lacks one of the declared sub-fields.
    """
    converted = {}
    for name, sub_type in self.properties.items():
        converted[name] = sub_type.to_index(value[name])
    return converted

def from_index(self, value: JSON) -> JSON:
    """
    Inverse of :meth:`to_index`: convert *value* from its index
    representation by applying each declared sub-field's own
    ``from_index`` conversion. A ``KeyError`` propagates if *value*
    lacks one of the declared sub-fields.
    """
    restored = {}
    for name, sub_type in self.properties.items():
        restored[name] = sub_type.from_index(value[name])
    return restored

def api_filter_values_schema(self, operator: str, mode: Mode) -> JSON:
assert operator == 'is'
schema = super().api_filter_values_schema(operator, mode)
Expand Down
1 change: 0 additions & 1 deletion src/azul/indexer/document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ def field_type(self, catalog: CatalogName, path: FieldPath) -> FieldType:
if isinstance(field_types, Nested):
element = next(elements, None)
if element is not None:
assert element == field_types.agg_property, (element, field_types)
field_types = field_types.properties[element]
assert isinstance(field_types, FieldType), (path, field_types)
element = next(elements, None)
Expand Down
21 changes: 17 additions & 4 deletions src/azul/plugins/metadata/hca/service/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,10 @@ def file_type_summary(aggregate_file: JSON) -> FileTypeSummaryForHit:
return summarized_hit

def make_terms(self, agg) -> Terms:
def choose_entry(_term):
if 'key_as_string' in _term:
def choose_entry(_term, nested_keys):
if nested_keys is not None:
return dict(zip(nested_keys, _term['key']))
elif 'key_as_string' in _term:
return _term['key_as_string']
elif (term_key := _term['key']) is None:
return None
Expand All @@ -573,8 +575,19 @@ def choose_entry(_term):

terms: list[Term] = []
for bucket in agg['myTerms']['buckets']:
term = Term(term=choose_entry(bucket),
count=bucket['doc_count'])
if 'reverseNested' in bucket:
# For nested fields, we want the count from the reverse nested
# aggregation which tells us how many (parent) documents
# contain the nested field term, unlike bucket['doc_count']
# which is how many times the nested field term occurs in all
# documents.
doc_count = bucket['reverseNested']['doc_count']
nested_keys = [path[-1] for path in agg['myTerms']['meta']['paths']]
else:
doc_count = bucket['doc_count']
nested_keys = None
term = Term(term=choose_entry(bucket, nested_keys),
count=doc_count)
try:
sub_agg = bucket['myProjectIds']
except KeyError:
Expand Down
79 changes: 59 additions & 20 deletions src/azul/service/query_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from opensearchpy.helpers.aggs import (
Agg,
MultiTerms,
Terms,
)
from opensearchpy.helpers.query import (
Expand Down Expand Up @@ -327,21 +328,40 @@ def _prepare_aggregation(self, *, facet: str, facet_path: FieldPath) -> Agg:

field_type = self.service.field_type(self.catalog, facet_path)
if isinstance(field_type, Nested):
nested_agg = agg.bucket(name='nested',
agg_type='nested',
path=dotted(facet_path))
facet_path = dotted(facet_path, field_type.agg_property)
path = dotted(facet_path)
# A nested aggregation to aggregate on fields inside a nested field
agg.bucket(name='nested',
agg_type='nested',
path=path)
# A multi-terms aggregation to form composite keys made from the
# fields inside a nested field
agg.aggs.nested.bucket(name='myTerms',
agg_type='multi_terms',
terms=[
{'field': path + f'.{field}.keyword'}
for field in field_type.properties
],
size=config.terms_aggregation_size)
# A reverse nested aggregation so that a doc_count can be obtained
# which reflects the parent documents, not the separate documents
# created for the nested field
agg.aggs.nested.aggs.myTerms.bucket(name='reverseNested',
agg_type='reverse_nested')
# A filter aggregation to work around that we can't use a missing
# aggregation with a nested field.
# See https://github.com/elastic/elasticsearch/issues/9571
agg.bucket(name='untagged',
agg_type='filter',
filter=Q('bool', must_not=[
Q('nested', path=path, query=Q('exists', field=path))
]))
else:
nested_agg = agg
# Make an inner agg that will contain the terms in question
path = dotted(facet_path, 'keyword')
# FIXME: Approximation errors for terms aggregation are unchecked
# https://github.com/DataBiosphere/azul/issues/3413
nested_agg.bucket(name='myTerms',
agg_type='terms',
field=path,
size=config.terms_aggregation_size)
nested_agg.bucket('untagged', 'missing', field=path)
path = dotted(facet_path, 'keyword')
agg.bucket(name='myTerms',
agg_type='terms',
field=path,
size=config.terms_aggregation_size)
agg.bucket('untagged', 'missing', field=path)
return agg

def _annotate_aggs_for_translation(self, request: Search):
Expand All @@ -352,13 +372,20 @@ def _annotate_aggs_for_translation(self, request: Search):
"""

def annotate(agg: Agg):
if isinstance(agg, Terms):
path = agg.field.split('.')
if path[-1] == 'keyword':
path.pop()
if isinstance(agg, (Terms, MultiTerms)):
if not hasattr(agg, 'meta'):
agg.meta = {}
agg.meta['path'] = path
if hasattr(agg, 'terms'):
# A MultiTerms agg contains multiple fields, and we need the
# path of each one. We store the paths in the same order the
# fields occur in the `terms` list.
agg.meta['paths'] = []
for term in agg.terms:
path = term['field'].removesuffix('.keyword').split('.')
agg.meta['paths'].append(path)
else:
path = agg.field.removesuffix('.keyword').split('.')
agg.meta['path'] = path
if hasattr(agg, 'aggs'):
subs = agg.aggs
for sub_name in subs:
Expand Down Expand Up @@ -391,14 +418,26 @@ def translate(k, v: MutableJSON):
translate(k, v)
else:
try:
# We annotated Terms aggregates with `path`, a dotted path
# to a field in an index document
path = v['meta']['path']
except KeyError:
pass
else:
field_type = self.service.field_type(self.catalog, tuple(path))
for bucket in buckets:
bucket['key'] = field_type.from_index(bucket['key'])
translate(k, bucket)
try:
# We annotated MultiTerms aggregates with `paths`, a list of
# dotted paths for the fields inside a nested field
paths = v['meta']['paths']
except KeyError:
pass
else:
for i, path in enumerate(paths):
field_type = self.service.field_type(self.catalog, tuple(path))
for bucket in buckets:
bucket['key'][i] = field_type.from_index(bucket['key'][i])

for k, v in aggs.items():
translate(k, v)
Expand Down
36 changes: 36 additions & 0 deletions test/indexer/test_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1746,6 +1746,42 @@ def test_organoid_priority(self):
self.assertEqual(inner_cell_suspensions_in_contributions + inner_cell_suspensions_in_aggregates,
inner_cell_suspensions)

def test_nested_field_aggregation(self):
    """
    Index three canned bundles and verify that the `tissue_atlas` nested
    field in each project aggregate contains the expected list of
    atlas/version entries, including entries whose sub-fields are null
    (represented as '~null' in the aggregate).
    """
    bundles = [
        # Bundles with the following tissue_atlas (atlas/version) values:
        # [None/None (x2), Lung/None, Retina/v1.0, Blood/v1.0]
        self.bundle_fqid(uuid='2c7d06b8-658e-4c51-9de4-a768322f84c5',
                         version='2021-09-21T17:27:23.898000Z'),
        # [Blood/v1.0]
        self.bundle_fqid(uuid='587d74b4-1075-4bbf-b96a-4d1ede0481b2',
                         version='2018-10-10T02:23:43.182000Z'),
        # [] (none)
        self.bundle_fqid(uuid='97f0cc83-f0ac-417a-8a29-221c77debde8',
                         version='2019-10-14T19:54:15.397406Z')
    ]
    for bundle in bundles:
        self._index_canned_bundle(bundle)
    hits = self._get_all_hits()
    # Expected `tissue_atlas` contents keyed by project document ID, one
    # key per indexed bundle's project
    expected = {
        '50151324-f3ed-4358-98af-ec352a940a61': [
            {'atlas': '~null', 'version': '~null'},
            {'atlas': '~null', 'version': '~null'},
            {'atlas': 'Lung', 'version': '~null'},
            {'atlas': 'Retina', 'version': 'v1.0'},
            {'atlas': 'Blood', 'version': 'v1.0'}
        ],
        '6615efae-fca8-4dd2-a223-9cfcf30fe94d': [
            {'atlas': 'Blood', 'version': 'v1.0'}
        ],
        '4e6f083b-5b9a-4393-9890-2a83da8188f1': [
        ]
    }
    # Only project aggregates carry the aggregated nested field;
    # contribution documents are not checked here
    for hit in self._filter_hits(hits, DocumentType.aggregate, 'projects'):
        contents = hit['_source']['contents']
        project = cast(JSON, one(contents['projects']))
        project_id = project['document_id']
        self.assertEqual(expected[project_id], project['tissue_atlas'])

def test_accessions_fields(self):
bundle_fqid = self.bundle_fqid(uuid='fa5be5eb-2d64-49f5-8ed8-bd627ac9bc7a',
version='2019-02-14T19:24:38.034764Z')
Expand Down
2 changes: 1 addition & 1 deletion test/service/test_app_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def filter_body(organ: str) -> JSON:
elif debug == 1:
expected_log = f'… with a response body starting in {body[:prefix_len]}'
elif debug > 1:
expected_log = f'… with a response body of length 9137 being {body}'
expected_log = f'… with a response body of length 9163 being {body}'
else:
assert False
self.assertEqual(expected_log, body_log_message)
Expand Down
70 changes: 65 additions & 5 deletions test/service/test_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -2624,6 +2624,64 @@ def from_response(cls, hit: JSON) -> Self:
})


class TestNestedFieldAggregation(IndexResponseTestCase):
    """
    Verify the term facet produced for the `tissueAtlas` nested field:
    each term is a composite of the nested sub-fields (`atlas`,
    `version`), counts reflect parent documents rather than nested
    sub-documents, and documents without any nested value are reported
    under the `None` term.
    """
    maxDiff = None

    @classmethod
    def bundles(cls) -> list[BundleFQID]:
        # Three canned bundles chosen for their differing tissue_atlas
        # contents, including duplicates, partial nulls, and none at all
        return [
            # 1 file, 1 sample
            # tissue_atlas=[None/None (x2), Lung/None, Retina/v1.0, Blood/v1.0]
            cls.bundle_fqid(uuid='2c7d06b8-658e-4c51-9de4-a768322f84c5',
                            version='2021-09-21T17:27:23.898000Z'),
            # 20 files, 1 sample
            # tissue_atlas=[Blood/v1.0]
            cls.bundle_fqid(uuid='587d74b4-1075-4bbf-b96a-4d1ede0481b2',
                            version='2018-10-10T02:23:43.182000Z'),
            # 2 files, 1 sample
            # tissue_atlas=[] (none)
            cls.bundle_fqid(uuid='97f0cc83-f0ac-417a-8a29-221c77debde8',
                            version='2019-10-14T19:54:15.397406Z'),
        ]

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls._setup_indices()

    @classmethod
    def tearDownClass(cls):
        cls._teardown_indices()
        super().tearDownClass()

    def test_nested_field_facet(self):
        """
        Request each index endpoint and compare the `tissueAtlas` facet
        terms against the expected composite terms and per-entity counts.
        """
        # Expected facet terms; the trailing `None` term covers documents
        # that have no tissue_atlas entries at all
        tissue_atlas_terms = [
            {'atlas': 'Blood', 'version': 'v1.0'},
            {'atlas': None, 'version': None},
            {'atlas': 'Lung', 'version': None},
            {'atlas': 'Retina', 'version': 'v1.0'},
            None,
        ]
        # Per-entity-type counts, in the same order as the terms above
        tissue_atlas_term_counts = {
            'projects': [2, 1, 1, 1, 1],
            'bundles': [2, 1, 1, 1, 1],
            'samples': [2, 1, 1, 1, 1],
            'files': [21, 1, 1, 1, 2]
        }
        for entity_type, counts in tissue_atlas_term_counts.items():
            with self.subTest(entity_type=entity_type):
                url = self.base_url.set(path='/index/' + entity_type,
                                        args=(self._params(size=1)))
                response = requests.get(str(url))
                response.raise_for_status()
                response_json = response.json()
                facets = response_json['termFacets']
                expected = []
                for count, term in zip(counts, tissue_atlas_terms):
                    expected.append({'count': count, 'term': term})
                self.assertElasticEqual(expected, facets['tissueAtlas']['terms'])


class TestSortAndFilterByCellCount(IndexResponseTestCase):
maxDiff = None

Expand Down Expand Up @@ -3568,11 +3626,13 @@ def test_projects_response(self):

tissue_atlas = response_json['termFacets']['tissueAtlas']
self.assertEqual(5, tissue_atlas['total'])
terms = {
entry['term']: entry['count']
for entry in tissue_atlas['terms']
}
self.assertEqual({None: 2, 'Lung': 1, 'Retina': 1, 'Blood': 1}, terms)
expected_tissue_atlas_terms = [
{'count': 1, 'term': {'atlas': None, 'version': None}},
{'count': 1, 'term': {'atlas': 'Blood', 'version': 'v1.0'}},
{'count': 1, 'term': {'atlas': 'Lung', 'version': None}},
{'count': 1, 'term': {'atlas': 'Retina', 'version': 'v1.0'}},
]
self.assertEqual(expected_tissue_atlas_terms, tissue_atlas['terms'])

def test_data_use_and_duos_id(self):
test_data = [
Expand Down
Loading