Commit f137320

Merge branch 'ndefries/rvdss-endpoint' into rvdss-integration-tests

2 parents: 24c6b9f + 7de9698

File tree: 5 files changed, +155 −5 lines

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion

@@ -65,7 +65,7 @@ jobs:
           docker network create --driver bridge delphi-net
           docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata
           docker run --rm -d -p 6379:6379 --network delphi-net --env "REDIS_PASSWORD=1234" --name delphi_redis delphi_redis
-
+
       - run: |
           wget https://raw.githubusercontent.com/eficode/wait-for/master/wait-for

src/client/delphi_epidata.py

Lines changed: 37 additions & 0 deletions

@@ -675,6 +675,43 @@ def covid_hosp_facility_lookup(state=None, ccn=None, city=None, zip=None, fips_c
     # Make the API call
     return Epidata._request("covid_hosp_facility_lookup", params)

+  # Fetch Canadian respiratory RVDSS data
+  @staticmethod
+  def rvdss(
+    geo_type,
+    time_values,
+    geo_value,
+    as_of=None,
+    issues=None,
+  ):
+    """Fetch RVDSS data."""
+    # Check parameters
+    if None in (geo_type, time_values, geo_value):
+      raise EpidataBadRequestException(
+        "`geo_type`, `geo_value`, and `time_values` are all required"
+      )
+
+    # Set up request
+    params = {
+      # Fake a time type param so that we can use some helper functions later.
+      "time_type": "week",
+      "geo_type": geo_type,
+      "time_values": Epidata._list(time_values),
+    }
+
+    if isinstance(geo_value, (list, tuple)):
+      params["geo_values"] = ",".join(geo_value)
+    else:
+      params["geo_values"] = geo_value
+    if as_of is not None:
+      params["as_of"] = as_of
+    if issues is not None:
+      params["issues"] = Epidata._list(issues)
+
+    # Make the API call
+    return Epidata._request("rvdss", params)
+
+
   @staticmethod
   def async_epidata(param_list, batch_size=50):
     """[DEPRECATED] Make asynchronous Epidata calls for a list of parameters."""

src/server/_query.py

Lines changed: 18 additions & 4 deletions

@@ -483,15 +483,29 @@ def apply_issues_filter(self, history_table: str, issues: Optional[TimeValues])
         self.where_integers("issue", issues)
         return self

-    def apply_as_of_filter(self, history_table: str, as_of: Optional[int]) -> "QueryBuilder":
+    # Note: This function was originally only used by covidcast, so it assumed
+    # that the `source` and `signal` columns are always available. To make it
+    # usable for rvdss, too, which has neither column, add the
+    # `use_source_signal` flag to toggle inclusion of the `source` and
+    # `signal` columns.
+    #
+    # `use_source_signal` defaults to True, so if not passed, the function
+    # retains the historical behavior.
+    def apply_as_of_filter(self, history_table: str, as_of: Optional[int], use_source_signal: Optional[bool] = True) -> "QueryBuilder":
         if as_of is not None:
             self.retable(history_table)
             sub_condition_asof = "(issue <= :as_of)"
             self.params["as_of"] = as_of
-            sub_fields = "max(issue) max_issue, time_type, time_value, `source`, `signal`, geo_type, geo_value"
-            sub_group = "time_type, time_value, `source`, `signal`, geo_type, geo_value"
+
             alias = self.alias
-            sub_condition = f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type AND x.time_value = {alias}.time_value AND x.source = {alias}.source AND x.signal = {alias}.signal AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value"
+            source_signal_plain = source_signal_alias = ""
+            # If `use_source_signal` is toggled on, append the `source` and `signal` columns to the group-by and join lists.
+            if use_source_signal:
+                source_signal_plain = ", `source`, `signal`"
+                source_signal_alias = f" AND x.source = {alias}.source AND x.signal = {alias}.signal"
+
+            sub_fields = f"max(issue) max_issue, time_type, time_value{source_signal_plain}, geo_type, geo_value"
+            sub_group = f"time_type, time_value{source_signal_plain}, geo_type, geo_value"
+            sub_condition = f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type AND x.time_value = {alias}.time_value{source_signal_alias} AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value"
             self.subquery = f"JOIN (SELECT {sub_fields} FROM {self.table} WHERE {self.conditions_clause} AND {sub_condition_asof} GROUP BY {sub_group}) x ON {sub_condition}"
         return self
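To make the flag's effect concrete, here is a standalone sketch mirroring the f-string logic above (a hypothetical helper, not repo code; the <conditions> placeholder stands in for the builder's accumulated WHERE clauses, and the "rv"/"rvdss" names match the endpoint below):

# Standalone sketch of apply_as_of_filter's string building (hypothetical).
def as_of_subquery(alias, table, use_source_signal=True):
    source_signal_plain = source_signal_alias = ""
    if use_source_signal:
        source_signal_plain = ", `source`, `signal`"
        source_signal_alias = f" AND x.source = {alias}.source AND x.signal = {alias}.signal"
    sub_fields = f"max(issue) max_issue, time_type, time_value{source_signal_plain}, geo_type, geo_value"
    sub_group = f"time_type, time_value{source_signal_plain}, geo_type, geo_value"
    sub_condition = (
        f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type"
        f" AND x.time_value = {alias}.time_value{source_signal_alias}"
        f" AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value"
    )
    return f"JOIN (SELECT {sub_fields} FROM {table} WHERE <conditions> AND (issue <= :as_of) GROUP BY {sub_group}) x ON {sub_condition}"

# covidcast keeps the historical behavior (source/signal grouped and joined);
# rvdss drops both columns:
print(as_of_subquery("rv", "rvdss", use_source_signal=False))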

src/server/endpoints/__init__.py

Lines changed: 2 additions & 0 deletions

@@ -25,6 +25,7 @@
     nowcast,
     paho_dengue,
     quidel,
+    rvdss,
     sensors,
     twitter,
     wiki,
@@ -59,6 +60,7 @@
     nowcast,
     paho_dengue,
     quidel,
+    rvdss,
     sensors,
     twitter,
     wiki,
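For context, a hedged sketch of how these endpoint modules are typically wired up; the repo's actual registration code may differ in its details. Each module exposes a Blueprint named bp, and the server registers every module in the endpoints list:

# Hypothetical registration sketch; not the repo's actual wiring.
from flask import Flask

from . import quidel, rvdss, sensors  # abbreviated; the real list imports all modules

endpoints = [quidel, rvdss, sensors]  # abbreviated

app = Flask(__name__)
for module in endpoints:
    # each endpoint module exposes a Blueprint named `bp`, e.g. rvdss.bp
    app.register_blueprint(module.bp, url_prefix=f"/{module.bp.name}")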

src/server/endpoints/rvdss.py

Lines changed: 97 additions & 0 deletions

@@ -0,0 +1,97 @@
+from flask import Blueprint, request
+
+from .._params import (
+    extract_date,
+    extract_dates,
+    extract_strings,
+    parse_time_set,
+)
+from .._query import execute_query, QueryBuilder
+from .._validate import require_all
+
+bp = Blueprint("rvdss", __name__)
+
+db_table_name = "rvdss"
+
+@bp.route("/", methods=("GET", "POST"))
+def handle():
+    require_all(request, "time_type", "time_values", "geo_type", "geo_values")
+
+    time_set = parse_time_set()
+    geo_type = extract_strings("geo_type")
+    geo_values = extract_strings("geo_values")
+    issues = extract_dates("issues")
+    as_of = extract_date("as_of")
+
+    # basic query info
+    q = QueryBuilder(db_table_name, "rv")
+
+    fields_string = [
+        "geo_type",
+        "geo_value",
+        "region",
+        "time_type",
+    ]
+    fields_int = [
+        "epiweek",
+        "time_value",
+        "issue",
+        "week",
+        "weekorder",
+        "year",
+    ]
+    fields_float = [
+        "adv_pct_positive",
+        "adv_positive_tests",
+        "adv_tests",
+        "evrv_pct_positive",
+        "evrv_positive_tests",
+        "evrv_tests",
+        "flu_pct_positive",
+        "flu_positive_tests",
+        "flu_tests",
+        "flua_pct_positive",
+        "flua_positive_tests",
+        "flua_tests",
+        "fluah1n1pdm09_positive_tests",
+        "fluah3_positive_tests",
+        "fluauns_positive_tests",
+        "flub_pct_positive",
+        "flub_positive_tests",
+        "flub_tests",
+        "hcov_pct_positive",
+        "hcov_positive_tests",
+        "hcov_tests",
+        "hmpv_pct_positive",
+        "hmpv_positive_tests",
+        "hmpv_tests",
+        "hpiv1_positive_tests",
+        "hpiv2_positive_tests",
+        "hpiv3_positive_tests",
+        "hpiv4_positive_tests",
+        "hpiv_pct_positive",
+        "hpiv_positive_tests",
+        "hpiv_tests",
+        "hpivother_positive_tests",
+        "rsv_pct_positive",
+        "rsv_positive_tests",
+        "rsv_tests",
+        "sarscov2_pct_positive",
+        "sarscov2_positive_tests",
+        "sarscov2_tests",
+    ]
+
+    q.set_sort_order("epiweek", "time_value", "geo_type", "geo_value", "issue")
+    q.set_fields(fields_string, fields_int, fields_float)
+
+    q.where_strings("geo_type", geo_type)
+    # Only apply the geo_values filter if the wildcard "*" was NOT used.
+    if not (len(geo_values) == 1 and geo_values[0] == "*"):
+        q.where_strings("geo_value", geo_values)
+
+    q.apply_time_filter("time_type", "time_value", time_set)
+    q.apply_issues_filter(db_table_name, issues)
+    q.apply_as_of_filter(db_table_name, as_of, use_source_signal=False)
+
+    # send query
+    return execute_query(str(q), q.params, fields_string, fields_int, fields_float)
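Once registered, the handler can be exercised like any other endpoint. A hypothetical request against a local dev server (host, port, and parameter values are illustrative, not from this commit):

# Hypothetical request sketch; host/port and values are illustrative.
import requests

resp = requests.get(
    "http://localhost:10080/rvdss/",
    params={
        "time_type": "week",             # required by require_all above
        "time_values": "202330-202340",
        "geo_type": "province",
        "geo_values": "on,qc",           # or "*" to skip the geo_value filter
        "as_of": "202340",               # optional; exercises the new as-of filter
    },
)
print(resp.json())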
