Skip to content

Commit e2e0a26

Browse files
author
Hicham
committed
refactor(exports): rearrange export tasks
1 parent 5567056 commit e2e0a26

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1139
-742
lines changed

cohort/services/cohort_result.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from cohort.serializers import WSJobStatus, JobName
99
from cohort.services.base_service import CommonService
1010
from cohort.services.dated_measure import dm_service
11-
from cohort.services.utils import get_authorization_header, ServerError
11+
from cohort.services.utils import ServerError
1212
from admin_cohort.services.ws_event_manager import WebsocketManager, WebSocketMessageType
1313
from cohort.tasks import create_cohort
1414

@@ -36,7 +36,12 @@ def build_query(cohort_source_id: str, fhir_filter_id: str = None) -> str:
3636
}
3737
return json.dumps(query)
3838

39-
def create_cohort_subset(self, request, owner_id: str, table_name: str, source_cohort: CohortResult, fhir_filter_id: str) -> CohortResult:
39+
def create_cohort_subset(self,
40+
auth_headers: dict,
41+
owner_id: str,
42+
table_name: str,
43+
source_cohort: CohortResult,
44+
fhir_filter_id: str) -> CohortResult:
4045
def copy_query_snapshot(snapshot: RequestQuerySnapshot) -> RequestQuerySnapshot:
4146
return RequestQuerySnapshot.objects.create(owner=snapshot.owner,
4247
request=snapshot.request,
@@ -62,7 +67,7 @@ def copy_dated_measure(dm: DatedMeasure) -> DatedMeasure:
6267
dated_measure=new_dm,
6368
request_query_snapshot=new_rqs)
6469
with transaction.atomic():
65-
self.handle_cohort_creation(cohort_subset, request, False)
70+
self.handle_cohort_creation(cohort_subset, auth_headers)
6671
return cohort_subset
6772

6873
@staticmethod
@@ -74,9 +79,9 @@ def count_active_jobs():
7479
return CohortResult.objects.filter(request_job_status__in=active_statuses) \
7580
.count()
7681

77-
def handle_cohort_creation(self, cohort: CohortResult, request, global_estimate: bool) -> None:
82+
def handle_cohort_creation(self, cohort: CohortResult, auth_headers: dict, global_estimate: bool=False) -> None:
7883
if global_estimate:
79-
dm_service.handle_global_count(cohort, request)
84+
dm_service.handle_global_count(cohort, auth_headers)
8085
try:
8186
if cohort.parent_cohort and cohort.sampling_ratio:
8287
json_query = self.build_query(cohort_source_id=cohort.parent_cohort.group_id)
@@ -91,7 +96,7 @@ def handle_cohort_creation(self, cohort: CohortResult, request, global_estimate:
9196

9297
create_cohort.s(cohort_id=cohort.pk,
9398
json_query=json_query,
94-
auth_headers=get_authorization_header(request),
99+
auth_headers=auth_headers,
95100
cohort_creator_cls=self.operator_cls,
96101
sampling_ratio=cohort.sampling_ratio) \
97102
.apply_async()

cohort/services/dated_measure.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,20 @@
33
from cohort.models.dated_measure import GLOBAL_DM_MODE
44
from cohort.serializers import WSJobStatus, JobName
55
from cohort.services.base_service import CommonService
6-
from cohort.services.utils import get_authorization_header, ServerError
6+
from cohort.services.utils import ServerError
77
from admin_cohort.services.ws_event_manager import WebsocketManager, WebSocketMessageType
88
from cohort.tasks import cancel_previous_count_jobs, count_cohort
99

1010

1111
class DatedMeasureService(CommonService):
1212
job_type = "count"
1313

14-
def handle_count(self, dm: DatedMeasure, request) -> None:
15-
stage_details = request.data.get("stageDetails", None)
14+
def handle_count(self, dm: DatedMeasure, auth_headers: dict, stage_details) -> None:
1615
cancel_previous_count_jobs.s(dm_id=dm.uuid, cohort_counter_cls=self.operator_cls).apply_async()
1716
try:
1817
count_cohort.s(dm_id=dm.uuid,
1918
json_query=dm.request_query_snapshot.serialized_query,
20-
auth_headers=get_authorization_header(request),
19+
auth_headers=auth_headers,
2120
cohort_counter_cls=self.operator_cls,
2221
stage_details=stage_details
2322
) \
@@ -26,14 +25,14 @@ def handle_count(self, dm: DatedMeasure, request) -> None:
2625
dm.delete()
2726
raise ServerError("Could not launch count request") from e
2827

29-
def handle_global_count(self, cohort: CohortResult, request) -> None:
28+
def handle_global_count(self, cohort: CohortResult, auth_headers: dict) -> None:
3029
dm_global = DatedMeasure.objects.create(mode=GLOBAL_DM_MODE,
31-
owner=request.user,
32-
request_query_snapshot_id=request.data.get("request_query_snapshot"))
30+
owner=cohort.owner,
31+
request_query_snapshot=cohort.request_query_snapshot)
3332
try:
3433
count_cohort.s(dm_id=dm_global.uuid,
3534
json_query=dm_global.request_query_snapshot.serialized_query,
36-
auth_headers=get_authorization_header(request),
35+
auth_headers=auth_headers,
3736
cohort_counter_cls=self.operator_cls,
3837
global_estimate=True) \
3938
.apply_async()

cohort/services/feasibility_study.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from cohort.models import FeasibilityStudy
1616
from cohort.services.base_service import CommonService
1717

18-
from cohort.services.utils import get_authorization_header, ServerError
18+
from cohort.services.utils import ServerError
1919
from cohort.tasks import feasibility_study_count, send_feasibility_study_notification, send_email_feasibility_report_error, \
2020
send_email_feasibility_report_ready
2121

@@ -49,11 +49,11 @@ def bound_number(n: int) -> str:
4949
class FeasibilityStudyService(CommonService):
5050
job_type = "count"
5151

52-
def handle_feasibility_study_count(self, fs: FeasibilityStudy, request) -> None:
52+
def handle_feasibility_study_count(self, fs: FeasibilityStudy, auth_headers: dict) -> None:
5353
try:
5454
chain(*(feasibility_study_count.s(fs_id=fs.uuid,
5555
json_query=fs.request_query_snapshot.serialized_query,
56-
auth_headers=get_authorization_header(request),
56+
auth_headers=auth_headers,
5757
cohort_counter_cls=self.operator_cls),
5858
send_feasibility_study_notification.s(fs.uuid)))()
5959
except Exception as e:

cohort/tests/test_service_cohort_result.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from unittest import mock
22

3-
from requests import Request as HttpRequest
4-
53
from admin_cohort.tests.tests_tools import new_random_user, TestCaseWithDBs
64
from cohort.models import Folder, Request, RequestQuerySnapshot, CohortResult, DatedMeasure, FhirFilter
75
from cohort.services.cohort_result import CohortResultService
@@ -31,12 +29,10 @@ def setUp(self):
3129
filter="param=value")
3230
self.cohort_result_service = CohortResultService()
3331

34-
@mock.patch('cohort.services.cohort_result.get_authorization_header')
3532
@mock.patch('cohort.services.cohort_result.create_cohort.apply_async')
36-
def test_create_cohort_subset(self, mock_get_auth_headers, mock_celery_task):
37-
mock_get_auth_headers.return_value = {"authorization": "token"}
33+
def test_create_cohort_subset(self, mock_celery_task):
3834
mock_celery_task.return_value = None
39-
cohort_subset = self.cohort_result_service.create_cohort_subset(request=HttpRequest(data={}),
35+
cohort_subset = self.cohort_result_service.create_cohort_subset(auth_headers={},
4036
owner_id=self.user1.pk,
4137
table_name="Table_01",
4238
source_cohort=self.source_cohort,

cohort/tests/tests_view_cohort_result.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,10 @@ def setUp(self):
231231
status=status.HTTP_400_BAD_REQUEST,
232232
)
233233

234-
@mock.patch('cohort.services.cohort_result.get_authorization_header')
235234
@mock.patch('cohort.services.cohort_result.create_cohort.apply_async')
236235
@mock.patch('cohort.services.dated_measure.count_cohort.apply_async')
237236
def check_create_case_with_mock(self, case: CohortCreateCase, mock_count_task: MagicMock, mock_create_task: MagicMock,
238-
mock_header: MagicMock, other_view: any, view_kwargs: dict):
239-
mock_header.return_value = None
237+
other_view: any, view_kwargs: dict):
240238
mock_create_task.return_value = None
241239
mock_count_task.return_value = None
242240

@@ -253,7 +251,6 @@ def check_create_case_with_mock(self, case: CohortCreateCase, mock_count_task: M
253251
mock_create_task.assert_called() if case.mock_create_task_called else mock_create_task.assert_not_called()
254252

255253
mock_create_task.assert_called() if case.mock_create_task_called else mock_create_task.assert_not_called()
256-
mock_header.assert_called() if case.mock_create_task_called else mock_header.assert_not_called()
257254

258255
def check_create_case(self, case: CohortCreateCase, other_view: Any = None, **view_kwargs):
259256
return self.check_create_case_with_mock(case, other_view=other_view or None, view_kwargs=view_kwargs)

cohort/tests/tests_view_dated_measure.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,19 +166,16 @@ def setUp(self):
166166
request_job_status=JobStatus.pending,
167167
owner=self.user1)
168168

169-
@mock.patch('cohort.services.dated_measure.get_authorization_header')
170169
@mock.patch('cohort.services.dated_measure.cancel_previous_count_jobs.apply_async')
171170
@mock.patch('cohort.services.dated_measure.count_cohort.apply_async')
172-
def check_create_case_with_mock(self, case: DMCreateCase, mock_count_task: MagicMock, mock_cancel_task: MagicMock, mock_header: MagicMock,
171+
def check_create_case_with_mock(self, case: DMCreateCase, mock_count_task: MagicMock, mock_cancel_task: MagicMock,
173172
other_view: any, view_kwargs: dict):
174-
mock_header.return_value = None
175173
mock_cancel_task.return_value = None
176174
mock_count_task.return_value = None
177175

178176
with self.captureOnCommitCallbacks(execute=True):
179177
super(DatedMeasuresCreateTests, self).check_create_case(case, other_view, **(view_kwargs or {}))
180178

181-
mock_header.assert_called() if case.mock_header_called else mock_header.assert_not_called()
182179
mock_cancel_task.assert_called() if case.mock_cancel_task_called else mock_cancel_task.assert_not_called()
183180
mock_count_task.assert_called() if case.mock_count_task_called else mock_count_task.assert_not_called()
184181

cohort/views/cohort_result.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
from cohort.serializers import CohortResultSerializer, CohortRightsSerializer, CohortResultPatchSerializer, CohortResultCreateSerializer, \
1717
SampledCohortResultCreateSerializer
1818
from cohort.services.cohort_rights import cohort_rights_service
19+
from cohort.services.utils import get_authorization_header
1920
from cohort.views.shared import UserObjectsRestrictedViewSet
20-
from exports.services.export import export_service
2121

2222

2323
class CohortFilter(filters.FilterSet):
@@ -116,7 +116,8 @@ def list(self, request, *args, **kwargs):
116116
def create(self, request, *args, **kwargs):
117117
global_estimate = request.data.pop("global_estimate", False)
118118
response = super().create(request, *args, **kwargs)
119-
transaction.on_commit(lambda: cohort_service.handle_cohort_creation(request=request,
119+
auth_headers = get_authorization_header(request)
120+
transaction.on_commit(lambda: cohort_service.handle_cohort_creation(auth_headers=auth_headers,
120121
cohort=response.data.serializer.instance,
121122
global_estimate=global_estimate))
122123
return response
@@ -136,8 +137,6 @@ def partial_update(self, request, *args, **kwargs):
136137
else:
137138
response = super().partial_update(request, *args, **kwargs)
138139
cohort_service.handle_cohort_post_update(cohort=cohort, caller=request.user.username)
139-
if cohort.export_table.exists():
140-
export_service.check_all_cohort_subsets_created(export=cohort.export_table.first().export)
141140
cohort_service.ws_send_to_client(cohort=cohort)
142141
return response
143142

cohort/views/dated_measure.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from cohort.serializers import DatedMeasureSerializer, DatedMeasureCreateSerializer, DatedMeasurePatchSerializer
1111
from cohort.services.dated_measure import dm_service
1212
from cohort.services.request_refresh_schedule import requests_refresher_service
13-
from cohort.services.utils import await_celery_task
13+
from cohort.services.utils import await_celery_task, get_authorization_header
1414
from cohort.views.shared import UserObjectsRestrictedViewSet
1515

1616
_logger = logging.getLogger('info')
@@ -44,8 +44,11 @@ def get_queryset(self):
4444
@transaction.atomic
4545
def create(self, request, *args, **kwargs):
4646
response = super().create(request, *args, **kwargs)
47-
transaction.on_commit(lambda: dm_service.handle_count(request=request,
48-
dm=response.data.serializer.instance))
47+
auth_headers = get_authorization_header(request)
48+
stage_details = request.data.get("stageDetails", None)
49+
transaction.on_commit(lambda: dm_service.handle_count(dm=response.data.serializer.instance,
50+
auth_headers=auth_headers,
51+
stage_details=stage_details))
4952
return response
5053

5154
@await_celery_task

cohort/views/feasibility_study.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from cohort.models import FeasibilityStudy
1313
from cohort.serializers import FeasibilityStudySerializer, FeasibilityStudyCreateSerializer, FeasibilityStudyPatchSerializer
1414
from cohort.services.feasibility_study import feasibility_study_service
15+
from cohort.services.utils import get_authorization_header
1516
from cohort.views.shared import UserObjectsRestrictedViewSet
1617

1718
_logger = logging.getLogger('info')
@@ -48,7 +49,8 @@ def retrieve(self, request, *args, **kwargs):
4849
@transaction.atomic
4950
def create(self, request, *args, **kwargs):
5051
response = super().create(request, *args, **kwargs)
51-
transaction.on_commit(lambda: feasibility_study_service.handle_feasibility_study_count(request=request,
52+
auth_headers = get_authorization_header(request)
53+
transaction.on_commit(lambda: feasibility_study_service.handle_feasibility_study_count(auth_headers=auth_headers,
5254
fs=response.data.serializer.instance))
5355
return response
5456

exporters/enums.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
from enum import Enum, StrEnum
1+
from enum import StrEnum
22

33
from admin_cohort.types import JobStatus
44

55

6-
class ExportTypes(Enum):
6+
class ExportTypes(StrEnum):
77
CSV = "csv"
88
XLSX = "xlsx"
99
HIVE = "hive"
1010

1111
@staticmethod
1212
def default() -> str:
13-
return ExportTypes.CSV.value
13+
return ExportTypes.CSV
1414

1515
@property
1616
def allow_download(self) -> bool:
@@ -41,15 +41,15 @@ class APIJobStatus(StrEnum):
4141
flowerNotAccessible = 'flowerNotAccessible'
4242

4343

44-
status_mapper = {APIJobStatus.Received.value: JobStatus.new,
45-
APIJobStatus.Pending.value: JobStatus.pending,
46-
APIJobStatus.Retry.value: JobStatus.pending,
47-
APIJobStatus.Running.value: JobStatus.started,
48-
APIJobStatus.FinishedSuccessfully.value: JobStatus.finished,
49-
APIJobStatus.FinishedWithError.value: JobStatus.failed,
50-
APIJobStatus.FinishedWithTimeout.value: JobStatus.failed,
51-
APIJobStatus.flowerNotAccessible.value: JobStatus.failed,
52-
APIJobStatus.Failure.value: JobStatus.failed,
53-
APIJobStatus.NotFound.value: JobStatus.failed,
54-
APIJobStatus.Revoked.value: JobStatus.cancelled
44+
status_mapper = {APIJobStatus.Received: JobStatus.new,
45+
APIJobStatus.Pending: JobStatus.pending,
46+
APIJobStatus.Retry: JobStatus.pending,
47+
APIJobStatus.Running: JobStatus.started,
48+
APIJobStatus.FinishedSuccessfully: JobStatus.finished,
49+
APIJobStatus.FinishedWithError: JobStatus.failed,
50+
APIJobStatus.FinishedWithTimeout: JobStatus.failed,
51+
APIJobStatus.flowerNotAccessible: JobStatus.failed,
52+
APIJobStatus.Failure: JobStatus.failed,
53+
APIJobStatus.NotFound: JobStatus.failed,
54+
APIJobStatus.Revoked: JobStatus.cancelled
5555
}

0 commit comments

Comments
 (0)