diff --git a/src/clusterfuzz/_internal/protos/swarming.proto b/src/clusterfuzz/_internal/protos/swarming.proto index c5822c9b4f..8743524e96 100644 --- a/src/clusterfuzz/_internal/protos/swarming.proto +++ b/src/clusterfuzz/_internal/protos/swarming.proto @@ -1,4 +1,4 @@ -// Copyright 2024 Google LLC +// Copyright 2026 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,12 +12,71 @@ // See the License for the specific language governing permissions and // limitations under the License. -// This file is based on https://source.chromium.org/chromium/infra/infra/+/main:luci/appengine/swarming/proto/api_v2/swarming.proto +// This file is based on https://source.chromium.org/chromium/infra/infra_superproject/+/main:infra/luci/appengine/swarming/proto/api_v2/swarming.proto // This includes necessary messages to construct a NewTaskRequest syntax = "proto3"; package swarming.v2; +import "google/protobuf/timestamp.proto"; + +// Enums + +// Use one of the values in this enum to query for tasks in one of the +// specified state. +// +// Use 'ALL' to not use any filtering based on task state. +// +// As an example, this enum enables querying for all tasks with state COMPLETED +// but non-zero exit code via COMPLETED_FAILURE. +// +// Do not confuse StateQuery and TaskState. StateQuery is to query tasks +// via the API. TaskState is the current task state. +enum StateQuery { + // Query for all tasks currently TaskState.PENDING. + QUERY_PENDING = 0; + // Query for all tasks currently TaskState.RUNNING. This includes tasks + // currently in the overhead phase; mapping input files or archiving outputs + // back to the server. + QUERY_RUNNING = 1; + // Query for all tasks currently TaskState.PENDING or TaskState.RUNNING. This + // is the query for the 'active' tasks. 
+ QUERY_PENDING_RUNNING = 2; + // Query for all tasks that completed normally as TaskState.COMPLETED, + // independent of the process exit code. + QUERY_COMPLETED = 3; + // Query for all tasks that completed normally as TaskState.COMPLETED and that + // had exit code 0. + QUERY_COMPLETED_SUCCESS = 4; + // Query for all tasks that completed normally as TaskState.COMPLETED and that + // had exit code not 0. + QUERY_COMPLETED_FAILURE = 5; + // Query for all tasks that are TaskState.EXPIRED. + QUERY_EXPIRED = 6; + // Query for all tasks that are TaskState.TIMED_OUT. + QUERY_TIMED_OUT = 7; + // Query for all tasks that are TaskState.BOT_DIED. + QUERY_BOT_DIED = 8; + // Query for all tasks that are TaskState.CANCELED. + QUERY_CANCELED = 9; + // Query for all tasks, independent of the task state. + // + // In hindsight, this constant should have been the value 0. Sorry, the + // original author was young and foolish. + QUERY_ALL = 10; + // Query for all tasks that are TaskState.COMPLETED but that actually didn't + // run due to TaskProperties.idempotent being True *and* that a previous task + // with the exact same TaskProperties had successfully run before, aka + // COMPLETED_SUCCESS. + QUERY_DEDUPED = 11; + // Query for all tasks that are TaskState.KILLED. + QUERY_KILLED = 12; + // Query for all tasks that are TaskState.NO_RESOURCE. + QUERY_NO_RESOURCE = 13; + // Query for all tasks that are TaskState.CLIENT_ERROR. + QUERY_CLIENT_ERROR = 14; +} + // Messages // Represents a mapping of string to a string. @@ -345,4 +404,17 @@ message NewTaskRequest { // Task realm. // See api/swarming.proto for more details. string realm = 18; -} \ No newline at end of file +} + +message TasksCountRequest { + google.protobuf.Timestamp start = 1; + google.protobuf.Timestamp end = 2; + StateQuery state = 3; + repeated string tags = 4; +} + +// Returns the count, as requested. 
+message TasksCount { + int32 count = 1; + google.protobuf.Timestamp now = 2; +} diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2.py b/src/clusterfuzz/_internal/protos/swarming_pb2.py index 543c11cce9..d67b1f95af 100644 --- a/src/clusterfuzz/_internal/protos/swarming_pb2.py +++ b/src/clusterfuzz/_internal/protos/swarming_pb2.py @@ -26,41 +26,48 @@ _sym_db = _symbol_database.Default() +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n+clusterfuzz/_internal/protos/swarming.proto\x12\x0bswarming.v2\"(\n\nStringPair\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\",\n\x0eStringListPair\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x03(\t\"*\n\x06\x44igest\x12\x0c\n\x04hash\x18\x01 \x01(\t\x12\x12\n\nsize_bytes\x18\x02 \x01(\x03\"I\n\x0c\x43\x41SReference\x12\x14\n\x0c\x63\x61s_instance\x18\x01 \x01(\t\x12#\n\x06\x64igest\x18\x02 \x01(\x0b\x32\x13.swarming.v2.Digest\"B\n\x0b\x43ipdPackage\x12\x14\n\x0cpackage_name\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x0c\n\x04path\x18\x03 \x01(\t\"y\n\tCipdInput\x12\x0e\n\x06server\x18\x01 \x01(\t\x12\x30\n\x0e\x63lient_package\x18\x02 \x01(\x0b\x32\x18.swarming.v2.CipdPackage\x12*\n\x08packages\x18\x03 \x03(\x0b\x32\x18.swarming.v2.CipdPackage\"(\n\nCacheEntry\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\"\xf2\x01\n\x0b\x43ontainment\x12\x16\n\x0elower_priority\x18\x01 \x01(\x08\x12\x42\n\x10\x63ontainment_type\x18\x02 \x01(\x0e\x32(.swarming.v2.Containment.ContainmentType\x12\x17\n\x0flimit_processes\x18\x03 \x01(\x03\x12$\n\x1climit_total_committed_memory\x18\x04 \x01(\x03\"H\n\x0f\x43ontainmentType\x12\x11\n\rNOT_SPECIFIED\x10\x00\x12\x08\n\x04NONE\x10\x01\x12\x08\n\x04\x41UTO\x10\x02\x12\x0e\n\nJOB_OBJECT\x10\x03\"\x83\x04\n\x0eTaskProperties\x12\'\n\x06\x63\x61\x63hes\x18\x01 \x03(\x0b\x32\x17.swarming.v2.CacheEntry\x12*\n\ncipd_input\x18\x02 
\x01(\x0b\x32\x16.swarming.v2.CipdInput\x12\x0f\n\x07\x63ommand\x18\x03 \x03(\t\x12\x14\n\x0crelative_cwd\x18\x04 \x01(\t\x12+\n\ndimensions\x18\x05 \x03(\x0b\x32\x17.swarming.v2.StringPair\x12$\n\x03\x65nv\x18\x06 \x03(\x0b\x32\x17.swarming.v2.StringPair\x12\x31\n\x0c\x65nv_prefixes\x18\x07 \x03(\x0b\x32\x1b.swarming.v2.StringListPair\x12\x1e\n\x16\x65xecution_timeout_secs\x18\x08 \x01(\x05\x12\x19\n\x11grace_period_secs\x18\t \x01(\x05\x12\x12\n\nidempotent\x18\n \x01(\x08\x12\x31\n\x0e\x63\x61s_input_root\x18\x0b \x01(\x0b\x32\x19.swarming.v2.CASReference\x12\x17\n\x0fio_timeout_secs\x18\x0c \x01(\x05\x12\x0f\n\x07outputs\x18\r \x03(\t\x12\x14\n\x0csecret_bytes\x18\x0e \x01(\x0c\x12-\n\x0b\x63ontainment\x18\x0f \x01(\x0b\x32\x18.swarming.v2.Containment\"p\n\tTaskSlice\x12/\n\nproperties\x18\x01 \x01(\x0b\x32\x1b.swarming.v2.TaskProperties\x12\x17\n\x0f\x65xpiration_secs\x18\x02 \x01(\x05\x12\x19\n\x11wait_for_capacity\x18\x03 \x01(\x08\"\x1d\n\x0bResultDBCfg\x12\x0e\n\x06\x65nable\x18\x01 \x01(\x08\"\xe8\x04\n\x0eNewTaskRequest\x12\x17\n\x0f\x65xpiration_secs\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x16\n\x0eparent_task_id\x18\x03 \x01(\t\x12\x10\n\x08priority\x18\x04 \x01(\x05\x12/\n\nproperties\x18\x05 \x01(\x0b\x32\x1b.swarming.v2.TaskProperties\x12+\n\x0btask_slices\x18\x06 \x03(\x0b\x32\x16.swarming.v2.TaskSlice\x12\x0c\n\x04tags\x18\x07 \x03(\t\x12\x0c\n\x04user\x18\x08 \x01(\t\x12\x17\n\x0fservice_account\x18\t \x01(\t\x12\x14\n\x0cpubsub_topic\x18\n \x01(\t\x12\x19\n\x11pubsub_auth_token\x18\x0b \x01(\t\x12\x17\n\x0fpubsub_userdata\x18\x0c \x01(\t\x12\x15\n\revaluate_only\x18\r \x01(\x08\x12M\n\x12pool_task_template\x18\x0e \x01(\x0e\x32\x31.swarming.v2.NewTaskRequest.PoolTaskTemplateField\x12\x1f\n\x17\x62ot_ping_tolerance_secs\x18\x0f \x01(\x05\x12\x14\n\x0crequest_uuid\x18\x10 \x01(\t\x12*\n\x08resultdb\x18\x11 \x01(\x0b\x32\x18.swarming.v2.ResultDBCfg\x12\r\n\x05realm\x18\x12 
\x01(\t\"P\n\x15PoolTaskTemplateField\x12\x08\n\x04\x41UTO\x10\x00\x12\x11\n\rCANARY_PREFER\x10\x01\x12\x10\n\x0c\x43\x41NARY_NEVER\x10\x02\x12\x08\n\x04SKIP\x10\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n+clusterfuzz/_internal/protos/swarming.proto\x12\x0bswarming.v2\x1a\x1fgoogle/protobuf/timestamp.proto\"(\n\nStringPair\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\",\n\x0eStringListPair\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x03(\t\"*\n\x06\x44igest\x12\x0c\n\x04hash\x18\x01 \x01(\t\x12\x12\n\nsize_bytes\x18\x02 \x01(\x03\"I\n\x0c\x43\x41SReference\x12\x14\n\x0c\x63\x61s_instance\x18\x01 \x01(\t\x12#\n\x06\x64igest\x18\x02 \x01(\x0b\x32\x13.swarming.v2.Digest\"B\n\x0b\x43ipdPackage\x12\x14\n\x0cpackage_name\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x0c\n\x04path\x18\x03 \x01(\t\"y\n\tCipdInput\x12\x0e\n\x06server\x18\x01 \x01(\t\x12\x30\n\x0e\x63lient_package\x18\x02 \x01(\x0b\x32\x18.swarming.v2.CipdPackage\x12*\n\x08packages\x18\x03 \x03(\x0b\x32\x18.swarming.v2.CipdPackage\"(\n\nCacheEntry\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\"\xf2\x01\n\x0b\x43ontainment\x12\x16\n\x0elower_priority\x18\x01 \x01(\x08\x12\x42\n\x10\x63ontainment_type\x18\x02 \x01(\x0e\x32(.swarming.v2.Containment.ContainmentType\x12\x17\n\x0flimit_processes\x18\x03 \x01(\x03\x12$\n\x1climit_total_committed_memory\x18\x04 \x01(\x03\"H\n\x0f\x43ontainmentType\x12\x11\n\rNOT_SPECIFIED\x10\x00\x12\x08\n\x04NONE\x10\x01\x12\x08\n\x04\x41UTO\x10\x02\x12\x0e\n\nJOB_OBJECT\x10\x03\"\x83\x04\n\x0eTaskProperties\x12\'\n\x06\x63\x61\x63hes\x18\x01 \x03(\x0b\x32\x17.swarming.v2.CacheEntry\x12*\n\ncipd_input\x18\x02 \x01(\x0b\x32\x16.swarming.v2.CipdInput\x12\x0f\n\x07\x63ommand\x18\x03 \x03(\t\x12\x14\n\x0crelative_cwd\x18\x04 \x01(\t\x12+\n\ndimensions\x18\x05 \x03(\x0b\x32\x17.swarming.v2.StringPair\x12$\n\x03\x65nv\x18\x06 
\x03(\x0b\x32\x17.swarming.v2.StringPair\x12\x31\n\x0c\x65nv_prefixes\x18\x07 \x03(\x0b\x32\x1b.swarming.v2.StringListPair\x12\x1e\n\x16\x65xecution_timeout_secs\x18\x08 \x01(\x05\x12\x19\n\x11grace_period_secs\x18\t \x01(\x05\x12\x12\n\nidempotent\x18\n \x01(\x08\x12\x31\n\x0e\x63\x61s_input_root\x18\x0b \x01(\x0b\x32\x19.swarming.v2.CASReference\x12\x17\n\x0fio_timeout_secs\x18\x0c \x01(\x05\x12\x0f\n\x07outputs\x18\r \x03(\t\x12\x14\n\x0csecret_bytes\x18\x0e \x01(\x0c\x12-\n\x0b\x63ontainment\x18\x0f \x01(\x0b\x32\x18.swarming.v2.Containment\"p\n\tTaskSlice\x12/\n\nproperties\x18\x01 \x01(\x0b\x32\x1b.swarming.v2.TaskProperties\x12\x17\n\x0f\x65xpiration_secs\x18\x02 \x01(\x05\x12\x19\n\x11wait_for_capacity\x18\x03 \x01(\x08\"\x1d\n\x0bResultDBCfg\x12\x0e\n\x06\x65nable\x18\x01 \x01(\x08\"\xe8\x04\n\x0eNewTaskRequest\x12\x17\n\x0f\x65xpiration_secs\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x16\n\x0eparent_task_id\x18\x03 \x01(\t\x12\x10\n\x08priority\x18\x04 \x01(\x05\x12/\n\nproperties\x18\x05 \x01(\x0b\x32\x1b.swarming.v2.TaskProperties\x12+\n\x0btask_slices\x18\x06 \x03(\x0b\x32\x16.swarming.v2.TaskSlice\x12\x0c\n\x04tags\x18\x07 \x03(\t\x12\x0c\n\x04user\x18\x08 \x01(\t\x12\x17\n\x0fservice_account\x18\t \x01(\t\x12\x14\n\x0cpubsub_topic\x18\n \x01(\t\x12\x19\n\x11pubsub_auth_token\x18\x0b \x01(\t\x12\x17\n\x0fpubsub_userdata\x18\x0c \x01(\t\x12\x15\n\revaluate_only\x18\r \x01(\x08\x12M\n\x12pool_task_template\x18\x0e \x01(\x0e\x32\x31.swarming.v2.NewTaskRequest.PoolTaskTemplateField\x12\x1f\n\x17\x62ot_ping_tolerance_secs\x18\x0f \x01(\x05\x12\x14\n\x0crequest_uuid\x18\x10 \x01(\t\x12*\n\x08resultdb\x18\x11 \x01(\x0b\x32\x18.swarming.v2.ResultDBCfg\x12\r\n\x05realm\x18\x12 \x01(\t\"P\n\x15PoolTaskTemplateField\x12\x08\n\x04\x41UTO\x10\x00\x12\x11\n\rCANARY_PREFER\x10\x01\x12\x10\n\x0c\x43\x41NARY_NEVER\x10\x02\x12\x08\n\x04SKIP\x10\x03\"\x9d\x01\n\x11TasksCountRequest\x12)\n\x05start\x18\x01 
\x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\'\n\x03\x65nd\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12&\n\x05state\x18\x03 \x01(\x0e\x32\x17.swarming.v2.StateQuery\x12\x0c\n\x04tags\x18\x04 \x03(\t\"D\n\nTasksCount\x12\r\n\x05\x63ount\x18\x01 \x01(\x05\x12\'\n\x03now\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp*\xcf\x02\n\nStateQuery\x12\x11\n\rQUERY_PENDING\x10\x00\x12\x11\n\rQUERY_RUNNING\x10\x01\x12\x19\n\x15QUERY_PENDING_RUNNING\x10\x02\x12\x13\n\x0fQUERY_COMPLETED\x10\x03\x12\x1b\n\x17QUERY_COMPLETED_SUCCESS\x10\x04\x12\x1b\n\x17QUERY_COMPLETED_FAILURE\x10\x05\x12\x11\n\rQUERY_EXPIRED\x10\x06\x12\x13\n\x0fQUERY_TIMED_OUT\x10\x07\x12\x12\n\x0eQUERY_BOT_DIED\x10\x08\x12\x12\n\x0eQUERY_CANCELED\x10\t\x12\r\n\tQUERY_ALL\x10\n\x12\x11\n\rQUERY_DEDUPED\x10\x0b\x12\x10\n\x0cQUERY_KILLED\x10\x0c\x12\x15\n\x11QUERY_NO_RESOURCE\x10\r\x12\x16\n\x12QUERY_CLIENT_ERROR\x10\x0e\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'clusterfuzz._internal.protos.swarming_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_STRINGPAIR']._serialized_start=60 - _globals['_STRINGPAIR']._serialized_end=100 - _globals['_STRINGLISTPAIR']._serialized_start=102 - _globals['_STRINGLISTPAIR']._serialized_end=146 - _globals['_DIGEST']._serialized_start=148 - _globals['_DIGEST']._serialized_end=190 - _globals['_CASREFERENCE']._serialized_start=192 - _globals['_CASREFERENCE']._serialized_end=265 - _globals['_CIPDPACKAGE']._serialized_start=267 - _globals['_CIPDPACKAGE']._serialized_end=333 - _globals['_CIPDINPUT']._serialized_start=335 - _globals['_CIPDINPUT']._serialized_end=456 - _globals['_CACHEENTRY']._serialized_start=458 - _globals['_CACHEENTRY']._serialized_end=498 - _globals['_CONTAINMENT']._serialized_start=501 - _globals['_CONTAINMENT']._serialized_end=743 - 
_globals['_CONTAINMENT_CONTAINMENTTYPE']._serialized_start=671 - _globals['_CONTAINMENT_CONTAINMENTTYPE']._serialized_end=743 - _globals['_TASKPROPERTIES']._serialized_start=746 - _globals['_TASKPROPERTIES']._serialized_end=1261 - _globals['_TASKSLICE']._serialized_start=1263 - _globals['_TASKSLICE']._serialized_end=1375 - _globals['_RESULTDBCFG']._serialized_start=1377 - _globals['_RESULTDBCFG']._serialized_end=1406 - _globals['_NEWTASKREQUEST']._serialized_start=1409 - _globals['_NEWTASKREQUEST']._serialized_end=2025 - _globals['_NEWTASKREQUEST_POOLTASKTEMPLATEFIELD']._serialized_start=1945 - _globals['_NEWTASKREQUEST_POOLTASKTEMPLATEFIELD']._serialized_end=2025 + _globals['_STATEQUERY']._serialized_start=2291 + _globals['_STATEQUERY']._serialized_end=2626 + _globals['_STRINGPAIR']._serialized_start=93 + _globals['_STRINGPAIR']._serialized_end=133 + _globals['_STRINGLISTPAIR']._serialized_start=135 + _globals['_STRINGLISTPAIR']._serialized_end=179 + _globals['_DIGEST']._serialized_start=181 + _globals['_DIGEST']._serialized_end=223 + _globals['_CASREFERENCE']._serialized_start=225 + _globals['_CASREFERENCE']._serialized_end=298 + _globals['_CIPDPACKAGE']._serialized_start=300 + _globals['_CIPDPACKAGE']._serialized_end=366 + _globals['_CIPDINPUT']._serialized_start=368 + _globals['_CIPDINPUT']._serialized_end=489 + _globals['_CACHEENTRY']._serialized_start=491 + _globals['_CACHEENTRY']._serialized_end=531 + _globals['_CONTAINMENT']._serialized_start=534 + _globals['_CONTAINMENT']._serialized_end=776 + _globals['_CONTAINMENT_CONTAINMENTTYPE']._serialized_start=704 + _globals['_CONTAINMENT_CONTAINMENTTYPE']._serialized_end=776 + _globals['_TASKPROPERTIES']._serialized_start=779 + _globals['_TASKPROPERTIES']._serialized_end=1294 + _globals['_TASKSLICE']._serialized_start=1296 + _globals['_TASKSLICE']._serialized_end=1408 + _globals['_RESULTDBCFG']._serialized_start=1410 + _globals['_RESULTDBCFG']._serialized_end=1439 + 
_globals['_NEWTASKREQUEST']._serialized_start=1442 + _globals['_NEWTASKREQUEST']._serialized_end=2058 + _globals['_NEWTASKREQUEST_POOLTASKTEMPLATEFIELD']._serialized_start=1978 + _globals['_NEWTASKREQUEST_POOLTASKTEMPLATEFIELD']._serialized_end=2058 + _globals['_TASKSCOUNTREQUEST']._serialized_start=2061 + _globals['_TASKSCOUNTREQUEST']._serialized_end=2218 + _globals['_TASKSCOUNT']._serialized_start=2220 + _globals['_TASKSCOUNT']._serialized_end=2288 # @@protoc_insertion_point(module_scope) diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2.pyi b/src/clusterfuzz/_internal/protos/swarming_pb2.pyi index fbdb1cd7d6..d610aa2682 100644 --- a/src/clusterfuzz/_internal/protos/swarming_pb2.pyi +++ b/src/clusterfuzz/_internal/protos/swarming_pb2.pyi @@ -1,7 +1,7 @@ """ @generated by mypy-protobuf. Do not edit manually! isort:skip_file -Copyright 2024 Google LLC +Copyright 2026 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
-This file is based on https://source.chromium.org/chromium/infra/infra/+/main:luci/appengine/swarming/proto/api_v2/swarming.proto +This file is based on https://source.chromium.org/chromium/infra/infra_superproject/+/main:infra/luci/appengine/swarming/proto/api_v2/swarming.proto This includes necessary messages to construct a NewTaskRequest """ import builtins @@ -24,6 +24,7 @@ import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.internal.enum_type_wrapper import google.protobuf.message +import google.protobuf.timestamp_pb2 import sys import typing @@ -34,6 +35,128 @@ else: DESCRIPTOR: google.protobuf.descriptor.FileDescriptor +class _StateQuery: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + +class _StateQueryEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_StateQuery.ValueType], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + QUERY_PENDING: _StateQuery.ValueType # 0 + """Query for all tasks currently TaskState.PENDING.""" + QUERY_RUNNING: _StateQuery.ValueType # 1 + """Query for all tasks currently TaskState.RUNNING. This includes tasks + currently in the overhead phase; mapping input files or archiving outputs + back to the server. + """ + QUERY_PENDING_RUNNING: _StateQuery.ValueType # 2 + """Query for all tasks currently TaskState.PENDING or TaskState.RUNNING. This + is the query for the 'active' tasks. + """ + QUERY_COMPLETED: _StateQuery.ValueType # 3 + """Query for all tasks that completed normally as TaskState.COMPLETED, + independent of the process exit code. + """ + QUERY_COMPLETED_SUCCESS: _StateQuery.ValueType # 4 + """Query for all tasks that completed normally as TaskState.COMPLETED and that + had exit code 0. + """ + QUERY_COMPLETED_FAILURE: _StateQuery.ValueType # 5 + """Query for all tasks that completed normally as TaskState.COMPLETED and that + had exit code not 0. 
+ """ + QUERY_EXPIRED: _StateQuery.ValueType # 6 + """Query for all tasks that are TaskState.EXPIRED.""" + QUERY_TIMED_OUT: _StateQuery.ValueType # 7 + """Query for all tasks that are TaskState.TIMED_OUT.""" + QUERY_BOT_DIED: _StateQuery.ValueType # 8 + """Query for all tasks that are TaskState.BOT_DIED.""" + QUERY_CANCELED: _StateQuery.ValueType # 9 + """Query for all tasks that are TaskState.CANCELED.""" + QUERY_ALL: _StateQuery.ValueType # 10 + """Query for all tasks, independent of the task state. + + In hindsight, this constant should have been the value 0. Sorry, the + original author was young and foolish. + """ + QUERY_DEDUPED: _StateQuery.ValueType # 11 + """Query for all tasks that are TaskState.COMPLETED but that actually didn't + run due to TaskProperties.idempotent being True *and* that a previous task + with the exact same TaskProperties had successfully run before, aka + COMPLETED_SUCCESS. + """ + QUERY_KILLED: _StateQuery.ValueType # 12 + """Query for all tasks that are TaskState.KILLED.""" + QUERY_NO_RESOURCE: _StateQuery.ValueType # 13 + """Query for all tasks that are TaskState.NO_RESOURCE.""" + QUERY_CLIENT_ERROR: _StateQuery.ValueType # 14 + """Query for all tasks that are TaskState.CLIENT_ERROR.""" + +class StateQuery(_StateQuery, metaclass=_StateQueryEnumTypeWrapper): + """Enums + + Use one of the values in this enum to query for tasks in one of the + specified state. + + Use 'ALL' to not use any filtering based on task state. + + As an example, this enum enables querying for all tasks with state COMPLETED + but non-zero exit code via COMPLETED_FAILURE. + + Do not confuse StateQuery and TaskState. StateQuery is to query tasks + via the API. TaskState is the current task state. + """ + +QUERY_PENDING: StateQuery.ValueType # 0 +"""Query for all tasks currently TaskState.PENDING.""" +QUERY_RUNNING: StateQuery.ValueType # 1 +"""Query for all tasks currently TaskState.RUNNING. 
This includes tasks +currently in the overhead phase; mapping input files or archiving outputs +back to the server. +""" +QUERY_PENDING_RUNNING: StateQuery.ValueType # 2 +"""Query for all tasks currently TaskState.PENDING or TaskState.RUNNING. This +is the query for the 'active' tasks. +""" +QUERY_COMPLETED: StateQuery.ValueType # 3 +"""Query for all tasks that completed normally as TaskState.COMPLETED, +independent of the process exit code. +""" +QUERY_COMPLETED_SUCCESS: StateQuery.ValueType # 4 +"""Query for all tasks that completed normally as TaskState.COMPLETED and that +had exit code 0. +""" +QUERY_COMPLETED_FAILURE: StateQuery.ValueType # 5 +"""Query for all tasks that completed normally as TaskState.COMPLETED and that +had exit code not 0. +""" +QUERY_EXPIRED: StateQuery.ValueType # 6 +"""Query for all tasks that are TaskState.EXPIRED.""" +QUERY_TIMED_OUT: StateQuery.ValueType # 7 +"""Query for all tasks that are TaskState.TIMED_OUT.""" +QUERY_BOT_DIED: StateQuery.ValueType # 8 +"""Query for all tasks that are TaskState.BOT_DIED.""" +QUERY_CANCELED: StateQuery.ValueType # 9 +"""Query for all tasks that are TaskState.CANCELED.""" +QUERY_ALL: StateQuery.ValueType # 10 +"""Query for all tasks, independent of the task state. + +In hindsight, this constant should have been the value 0. Sorry, the +original author was young and foolish. +""" +QUERY_DEDUPED: StateQuery.ValueType # 11 +"""Query for all tasks that are TaskState.COMPLETED but that actually didn't +run due to TaskProperties.idempotent being True *and* that a previous task +with the exact same TaskProperties had successfully run before, aka +COMPLETED_SUCCESS. 
+""" +QUERY_KILLED: StateQuery.ValueType # 12 +"""Query for all tasks that are TaskState.KILLED.""" +QUERY_NO_RESOURCE: StateQuery.ValueType # 13 +"""Query for all tasks that are TaskState.NO_RESOURCE.""" +QUERY_CLIENT_ERROR: StateQuery.ValueType # 14 +"""Query for all tasks that are TaskState.CLIENT_ERROR.""" +global___StateQuery = StateQuery + @typing_extensions.final class StringPair(google.protobuf.message.Message): """Messages @@ -673,3 +796,53 @@ class NewTaskRequest(google.protobuf.message.Message): def ClearField(self, field_name: typing_extensions.Literal["bot_ping_tolerance_secs", b"bot_ping_tolerance_secs", "evaluate_only", b"evaluate_only", "expiration_secs", b"expiration_secs", "name", b"name", "parent_task_id", b"parent_task_id", "pool_task_template", b"pool_task_template", "priority", b"priority", "properties", b"properties", "pubsub_auth_token", b"pubsub_auth_token", "pubsub_topic", b"pubsub_topic", "pubsub_userdata", b"pubsub_userdata", "realm", b"realm", "request_uuid", b"request_uuid", "resultdb", b"resultdb", "service_account", b"service_account", "tags", b"tags", "task_slices", b"task_slices", "user", b"user"]) -> None: ... global___NewTaskRequest = NewTaskRequest + +@typing_extensions.final +class TasksCountRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + START_FIELD_NUMBER: builtins.int + END_FIELD_NUMBER: builtins.int + STATE_FIELD_NUMBER: builtins.int + TAGS_FIELD_NUMBER: builtins.int + @property + def start(self) -> google.protobuf.timestamp_pb2.Timestamp: ... + @property + def end(self) -> google.protobuf.timestamp_pb2.Timestamp: ... + state: global___StateQuery.ValueType + @property + def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... 
+ def __init__( + self, + *, + start: google.protobuf.timestamp_pb2.Timestamp | None = ..., + end: google.protobuf.timestamp_pb2.Timestamp | None = ..., + state: global___StateQuery.ValueType = ..., + tags: collections.abc.Iterable[builtins.str] | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["end", b"end", "start", b"start"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["end", b"end", "start", b"start", "state", b"state", "tags", b"tags"]) -> None: ... + +global___TasksCountRequest = TasksCountRequest + +@typing_extensions.final +class TasksCount(google.protobuf.message.Message): + """Returns the count, as requested.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + COUNT_FIELD_NUMBER: builtins.int + NOW_FIELD_NUMBER: builtins.int + count: builtins.int + @property + def now(self) -> google.protobuf.timestamp_pb2.Timestamp: ... + def __init__( + self, + *, + count: builtins.int = ..., + now: google.protobuf.timestamp_pb2.Timestamp | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["now", b"now"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["count", b"count", "now", b"now"]) -> None: ... + +global___TasksCount = TasksCount diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py b/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py index e88584a711..7553de3ca8 100644 --- a/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py +++ b/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py @@ -1,4 +1,4 @@ -# Copyright 2026 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 0369abcb62..165c3e50e0 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -17,9 +17,6 @@ import json import uuid -from google.auth.transport import requests -from google.protobuf import json_format - from clusterfuzz._internal.base import utils from clusterfuzz._internal.base.errors import BadConfigError from clusterfuzz._internal.base.feature_flags import FeatureFlags @@ -31,11 +28,6 @@ from clusterfuzz._internal.protos import swarming_pb2 from clusterfuzz._internal.system import environment -_SWARMING_SCOPES = [ - 'https://www.googleapis.com/auth/cloud-platform', - 'https://www.googleapis.com/auth/userinfo.email' -] - def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: """Returns True if the task is supposed to run on swarming.""" @@ -54,7 +46,7 @@ def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: logs.info('[Swarming DEBUG] No swarming env var', job_name=job_name) return False - swarming_config = _get_swarming_config() + swarming_config = get_swarming_config() if swarming_config is None: logs.warning( """[Swarming DEBUG] current task is not suitable for swarming. @@ -74,7 +66,7 @@ def _get_task_name(job_name: str): return f't-{str(uuid.uuid4()).lower()}-{job_name}' -def _get_swarming_config() -> local_config.SwarmingConfig | None: +def get_swarming_config() -> local_config.SwarmingConfig | None: """Returns the swarming config.""" try: return local_config.SwarmingConfig() @@ -87,7 +79,7 @@ def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list ) -> list[swarming_pb2.StringPair]: # pylint: disable=no-member """ Gets all swarming dimensions for a task. 
 Job dimensions have more precedence than static dimensions"""
-  swarming_config = _get_swarming_config()
+  swarming_config = get_swarming_config()
   if not swarming_config:
     logs.error(
         '[Swarming] No dimensions set. Reason: failed to retrieve config')
@@ -95,7 +87,7 @@ def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list
   unique_dimensions = {}
   unique_dimensions['os'] = str(job.platform).capitalize()
-  unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool')
+  unique_dimensions['pool'] = swarming_config.get('swarming_pool')

   for dimension in platform_specific_dimensions:
     unique_dimensions[dimension['key'].lower()] = dimension['value']
@@ -202,7 +194,7 @@ def create_new_task_request(command: str, job_name: str, download_url: str
   if job is None:
     return None

-  swarming_config = _get_swarming_config()
+  swarming_config = get_swarming_config()
   if not swarming_config:
     return None

@@ -255,36 +247,3 @@ def create_new_task_request(command: str, job_name: str, download_url: str
   ])

   return new_task_request
-
-
-def push_swarming_task(task_request: swarming_pb2.NewTaskRequest):  # pylint: disable=no-member
-  """Schedules a task on swarming."""
-  swarming_config = _get_swarming_config()
-  if not swarming_config:
-    logs.error(
-        '[Swarming] Failed to push task into swarming. Reason: No config.')
-    return
-  creds = credentials.get_scoped_service_account_credentials(_SWARMING_SCOPES)
-  if not creds:
-    logs.error(
-        '[Swarming] Failed to push task into swarming. Reason: No credentials.')
-    return
-
-  if not creds.token:
-    creds.refresh(requests.Request())
-
-  headers = {
-      'Accept': 'application/json',
-      'Content-Type': 'application/json',
-      'Authorization': f'Bearer {creds.token}'
-  }
-  swarming_server = _get_swarming_config().get('swarming_server')
-  url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask'
-  message_body = json_format.MessageToJson(task_request)
-  logs.info(
-      f"""[Swarming] Pushing task {task_request.name}
-      as {creds.service_account_email}""",
-      url=url,
-      body=message_body)
-  response = utils.post_url(url=url, data=message_body, headers=headers)
-  logs.info(f'[Swarming] Response from {task_request.name}', response=response)
diff --git a/src/clusterfuzz/_internal/swarming/api.py b/src/clusterfuzz/_internal/swarming/api.py
new file mode 100644
index 0000000000..38752403b6
--- /dev/null
+++ b/src/clusterfuzz/_internal/swarming/api.py
@@ -0,0 +1,130 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Swarming pRPC API client."""
+
+from google.auth.transport import requests
+from google.protobuf import json_format
+
+from clusterfuzz._internal.base import utils
+from clusterfuzz._internal.config.local_config import SwarmingConfig
+from clusterfuzz._internal.google_cloud_utils import credentials
+from clusterfuzz._internal.metrics import logs
+from clusterfuzz._internal.protos import swarming_pb2
+from clusterfuzz._internal.swarming import get_swarming_config
+
+_SWARMING_SCOPES = [
+    'https://www.googleapis.com/auth/cloud-platform',
+    'https://www.googleapis.com/auth/userinfo.email'
+]
+
+
+class SwarmingAPI:
+  """Client for Swarming pRPC API."""
+
+  _config: SwarmingConfig | None = None
+  _base_url: str = ""
+
+  def __init__(self):
+    self._config = get_swarming_config()
+    if self._config:
+      self._base_url = f"https://{self._config.get('swarming_server')}/prpc/"
+
+  def _get_headers(self) -> dict[str, str]:
+    """Checks config and returns headers for pRPC request.
+
+    Returns:
+      A dict containing headers, or empty dict if config is missing or
+      auth fails.
+    """
+    if not self._config:
+      logs.error('[Swarming] No config available.')
+      return {}
+
+    creds = credentials.get_scoped_service_account_credentials(_SWARMING_SCOPES)
+    if not creds:
+      logs.error('[Swarming] Failed to get credentials.')
+      return {}
+
+    if not creds.token:
+      creds.refresh(requests.Request())
+
+    return {
+        'Accept': 'application/json',
+        'Content-Type': 'application/json',
+        'Authorization': f'Bearer {creds.token}'
+    }
+
+  def _make_request(self, endpoint: str, body: str) -> str | None:
+    """Makes a pRPC request to the Swarming API.
+
+    Args:
+      endpoint: The pRPC endpoint (e.g. "swarming.v2.Tasks/NewTask").
+      body: The JSON body of the request.
+
+    Returns:
+      The raw JSON response string from the server, or None if the request
+      could not be made (e.g. missing config, auth failure) or failed.
+ """ + headers = self._get_headers() + if not headers: + return None + + url = f'{self._base_url}{endpoint}' + response = utils.post_url(url=url, data=body, headers=headers) + if not response: + logs.error(f"[Swarming] Failed to make request to {url}") + return None + return response + + def push_task(self, task_request: swarming_pb2.NewTaskRequest) -> str | None: # pylint: disable=no-member + """Schedules a task on swarming. + + Args: + task_request: The NewTaskRequest proto message. + + Returns: + The raw JSON response string from the server, or None if the request + could not be made (e.g. missing config, auth failure) or failed. + """ + message_body = json_format.MessageToJson(task_request) + logs.info( + f"[Swarming] Pushing task {task_request.name}", + url=self._base_url, + body=message_body) + + response = self._make_request('swarming.v2.Tasks/NewTask', message_body) + logs.info( + f'[Swarming] Response from {task_request.name}', response=response) + return response + + def count_tasks(self, + count_request: swarming_pb2.TasksCountRequest) -> str | None: # pylint: disable=no-member + """Counts tasks on swarming. + + Args: + count_request: The TasksCountRequest proto message. + + Returns: + The raw JSON response string from the server, or None if the request + could not be made (e.g. missing config, auth failure) or failed. 
+ """ + message_body = json_format.MessageToJson(count_request) + logs.info( + "[Swarming] Counting tasks in queue", + url=self._base_url, + body=message_body) + + response = self._make_request('swarming.v2.Tasks/CountTasks', message_body) + logs.info('[Swarming] Response from CountTasks', response=response) + return response diff --git a/src/clusterfuzz/_internal/swarming/service.py b/src/clusterfuzz/_internal/swarming/service.py index 30c1bad677..76c1a781c7 100644 --- a/src/clusterfuzz/_internal/swarming/service.py +++ b/src/clusterfuzz/_internal/swarming/service.py @@ -17,11 +17,20 @@ from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.remote_task import remote_task_types +from clusterfuzz._internal.swarming.api import SwarmingAPI class SwarmingService(remote_task_types.RemoteTaskInterface): """Remote task service implementation for Swarming.""" + _api: SwarmingAPI = None + + def _get_api(self) -> SwarmingAPI: + """Returns the Swarming API instance.""" + if not self._api: + self._api = SwarmingAPI() + return self._api + def create_utask_main_job(self, module: str, job_type: str, input_download_url: str): """Creates a single swarming task for a uworker main task.""" @@ -51,7 +60,7 @@ def create_utask_main_jobs(self, continue if request := swarming.create_new_task_request( task.command, task.job_type, task.argument): - swarming.push_swarming_task(request) + self._get_api().push_task(request) except Exception: # pylint: disable=broad-except logs.error( f'Failed to push task to Swarming: {task.command}, {task.job_type}.' 
diff --git a/src/clusterfuzz/_internal/tests/core/swarming/api_test.py b/src/clusterfuzz/_internal/tests/core/swarming/api_test.py new file mode 100644 index 0000000000..7be899ae57 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/swarming/api_test.py @@ -0,0 +1,98 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for api.py.""" +import unittest +from unittest import mock + +from google.protobuf import json_format + +from clusterfuzz._internal.protos import swarming_pb2 +from clusterfuzz._internal.swarming.api import SwarmingAPI +from clusterfuzz._internal.tests.test_libs import helpers + + +class SwarmingAPITest(unittest.TestCase): + """Tests for SwarmingAPI.""" + + def setUp(self): + helpers.patch(self, [ + 'clusterfuzz._internal.base.utils.post_url', + 'clusterfuzz._internal.google_cloud_utils.credentials.get_scoped_service_account_credentials', + 'google.auth.transport.requests.Request', + ]) + + self.mock_creds = mock.MagicMock() + self.mock_creds.token = 'fake_token' + self.mock.get_scoped_service_account_credentials.return_value = self.mock_creds + + self.api = SwarmingAPI() + + def test_push_task(self): + """Tests that push_task works as expected.""" + task_request = swarming_pb2.NewTaskRequest(name='test_task') + self.api.push_task(task_request) + + expected_headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': 'Bearer fake_token' + } + expected_url = 
'https://server-name/prpc/swarming.v2.Tasks/NewTask' + self.mock.post_url.assert_called_with( + url=expected_url, + data=json_format.MessageToJson(task_request), + headers=expected_headers) + + def test_count_tasks(self): + """Tests that count_tasks works as expected.""" + count_request = swarming_pb2.TasksCountRequest(tags=['tag1']) + + # Mock response from post_url + self.mock.post_url.return_value = '{"count": 42}' + + response = self.api.count_tasks(count_request) + + expected_headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': 'Bearer fake_token' + } + expected_url = 'https://server-name/prpc/swarming.v2.Tasks/CountTasks' + self.mock.post_url.assert_called_with( + url=expected_url, + data=json_format.MessageToJson(count_request), + headers=expected_headers) + + self.assertEqual(response, '{"count": 42}') + + def test_push_task_no_config(self): + """Tests that push_task fails when config is missing.""" + with mock.patch('clusterfuzz._internal.config.local_config.SwarmingConfig' + ) as mock_config: + mock_config.side_effect = ValueError('Failed to load') + api = SwarmingAPI() + response = api.push_task(swarming_pb2.NewTaskRequest()) + self.assertIsNone(response) + + def test_push_task_no_credentials(self): + """Tests that push_task fails when credentials are missing.""" + self.mock.get_scoped_service_account_credentials.return_value = None + response = self.api.push_task(swarming_pb2.NewTaskRequest()) + self.assertIsNone(response) + + def test_count_tasks_no_credentials(self): + """Tests that count_tasks fails when credentials are missing.""" + self.mock.get_scoped_service_account_credentials.return_value = None + response = self.api.count_tasks(swarming_pb2.TasksCountRequest()) + self.assertIsNone(response) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py index 234150cf5f..7db024d509 100644 --- 
a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py @@ -27,7 +27,7 @@ class SwarmingServiceTest(unittest.TestCase): def setUp(self): helpers.patch(self, [ 'clusterfuzz._internal.swarming.is_swarming_task', - 'clusterfuzz._internal.swarming.push_swarming_task', + 'clusterfuzz._internal.swarming.service.SwarmingService._get_api', 'clusterfuzz._internal.swarming.create_new_task_request', 'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module', 'clusterfuzz._internal.metrics.logs.error', @@ -36,6 +36,8 @@ def setUp(self): self.service = service.SwarmingService() self.mock.create_new_task_request.return_value = 'fake_request' self.mock.get.return_value = None + self.mock_api = mock.MagicMock() + self.mock._get_api.return_value = self.mock_api # pylint: disable=protected-access def test_create_utask_main_job_success(self): """Test creating a single task successfully.""" @@ -48,7 +50,7 @@ def test_create_utask_main_job_success(self): # Success returns None in this interface (consistent with GcpBatchService) self.assertIsNone(result) - self.mock.push_swarming_task.assert_called_once_with('fake_request') + self.mock_api.push_task.assert_called_once_with('fake_request') def test_create_utask_main_job_failure(self): """Test creating a single task that is not a swarming task.""" @@ -61,7 +63,7 @@ def test_create_utask_main_job_failure(self): # Failure returns the task itself self.assertIsInstance(result, remote_task_types.RemoteTask) self.assertEqual(result.command, 'fuzz') - self.mock.push_swarming_task.assert_not_called() + self.mock_api.push_task.assert_not_called() def test_create_utask_main_jobs_mixed_results(self): """Test creating multiple tasks with mixed success/failure.""" @@ -79,8 +81,8 @@ def test_create_utask_main_jobs_mixed_results(self): self.assertEqual(len(unscheduled), 1) self.assertEqual(unscheduled[0].job_type, 'job2') - 
self.assertEqual(self.mock.push_swarming_task.call_count, 2) - self.mock.push_swarming_task.assert_has_calls([ + self.assertEqual(self.mock_api.push_task.call_count, 2) + self.mock_api.push_task.assert_has_calls([ mock.call('fake_request'), mock.call('fake_request'), ]) @@ -96,7 +98,7 @@ def test_create_utask_main_jobs_all_success(self): unscheduled = self.service.create_utask_main_jobs(tasks) self.assertEqual(unscheduled, []) - self.assertEqual(self.mock.push_swarming_task.call_count, 2) + self.assertEqual(self.mock_api.push_task.call_count, 2) def test_create_utask_main_jobs_all_fail(self): """Test creating multiple tasks where all fail.""" @@ -109,13 +111,13 @@ def test_create_utask_main_jobs_all_fail(self): unscheduled = self.service.create_utask_main_jobs(tasks) self.assertEqual(unscheduled, tasks) - self.mock.push_swarming_task.assert_not_called() + self.mock_api.push_task.assert_not_called() def test_create_utask_main_jobs_empty(self): """Test creating tasks with an empty list.""" unscheduled = self.service.create_utask_main_jobs([]) self.assertEqual(unscheduled, []) - self.mock.push_swarming_task.assert_not_called() + self.mock_api.push_task.assert_not_called() def test_create_utask_main_jobs_exception(self): """Test creating tasks when push_swarming_task raises an exception.""" @@ -124,7 +126,7 @@ def test_create_utask_main_jobs_exception(self): ] self.mock.is_swarming_task.return_value = True - self.mock.push_swarming_task.side_effect = Exception('error') + self.mock_api.push_task.side_effect = Exception('error') unscheduled = self.service.create_utask_main_jobs(tasks) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index c48d154058..fb8b99f67c 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -16,8 +16,6 @@ import unittest from unittest import mock -from 
google.protobuf import json_format - from clusterfuzz._internal import swarming from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.protos import swarming_pb2 @@ -252,109 +250,6 @@ def test_get_spec_from_config_for_fuzz_task(self): ]) self.assertEqual(spec, expected_spec) - def test_push_swarming_task(self): - """Tests that push_swarming_task works as expected.""" - mock_creds = mock.MagicMock() - mock_creds.token = 'fake_token' - self.mock.get_scoped_service_account_credentials.return_value = mock_creds - - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - task_request = swarming.create_new_task_request('fuzz', job.name, - 'https://download_url') - swarming.push_swarming_task(task_request) - - expected_new_task_request = swarming_pb2.NewTaskRequest( - name='task_name', - priority=1, - realm='realm-name', - service_account='test-clusterfuzz-service-account-email', - task_slices=[ - swarming_pb2.TaskSlice( - expiration_secs=86400, - properties=swarming_pb2.TaskProperties( - command=[ - 'luci-auth', 'context', '--', './linux_entry_point.sh' - ], - dimensions=[ - swarming_pb2.StringPair( - key='os', value=str(job.platform).capitalize()), - swarming_pb2.StringPair(key='pool', value='pool-name') - ], - cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member - cas_input_root=swarming_pb2.CASReference( - cas_instance= - 'projects/server-name/instances/instance_name', - digest=swarming_pb2.Digest( - hash='linux_entry_point_archive_hash', - size_bytes=1234)), - execution_timeout_secs=12345, - env=[ - swarming_pb2.StringPair( - key='DOCKER_IMAGE', - value= - 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' - ), - swarming_pb2.StringPair(key='UWORKER', value='True'), - swarming_pb2.StringPair( - key='SWARMING_BOT', value='True'), - swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), - swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), - swarming_pb2.StringPair( - key='DISABLE_MOUNTS', 
value='True'), - swarming_pb2.StringPair( - key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), - swarming_pb2.StringPair( - key='DOCKER_ENV_VARS', - value= - ('{"DOCKER_IMAGE": "gcr.io/clusterfuzz-images/' - 'base:a2f4dd6-202202070654", "UWORKER": "True", ' - '"SWARMING_BOT": "True", "LOG_TO_GCP": "True", ' - '"IS_K8S_ENV": "True", "DISABLE_MOUNTS": "True", ' - '"LOGGING_CLOUD_PROJECT_ID": "project_id"}')), - ], - secret_bytes='https://download_url'.encode('utf-8'))) - ]) - - self.mock.get_scoped_service_account_credentials.assert_called_with( - swarming._SWARMING_SCOPES) # pylint: disable=protected-access - expected_headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json', - 'Authorization': 'Bearer fake_token' - } - expected_url = 'https://server-name/prpc/swarming.v2.Tasks/NewTask' - self.mock.post_url.assert_called_with( - url=expected_url, - data=json_format.MessageToJson(expected_new_task_request), - headers=expected_headers) - - def test_push_swarming_task_with_refresh(self): - """Tests that push_swarming_task refreshes credentials if token is missing.""" - mock_creds = mock.MagicMock() - mock_creds.token = None - self.mock.get_scoped_service_account_credentials.return_value = mock_creds - - def refresh_side_effect(_): - mock_creds.token = 'refreshed_token' - - mock_creds.refresh.side_effect = refresh_side_effect - - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - request = swarming.create_new_task_request('fuzz', job.name, - 'https://download_url') - swarming.push_swarming_task(request) - - mock_creds.refresh.assert_called_with(self.mock.Request.return_value) - expected_headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json', - 'Authorization': 'Bearer refreshed_token' - } - self.assertEqual(self.mock.post_url.call_args[1]['headers'], - expected_headers) - def test_is_swarming_task(self): """Tests that is_swarming_task works as expected.""" job = data_types.Job(