diff --git a/Dockerfile b/Dockerfile
index 0bbd611..33d290a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -30,6 +30,7 @@ RUN curl -L -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-releas
 ADD LICENSE /
 ADD gcp_deepvariant_runner.py /opt/deepvariant_runner/src/
 ADD gke_cluster.py /opt/deepvariant_runner/src/
+ADD metrics.py /opt/deepvariant_runner/src/
 ADD process_util.py /opt/deepvariant_runner/src/
 ADD run_and_verify.sh /opt/deepvariant_runner/bin/
 ADD cancel /opt/deepvariant_runner/bin/
diff --git a/gcp_deepvariant_runner.py b/gcp_deepvariant_runner.py
index d1ef5ac..6699a1c 100644
--- a/gcp_deepvariant_runner.py
+++ b/gcp_deepvariant_runner.py
@@ -52,6 +52,7 @@
 import argparse
 import datetime
+import functools
 import json
 import logging
 import multiprocessing
@@ -64,6 +65,8 @@
 import uuid
 
 import gke_cluster
+import metrics
+import process_util
 
 from google.api_core import exceptions as google_exceptions
 from google.cloud import storage
@@ -81,6 +84,12 @@ _DEFAULT_BOOT_DISK_SIZE_GB = '50'
 
 _ROLE_STORAGE_OBJ_CREATOR = ['storage.objects.create']
 
+# Metrics names.
+_MAKE_EXAMPLES = 'MakeExamples'
+_CALL_VARIANTS = 'CallVariants'
+_POSTPROCESS_VARIANTS = 'PostprocessVariants'
+_START = 'Start'
+
 _GCSFUSE_IMAGE = 'gcr.io/cloud-genomics-pipelines/gcsfuse'
 _GCSFUSE_LOCAL_DIR_TEMPLATE = '/mnt/google/input-gcsfused-{SHARD_INDEX}/'
 
@@ -273,6 +282,56 @@ def _write_actions_to_temp_file(actions):
   return temp_file.name
 
 
+def _get_project_number(project_id):
+  """Returns the GCP project number, or -1 on failure."""
+  if hasattr(_get_project_number,
+             'project_number') and _get_project_number.project_number != -1:
+    return _get_project_number.project_number
+  try:
+    args = [
+        'gcloud', 'projects', 'describe', project_id,
+        '--format=value(projectNumber)'
+    ]
+    _get_project_number.project_number = process_util.run_command(
+        args, retries=2)
+  except RuntimeError:
+    # Error is already logged.
+    _get_project_number.project_number = -1
+  return _get_project_number.project_number
+
+
+def report_runtime_metrics(method_name):
+  """Decorator that reports runtime metrics."""
+
+  def decorated(func):
+    """Pseudo decorator."""
+
+    @functools.wraps(func)
+    def wrapper(pipeline_args, *args, **kwargs):
+      """Wrapper that measures the elapsed time and reports it.
+
+      If stop_collecting_anonymous_usage_metrics is set, it only calls the
+      wrapped method without collecting any metrics.
+      """
+      if pipeline_args.stop_collecting_anonymous_usage_metrics:
+        func(pipeline_args, *args, **kwargs)
+      else:
+        status = '_Failure'
+        start = time.time()
+        try:
+          func(pipeline_args, *args, **kwargs)
+          status = '_Success'
+        finally:
+          metrics_name = method_name + status
+          metrics.add(
+              _get_project_number(pipeline_args.project),
+              metrics_name,
+              duration_seconds=int(time.time() - start))
+
+    return wrapper
+
+  return decorated
+
+
 def _run_job(run_args, log_path):
   """Runs a job using the pipelines CLI tool.
@@ -386,6 +445,7 @@ def _meets_gcp_label_restrictions(label):
                   label) is not None
 
 
+@report_runtime_metrics(method_name=_MAKE_EXAMPLES)
 def _run_make_examples(pipeline_args):
   """Runs the make_examples job."""
 
@@ -612,6 +672,7 @@ def get_extra_args():
   _wait_for_results(threads, results)
 
 
+@report_runtime_metrics(method_name=_CALL_VARIANTS)
 def _run_call_variants(pipeline_args):
   """Runs the call_variants job."""
   if pipeline_args.tpu:
@@ -620,6 +681,7 @@ def _run_call_variants(pipeline_args):
     _run_call_variants_with_pipelines_api(pipeline_args)
 
 
+@report_runtime_metrics(method_name=_POSTPROCESS_VARIANTS)
 def _run_postprocess_variants(pipeline_args):
   """Runs the postprocess_variants job."""
 
@@ -1040,10 +1102,65 @@ def run(argv=None):
           'jobs. By default, the pipeline runs all 3 jobs (make_examples, '
           'call_variants, postprocess_variants) in sequence. '
           'This option may be used to run parts of the pipeline.'))
+  parser.add_argument(
+      '--stop_collecting_anonymous_usage_metrics',
+      default=False,
+      action='store_true',
+      help=('This tool collects some anonymous metrics related to utilized '
+            'resources, such as how many workers, how many CPU cores, and '
+            'how much RAM were used to run each step of DeepVariant. We use '
+            'these metrics to further improve the usability of our tool. You '
+            'can completely stop collecting these metrics by setting this '
+            'flag.'))
 
   pipeline_args = parser.parse_args(argv)
   _validate_and_complete_args(pipeline_args)
 
+  def get_image_version(docker_image):
+    return docker_image.split(':')[-1]
+
+  def get_model_version(model):
+    return model.split('/')[-1]
+
+  if not pipeline_args.stop_collecting_anonymous_usage_metrics:
+    metrics.add(
+        _get_project_number(pipeline_args.project),
+        _START,
+        # General run related settings.
+        image_version=get_image_version(pipeline_args.docker_image),
+        model_version=get_model_version(pipeline_args.model),
+        input_file_format=(
+            _BAM_FILE_SUFFIX if pipeline_args.bam.endswith(_BAM_FILE_SUFFIX)
+            else _CRAM_FILE_SUFFIX),
+        zones=pipeline_args.zones,
+        genomic_regions=(
+            len(pipeline_args.regions) if pipeline_args.regions else 0),
+        shards=pipeline_args.shards,
+        jobs_to_run=pipeline_args.jobs_to_run,
+        gvcf=True if pipeline_args.gvcf_outfile else False,
+        max_non_preemptible_tries=pipeline_args.max_non_preemptible_tries,
+        max_preemptible_tries=pipeline_args.max_preemptible_tries,
+        preemptible=pipeline_args.preemptible,
+        # Make_examples stage related settings.
+        gcsfuse=pipeline_args.gcsfuse,
+        make_examples_workers=pipeline_args.make_examples_workers,
+        make_examples_cores_per_worker=(
+            pipeline_args.make_examples_cores_per_worker),
+        make_examples_ram_per_worker_gb=(
+            pipeline_args.make_examples_ram_per_worker_gb),
+        # Call_variants stage related settings.
+        gpu=pipeline_args.gpu,
+        accelerator_type=pipeline_args.accelerator_type,
+        tpu=pipeline_args.tpu,
+        existing_gke_cluster=True if pipeline_args.gke_cluster_name else False,
+        call_variants_workers=pipeline_args.call_variants_workers,
+        call_variants_cores_per_worker=(
+            pipeline_args.call_variants_cores_per_worker),
+        call_variants_ram_per_worker_gb=(
+            pipeline_args.call_variants_ram_per_worker_gb),
+        # Postprocess stage related settings.
+        postprocess_variants_cores=pipeline_args.postprocess_variants_cores,
+        postprocess_variants_ram_gb=pipeline_args.postprocess_variants_ram_gb)
+
   # TODO(b/112148076): Fail fast: validate GKE cluster early on in the pipeline.
   if _MAKE_EXAMPLES_JOB_NAME in pipeline_args.jobs_to_run:
     logging.info('Running make_examples...')
diff --git a/gcp_deepvariant_runner_test.py b/gcp_deepvariant_runner_test.py
index a47ea7d..2ac2b6d 100644
--- a/gcp_deepvariant_runner_test.py
+++ b/gcp_deepvariant_runner_test.py
@@ -102,7 +102,7 @@ def setUp(self):
         '--project',
         'project',
         '--docker_image',
-        'gcr.io/dockerimage',
+        'gcr.io/dockerimage:tag',
         '--zones',
         'zone-a',
         'zone-b',
@@ -111,7 +111,7 @@ def setUp(self):
         '--staging',
         'gs://bucket/staging',
         '--model',
-        'gs://bucket/model',
+        'gs://bucket/model/version_type',
         '--bam',
         'gs://bucket/bam',
         '--ref',
@@ -122,19 +122,21 @@ def setUp(self):
   @mock.patch.object(multiprocessing, 'Pool')
   @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
-  def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool,
-                      mock_run_job):
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  def testRunPipeline(self, mock_get_project_number, mock_can_write_to_bucket,
+                      mock_obj_exist, mock_pool, mock_run_job):
     mock_apply_async = mock_pool.return_value.apply_async
     mock_apply_async.return_value = None
     mock_obj_exist.return_value = True
     mock_can_write_to_bucket.return_value = True
+    mock_get_project_number.return_value = 1000
     self._argv.extend(
         ['--make_examples_workers', '1', '--call_variants_workers', '1'])
     gcp_deepvariant_runner.run(self._argv)
 
     mock_apply_async.assert_has_calls([
         mock.call(mock_run_job, [
-            _HasAllOf('make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('make_examples', 'gcr.io/dockerimage:tag',
                       'INPUT_BAM=gs://bucket/bam',
                       'INPUT_BAI=gs://bucket/bam.bai',
                       'INPUT_REF=gs://bucket/ref',
@@ -144,8 +146,8 @@ def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool,
             'gs://bucket/staging/logs/make_examples/0'
         ]),
         mock.call(mock_run_job, [
-            _HasAllOf('call_variants', 'gcr.io/dockerimage',
-                      'MODEL=gs://bucket/model',
+            _HasAllOf('call_variants', 'gcr.io/dockerimage:tag',
+                      'MODEL=gs://bucket/model/version_type',
                       'EXAMPLES=gs://bucket/staging/examples/0/*',
                       'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
                       '--output-interval', '60s'),
@@ -155,7 +157,7 @@
     self.assertEqual(mock_apply_async.call_count, 2)
 
     mock_run_job.assert_called_once_with(
-        _HasAllOf('postprocess_variants', 'gcr.io/dockerimage',
+        _HasAllOf('postprocess_variants', 'gcr.io/dockerimage:tag',
                   'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
                   'OUTFILE=gs://bucket/output.vcf',
                   '--output-interval', '60s'),
         'gs://bucket/staging/logs/postprocess_variants')
@@ -164,12 +166,15 @@
   @mock.patch.object(multiprocessing, 'Pool')
   @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
-  def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
-                                      mock_obj_exist, mock_pool, mock_run_job):
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  def testRunPipeline_WithGVCFOutFile(self, mock_get_project_number,
+                                      mock_can_write_to_bucket, mock_obj_exist,
+                                      mock_pool, mock_run_job):
     mock_apply_async = mock_pool.return_value.apply_async
     mock_apply_async.return_value = None
     mock_obj_exist.return_value = True
     mock_can_write_to_bucket.return_value = True
+    mock_get_project_number.return_value = 1000
     self._argv.extend([
         '--make_examples_workers',
         '1',
@@ -184,7 +189,7 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
     mock_apply_async.assert_has_calls([
         mock.call(mock_run_job, [
-            _HasAllOf('make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('make_examples', 'gcr.io/dockerimage:tag',
                       'INPUT_BAM=gs://bucket/bam',
                       'INPUT_BAI=gs://bucket/bam.bai',
                       'INPUT_REF=gs://bucket/ref',
@@ -194,8 +199,8 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
             'gs://bucket/staging/logs/make_examples/0'
         ]),
         mock.call(mock_run_job, [
-            _HasAllOf('call_variants', 'gcr.io/dockerimage',
-                      'MODEL=gs://bucket/model',
+            _HasAllOf('call_variants', 'gcr.io/dockerimage:tag',
+                      'MODEL=gs://bucket/model/version_type',
                       'EXAMPLES=gs://bucket/staging/examples/0/*',
                       'CALLED_VARIANTS=gs://bucket/staging/called_variants/*'),
             'gs://bucket/staging/logs/call_variants/0'
@@ -204,7 +209,7 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
     self.assertEqual(mock_apply_async.call_count, 2)
 
     mock_run_job.assert_called_once_with(
-        _HasAllOf('postprocess_variants', 'gcr.io/dockerimage',
+        _HasAllOf('postprocess_variants', 'gcr.io/dockerimage:tag',
                   'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
                   'OUTFILE=gs://bucket/output.vcf',
                   'GVCF=gs://bucket/staging/gvcf/*',
@@ -214,12 +219,14 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
   @mock.patch.object(multiprocessing, 'Pool')
   @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
-  def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
-                          mock_pool):
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  def testRunMakeExamples(self, mock_get_project_number,
+                          mock_can_write_to_bucket, mock_obj_exist, mock_pool):
     mock_apply_async = mock_pool.return_value.apply_async
     mock_apply_async.return_value = None
     mock_obj_exist.return_value = True
     mock_can_write_to_bucket.return_value = True
+    mock_get_project_number.return_value = 1000
     self._argv.extend([
         '--jobs_to_run',
         'make_examples',
@@ -250,7 +257,7 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
     # Verifying Pipeline's API commands
     mock_apply_async.assert_has_calls([
         mock.call(mock.ANY, [
-            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage:tag',
                       'EXAMPLES=gs://bucket/staging/examples/0/*',
                       'INPUT_BAM=gs://bucket/bam', 'INPUT_REF=gs://bucket/ref',
                       'INPUT_BAI=gs://bucket/bam.bai',
@@ -260,7 +267,7 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
             'gs://bucket/staging/logs/make_examples/0'
         ]),
         mock.call(mock.ANY, [
-            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage:tag',
                       'EXAMPLES=gs://bucket/staging/examples/0/*',
                       'INPUT_BAM=gs://bucket/bam', 'INPUT_REF=gs://bucket/ref',
                       'INPUT_BAI=gs://bucket/bam.bai',
@@ -270,7 +277,7 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
             'gs://bucket/staging/logs/make_examples/1'
         ]),
         mock.call(mock.ANY, [
-            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage:tag',
                       'EXAMPLES=gs://bucket/staging/examples/0/*',
                       'INPUT_BAM=gs://bucket/bam', 'INPUT_REF=gs://bucket/ref',
                       'INPUT_BAI=gs://bucket/bam.bai',
@@ -301,7 +308,7 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
                 ['-c', expected_command],
             'entrypoint': 'bash',
             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
-            'imageUri': 'gcr.io/dockerimage'}]
+            'imageUri': 'gcr.io/dockerimage:tag'}]
     self.assertEqual(len(expected_actions_list), len(recieved_actions_list))
     for i in range(len(expected_actions_list)):
       self.assertEqual(sorted(expected_actions_list[i].items()),
@@ -310,12 +317,15 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
   @mock.patch.object(multiprocessing, 'Pool')
   @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
-  def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
-                                      mock_obj_exist, mock_pool):
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  def testRunMakeExamples_WithGcsfuse(self, mock_get_project_number,
+                                      mock_can_write_to_bucket,
+                                      mock_obj_exist, mock_pool):
     mock_apply_async = mock_pool.return_value.apply_async
     mock_apply_async.return_value = None
     mock_obj_exist.return_value = True
     mock_can_write_to_bucket.return_value = True
+    mock_get_project_number.return_value = 1000
     self._argv.extend([
         '--jobs_to_run',
         'make_examples',
@@ -343,7 +353,7 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
     # Verifying Pipeline's API commands
     mock_apply_async.assert_has_calls([
         mock.call(mock.ANY, [
-            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage:tag',
                       'EXAMPLES=gs://bucket/staging/examples/0/*',
                       'INPUT_REF=gs://bucket/ref',
                       'INPUT_BAI=gs://bucket/bam.bai',
@@ -351,7 +361,7 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
             'gs://bucket/staging/logs/make_examples/0'
         ]),
         mock.call(mock.ANY, [
-            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage:tag',
                       'EXAMPLES=gs://bucket/staging/examples/0/*',
                       'INPUT_REF=gs://bucket/ref',
                       'INPUT_BAI=gs://bucket/bam.bai',
@@ -359,7 +369,7 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
             'gs://bucket/staging/logs/make_examples/1'
         ]),
         mock.call(mock.ANY, [
-            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage:tag',
                       'EXAMPLES=gs://bucket/staging/examples/1/*',
                       'INPUT_REF=gs://bucket/ref',
                       'INPUT_BAI=gs://bucket/bam.bai',
@@ -367,7 +377,7 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
             'gs://bucket/staging/logs/make_examples/2'
         ]),
         mock.call(mock.ANY, [
-            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage',
+            _HasAllOf('prefix_make_examples', 'gcr.io/dockerimage:tag',
                       'EXAMPLES=gs://bucket/staging/examples/1/*',
                       'INPUT_REF=gs://bucket/ref',
                       'INPUT_BAI=gs://bucket/bam.bai',
@@ -418,7 +428,7 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
                 ['-c', expected_command],
             'entrypoint': 'bash',
             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
-            'imageUri': 'gcr.io/dockerimage'})
+            'imageUri': 'gcr.io/dockerimage:tag'})
     self.assertEqual(len(expected_actions_list), len(recieved_actions_list))
     for i in range(len(expected_actions_list)):
       self.assertEqual(sorted(expected_actions_list[i].items()),
@@ -427,12 +437,14 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
   @mock.patch.object(multiprocessing, 'Pool')
   @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
-  def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist,
-                          mock_pool):
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  def testRunCallVariants(self, mock_get_project_number,
+                          mock_can_write_to_bucket, mock_obj_exist, mock_pool):
     mock_apply_async = mock_pool.return_value.apply_async
     mock_apply_async.return_value = None
     mock_obj_exist.return_value = True
     mock_can_write_to_bucket.return_value = True
+    mock_get_project_number.return_value = 1000
     self._argv.extend([
         '--make_examples_workers',
         '3',
@@ -449,17 +461,20 @@ def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist,
     mock_apply_async.assert_has_calls([
         mock.call(mock.ANY, [
-            _HasAllOf('call_variants', 'gcr.io/dockerimage',
+            _HasAllOf('call_variants', 'gcr.io/dockerimage:tag',
+                      '--attempts', '2', '--pvm-attempts', '0',
                       'CALL_VARIANTS_SHARD_INDEX=0'),
             'gs://bucket/staging/logs/call_variants/0'
         ]),
         mock.call(mock.ANY, [
-            _HasAllOf('call_variants', 'gcr.io/dockerimage',
+            _HasAllOf('call_variants', 'gcr.io/dockerimage:tag',
+                      '--attempts', '2', '--pvm-attempts', '0',
                       'CALL_VARIANTS_SHARD_INDEX=1'),
             'gs://bucket/staging/logs/call_variants/1'
         ]),
         mock.call(mock.ANY, [
-            _HasAllOf('call_variants', 'gcr.io/dockerimage',
+            _HasAllOf('call_variants', 'gcr.io/dockerimage:tag',
+                      '--attempts', '2', '--pvm-attempts', '0',
                       'CALL_VARIANTS_SHARD_INDEX=2'),
             'gs://bucket/staging/logs/call_variants/2'
         ]),
@@ -469,12 +484,15 @@ def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist,
   @mock.patch.object(multiprocessing, 'Pool')
   @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
-  def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist,
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  def testRunCallVariants_GPU(self, mock_get_project_number,
+                              mock_can_write_to_bucket, mock_obj_exist,
                               mock_pool):
     mock_apply_async = mock_pool.return_value.apply_async
     mock_apply_async.return_value = None
     mock_obj_exist.return_value = True
     mock_can_write_to_bucket.return_value = True
+    mock_get_project_number.return_value = 1000
     self._argv.extend([
         '--make_examples_workers',
         '3',
@@ -516,11 +534,14 @@ def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist,
   @mock.patch.object(gke_cluster.GkeCluster, '_cluster_exists')
   @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
-  def testRunCallVariants_TPU(self, mock_can_write_to_bucket, mock_obj_exist,
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  def testRunCallVariants_TPU(self, mock_get_project_number,
+                              mock_can_write_to_bucket, mock_obj_exist,
                               mock_cluster_exists, mock_deploy_pod, mock_init):
     mock_obj_exist.return_value = True
     mock_can_write_to_bucket.return_value = True
     mock_cluster_exists.return_value = True
+    mock_get_project_number.return_value = 1000
     self._argv.extend([
         '--jobs_to_run',
         'call_variants',
@@ -534,7 +555,7 @@ def testRunCallVariants_TPU(self, mock_can_write_to_bucket, mock_obj_exist,
         '--gke_cluster_zone',
         'us-central1-c',
         '--docker_image',
-        'gcr.io/dockerimage',
+        'gcr.io/dockerimage:tag',
     ])
     gcp_deepvariant_runner.run(self._argv)
     mock_init.assert_has_calls([
@@ -564,7 +585,7 @@ def testRunFailCallVariants_TPU(self):
         '--gke_cluster_zone',
         'us-central1-c',
         '--docker_image',
-        'gcr.io/dockerimage',
+        'gcr.io/dockerimage:tag',
     ])
     with self.assertRaises(ValueError):
       gcp_deepvariant_runner.run(self._argv)
@@ -572,10 +593,13 @@ def testRunFailCallVariants_TPU(self):
   @mock.patch('gcp_deepvariant_runner._run_job')
   @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
-  def testRunPostProcessVariants(self, mock_can_write_to_bucket, mock_obj_exist,
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  def testRunPostProcessVariants(self, mock_get_project_number,
+                                 mock_can_write_to_bucket, mock_obj_exist,
                                  mock_run_job):
     mock_obj_exist.return_value = True
     mock_can_write_to_bucket.return_value = True
+    mock_get_project_number.return_value = 1000
     self._argv.extend([
         '--jobs_to_run',
         'postprocess_variants',
@@ -589,13 +613,77 @@ def testRunPostProcessVariants(self, mock_can_write_to_bucket, mock_obj_exist,
     ])
     gcp_deepvariant_runner.run(self._argv)
 
     mock_run_job.assert_called_once_with(
-        _HasAllOf('postprocess_variants', 'gcr.io/dockerimage',
+        _HasAllOf('postprocess_variants', 'gcr.io/dockerimage:tag',
                   'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
                   'INPUT_REF=gs://bucket/ref', 'SHARDS=15',
                   'CALL_VARIANTS_SHARDS=1',
                   'INPUT_REF_FAI=gs://bucket/ref.fai',
                   'OUTFILE=gs://bucket/output.vcf'),
         'gs://bucket/staging/logs/postprocess_variants')
 
+  @mock.patch('gcp_deepvariant_runner._run_job')
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_project_number')
+  @mock.patch('metrics.add')
+  def testRunPipeline_Metrics(self, mock_metrics_add, mock_get_project_number,
+                              mock_can_write_to_bucket, mock_obj_exist,
+                              mock_pool, unused_mock_run_job):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_obj_exist.return_value = True
+    mock_can_write_to_bucket.return_value = True
+    mock_get_project_number.return_value = 1000
+    self._argv.extend([
+        '--make_examples_workers', '2', '--make_examples_cores_per_worker',
+        '8', '--make_examples_ram_per_worker_gb', '32',
+        '--call_variants_workers', '1', '--call_variants_cores_per_worker',
+        '2', '--call_variants_ram_per_worker_gb', '8',
+        '--postprocess_variants_cores', '4',
+        '--postprocess_variants_ram_gb', '16',
+        '--shards', '16', '--gcsfuse',
+        '--regions', 'gs://bucket/region-1.bed', 'chr1:10-20', 'chr2:1-2',
+        '--gvcf_outfile', 'gvcf-folder-path',
+        '--gpu', '--docker_image_gpu', 'gcr.io/dockerimage_gpu',
+        '--max_non_preemptible_tries', '1', '--max_preemptible_tries', '2'
+    ])
+
+    gcp_deepvariant_runner.run(self._argv)
+    metrics_calls = [
+        mock.call(
+            1000,
+            'Start',
+            accelerator_type='nvidia-tesla-k80',
+            call_variants_cores_per_worker=2,
+            call_variants_ram_per_worker_gb=8,
+            call_variants_workers=1,
+            existing_gke_cluster=False,
+            gcsfuse=True,
+            genomic_regions=3,
+            gpu=True,
+            gvcf=True,
+            image_version='tag',
+            input_file_format='.cram',
+            jobs_to_run=[
+                'make_examples', 'call_variants', 'postprocess_variants'],
+            make_examples_cores_per_worker=8,
+            make_examples_ram_per_worker_gb=32,
+            make_examples_workers=2,
+            max_non_preemptible_tries=1,
+            max_preemptible_tries=2,
+            model_version='version_type',
+            postprocess_variants_cores=4,
+            postprocess_variants_ram_gb=16,
+            preemptible=False,
+            shards=16,
+            tpu=False,
+            zones=['zone-a', 'zone-b']),
+        mock.call(1000, 'MakeExamples_Success', duration_seconds=0),
+        mock.call(1000, 'CallVariants_Success', duration_seconds=0),
+        mock.call(1000, 'PostprocessVariants_Success', duration_seconds=0)
+    ]
+    mock_metrics_add.assert_has_calls(metrics_calls)
+
   @mock.patch.object(storage.bucket.Bucket, 'test_iam_permissions')
   def testRunFailsMissingInput(self, mock_bucket_iam):
     mock_bucket_iam.return_value = (
diff --git a/metrics.py b/metrics.py
new file mode 100644
index 0000000..559191b
--- /dev/null
+++ b/metrics.py
@@ -0,0 +1,185 @@
+# Copyright 2019 Google LLC.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#
+# 3. Neither the name of the copyright holder nor the names of its
+#    contributors may be used to endorse or promote products derived from this
+#    software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+"""Used to collect anonymous DeepVariant metrics."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import atexit
+import functools
+import json
+import logging
+import time
+import uuid
+
+import requests
+from typing import Dict, Optional, Text
+
+_CLEARCUT_ENDPOINT = 'https://play.googleapis.com/log'
+_CLOUD_HCLS_OSS = 'CLOUD_HCLS_OSS'
+_CONCORD = 'CONCORD'
+_DEEP_VARIANT_RUN = 'DeepVariantRun'
+_HTTP_REQUEST_TIMEOUT_SEC = 10
+_PYTHON = 'PYTHON'
+_VIRTUAL_HCLS_DEEPVARIANT = 'virtual.hcls.deepvariant'
+
+
+def capture_exceptions(func):
+  """Function decorator to capture and log any exceptions."""
+
+  @functools.wraps(func)
+  def wrapper(*args, **kwds):
+    try:
+      return func(*args, **kwds)
+    # pylint:disable=broad-except
+    except Exception as e:
+      logging.error('Exception captured in %s : %s', func.__name__, e)
+
+  return wrapper
+
+
+class _ConcordEvent(object):
+  """Encapsulates information representing a Concord event."""
+
+  def __init__(self,
+               event_name: Text,
+               event_type: Text,
+               project_number: int,
+               console_type: Text,
+               page_hostname: Text,
+               event_metadata: Optional[Dict[Text, Text]] = None) -> None:
+    self._event_name = event_name
+    self._event_type = event_type
+    self._project_number = project_number
+    self._console_type = console_type
+    self._page_hostname = page_hostname
+    self._event_metadata = event_metadata or {}
+
+  def to_json(self, **kwargs):
+    """Encodes data in json."""
+    event_dict = {
+        'project_number': str(self._project_number),
+        'event_name': self._event_name,
+        'event_type': self._event_type,
+        'console_type': self._console_type,
+        'page_hostname': self._page_hostname,
+        'event_metadata': self._event_metadata_as_kv(),
+    }
+    return json.dumps(event_dict, **kwargs)
+
+  def _event_metadata_as_kv(self):
+    kv_list = []
+    for k, v in sorted(self._event_metadata.items()):
+      kv_list.append({'key': k, 'value': str(v)})
+
+    return kv_list
+
+
+class _MetricsCollector(object):
+  """A class that collects and submits metrics.
+
+  Instances of this class share the same internal state (class attributes),
+  so every instance sees all events collected so far.
+  """
+  _events = []
+  _session_identifier = uuid.uuid4().hex
+
+  def add_metrics(self, project_number: int,
+                  metrics_name: Text, **metrics_kw: Text) -> None:
+    concord_event = _ConcordEvent(
+        event_name=metrics_name,
+        event_type=_DEEP_VARIANT_RUN,
+        project_number=project_number,
+        console_type=_CLOUD_HCLS_OSS,
+        page_hostname=_VIRTUAL_HCLS_DEEPVARIANT,
+        event_metadata={k: v for k, v in metrics_kw.items()})
+    self._events.append(concord_event)
+
+  def submit_metrics(self):
+    """Submits all the collected metrics to the Concord endpoint.
+
+    Raises:
+      requests.HTTPError: If the HTTP request does not succeed
+        (status code != 200).
+    """
+    request_data = json.dumps(self._clearcut_request(), sort_keys=True)
+    requests.post(
+        url=_CLEARCUT_ENDPOINT,
+        data=request_data,
+        headers=None,
+        timeout=_HTTP_REQUEST_TIMEOUT_SEC).raise_for_status()
+
+  def _clearcut_request(self):
+    # We don't have (or want to have) any cookies, so using a random ID for
+    # zwieback_cookie is OK for tracking purposes.
+    return {
+        'client_info': {
+            'client_type': _PYTHON,
+        },
+        'log_source_name': _CONCORD,
+        'zwieback_cookie': self._session_identifier,
+        'request_time_ms': _now_ms(),
+        'log_event': [{
+            'source_extension_json': e.to_json(sort_keys=True)
+        } for e in self._events]
+    }
+
+
+def _now_ms():
+  """Returns current time in milliseconds."""
+  return int(round(time.time() * 1000))
+
+
+def add(project_number: int, metrics_name: Text, **metrics_kw: Text) -> None:
+  """Adds the given metric to the metrics to be submitted to Concord.
+
+  Note: All metrics are submitted at exit.
+  Note: Do not rely on thread safety of this method.
+
+  Args:
+    project_number(int): GCP project number.
+    metrics_name(str): metrics name.
+    **metrics_kw: key-values of metrics. For example, for
+      metrics_name='MakeExamples_Success', metrics_kw can be
+      duration_seconds=1000, wait_duration_seconds=100.
+  """
+  metrics_collector = _MetricsCollector()
+  metrics_collector.add_metrics(project_number, metrics_name, **metrics_kw)
+
+
+# Exceptions are captured and logged to avoid crashing callers. Note the
+# decorator order: atexit must register the exception-capturing wrapper.
+@atexit.register
+@capture_exceptions
+def shutdown():
+  """Reports all metrics that were collected."""
+  metrics_collector = _MetricsCollector()
+  metrics_collector.submit_metrics()
diff --git a/metrics_test.py b/metrics_test.py
new file mode 100644
index 0000000..8c7b2bd
--- /dev/null
+++ b/metrics_test.py
@@ -0,0 +1,230 @@
+# Copyright 2019 Google LLC.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#
+# 3. Neither the name of the copyright holder nor the names of its
+#    contributors may be used to endorse or promote products derived from this
+#    software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+"""Tests for metrics.py.
+
+To run the tests, first activate virtualenv and install required packages:
+$ virtualenv venv
+$ . venv/bin/activate
+$ pip install mock requests
+
+Then run:
+$ python metrics_test.py
+"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import json
+import unittest
+
+import metrics
+import mock
+
+
+# This is to test whether all metrics collector instances share the same
+# session identifier. Mocks '_session_identifier' on import.
+@mock.patch('metrics._MetricsCollector._session_identifier', 'abcd')
+class MetricsCollectorTest(unittest.TestCase):
+  """Tests for MetricsCollector class."""
+
+  def setUp(self):
+    super(MetricsCollectorTest, self).setUp()
+    # '_MetricsCollector' has class attributes; clear them before each test.
+    metrics._MetricsCollector()._events[:] = []
+
+  @mock.patch('requests.post')
+  @mock.patch('time.time', return_value=1234)
+  def test_submit_metrics(self, unused_mock_time, mock_requests_post):
+    metrics.add(
+        123,
+        'test-metrics-1',
+        attribute_1=1,
+        attribute_2='string-1',
+        attribute_3=True)
+    metrics.add(
+        123,
+        'test-metrics-2',
+        attribute_1=2,
+        attribute_2='string-2',
+        attribute_3=True)
+    metrics._MetricsCollector().submit_metrics()
+
+    mock_requests_post.assert_called_with(
+        data=json.dumps(
+            {
+                'zwieback_cookie': 'abcd',
+                'request_time_ms': 1234000,
+                'log_source_name': 'CONCORD',
+                'log_event': [
+                    {
+                        'source_extension_json':
+                            json.dumps({
+                                'console_type': 'CLOUD_HCLS_OSS',
+                                'event_metadata': [{
+                                    'key': 'attribute_1',
+                                    'value': '1'
+                                }, {
+                                    'key': 'attribute_2',
+                                    'value': 'string-1'
+                                }, {
+                                    'key': 'attribute_3',
+                                    'value': 'True'
+                                }],
+                                'event_name': 'test-metrics-1',
+                                'event_type': 'DeepVariantRun',
+                                'page_hostname': 'virtual.hcls.deepvariant',
+                                'project_number': '123'
+                            }, sort_keys=True)
+                    },
+                    {
+                        'source_extension_json':
+                            json.dumps({
+                                'console_type': 'CLOUD_HCLS_OSS',
+                                'event_metadata': [{
+                                    'key': 'attribute_1',
+                                    'value': '2'
+                                }, {
+                                    'key': 'attribute_2',
+                                    'value': 'string-2'
+                                }, {
+                                    'key': 'attribute_3',
+                                    'value': 'True'
+                                }],
+                                'event_name': 'test-metrics-2',
+                                'event_type': 'DeepVariantRun',
+                                'page_hostname': 'virtual.hcls.deepvariant',
+                                'project_number': '123'
+                            }, sort_keys=True)
+                    }
+                ],
+                'client_info': {
+                    'client_type': 'PYTHON'
+                }
+            },
+            sort_keys=True),
+        headers=None,
+        timeout=10,
+        url=metrics._CLEARCUT_ENDPOINT)
+
+  @mock.patch('requests.post')
+  @mock.patch('time.time', side_effect=(1234, 1235))
+  def test_two_metrics_collector(self, unused_mock_time, mock_requests_post):
+    first_metric_collector = metrics._MetricsCollector()
+    second_metric_collector = metrics._MetricsCollector()
+
+    first_metric_collector.add_metrics(123, 'test-metrics-1', attribute_1=1)
+    second_metric_collector.add_metrics(123, 'test-metrics-2', attribute_2=2)
+    metrics.add(123, 'test-metrics-3', attribute_3=3)
+
+    def expected_post_data(request_time_ms):
+      template = {
+          'zwieback_cookie': 'abcd',
+          'log_source_name': 'CONCORD',
+          'log_event': [
+              {
+                  'source_extension_json':
+                      json.dumps({
+                          'console_type': 'CLOUD_HCLS_OSS',
+                          'event_metadata': [{
+                              'key': 'attribute_1',
+                              'value': '1'
+                          }],
+                          'event_name': 'test-metrics-1',
+                          'event_type': 'DeepVariantRun',
+                          'page_hostname': 'virtual.hcls.deepvariant',
+                          'project_number': '123'
+                      }, sort_keys=True)
+              },
+              {
+                  'source_extension_json':
+                      json.dumps({
+                          'console_type': 'CLOUD_HCLS_OSS',
+                          'event_metadata': [{
+                              'key': 'attribute_2',
+                              'value': '2'
+                          }],
+                          'event_name': 'test-metrics-2',
+                          'event_type': 'DeepVariantRun',
+                          'page_hostname': 'virtual.hcls.deepvariant',
+                          'project_number': '123'
+                      }, sort_keys=True)
+              },
+              {
+                  'source_extension_json':
+                      json.dumps({
+                          'console_type': 'CLOUD_HCLS_OSS',
+                          'event_metadata': [{
+                              'key': 'attribute_3',
+                              'value': '3'
+                          }],
+                          'event_name': 'test-metrics-3',
+                          'event_type': 'DeepVariantRun',
+                          'page_hostname': 'virtual.hcls.deepvariant',
+                          'project_number': '123'
+                      }, sort_keys=True)
+              }
+          ],
+          'client_info': {
+              'client_type': 'PYTHON'
+          }
+      }
+      template.update({'request_time_ms': request_time_ms})
+      return json.dumps(template, sort_keys=True)
+
+    first_metric_collector.submit_metrics()
+    mock_requests_post.assert_called_with(
+        data=expected_post_data(1234000),
+        headers=None,
+        timeout=10,
+        url=metrics._CLEARCUT_ENDPOINT)
+
+    second_metric_collector.submit_metrics()
+    mock_requests_post.assert_called_with(
+        data=expected_post_data(1235000),
+        headers=None,
+        timeout=10,
+        url=metrics._CLEARCUT_ENDPOINT)
+
+
+if __name__ == '__main__':
+  unittest.main()
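
How the pieces of this patch fit together: metrics.add() only buffers a
_ConcordEvent in _MetricsCollector's shared class-level state, and
report_runtime_metrics() is a timing wrapper that feeds that buffer; nothing
leaves the process until the atexit-registered shutdown() posts the whole
batch to the Clearcut endpoint. The standalone sketch below illustrates that
same buffer-then-flush-at-exit pattern; it is not part of the patch, and the
names Collector, _shutdown, and the print()-based flush are hypothetical
stand-ins for the real module's HTTP POST.

import atexit
import functools
import json
import logging
import time


def capture_exceptions(func):
  """Logs (rather than raises) any exception, as metrics.py does at exit."""

  @functools.wraps(func)
  def wrapper(*args, **kwargs):
    try:
      return func(*args, **kwargs)
    except Exception as e:  # pylint:disable=broad-except
      logging.error('Exception captured in %s : %s', func.__name__, e)

  return wrapper


class Collector(object):
  """All instances share _events, mirroring _MetricsCollector."""
  _events = []

  def add(self, name, **metadata):
    self._events.append({'event_name': name, 'event_metadata': metadata})

  def flush(self):
    # The real module POSTs this payload to the Clearcut endpoint instead.
    print(json.dumps({'request_time_ms': int(time.time() * 1000),
                      'log_event': self._events}, sort_keys=True))


# Decorator order matters: atexit must register the *wrapped* function so a
# failed flush cannot surface an exception during interpreter shutdown.
@atexit.register
@capture_exceptions
def _shutdown():
  Collector().flush()


Collector().add('MakeExamples_Success', duration_seconds=42)
# Nothing is printed until the interpreter exits and _shutdown() runs.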