In order to facilitate the quick release of API handler processes (so the server can reply to new Thrift API requests), CodeChecker's server package implements support for the creation of background tasks.
A generic execution library deals with the driver aspects of background tasks, including the database handling (for cross-service synchronisation of task statuses) and memory management.
Each task is associated with a token, which is a randomly generated identifier corresponding to the `PRIMARY KEY` in the database.
This token is used to query information about a task, and to execute administrative actions against the task.
Ancillary data, stored in the server's storage space on disk, is also keyed by the token.
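For illustration, such a token could be generated like this (a sketch only; the exact length and alphabet used by CodeChecker are assumptions, and `allocate_token` is a hypothetical helper name):

```python
import secrets


def allocate_token() -> str:
    """Generate a random identifier usable as a task's PRIMARY KEY.

    Hypothetical helper: the real server may use a different length
    or alphabet for its tokens.
    """
    return secrets.token_hex(16)  # 32 hexadecimal characters.
```

Using a cryptographically strong source such as `secrets` makes tokens practically unguessable, which matters because knowing a token is part of being able to query a task's status.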
The most important property of a task is its **status**, which can be:

- `ALLOCATED`: The task has its identifying token and is under preparation.
- `ENQUEUED`: The task has been prepared and is waiting for an executor to start the work.
- `RUNNING`: The task is currently being executed.
- `COMPLETED`: The task's execution finished successfully.
- `FAILED`: The task's execution "structurally" failed due to an "inside" property of the execution: an uncaught `Exception` would have escaped the executor's "main" method.
- `CANCELLED`: The task's owner or an administrator (`PRODUCT_ADMIN` or `SUPERUSER`, see the Permission system) cancelled the execution of the task, and the task gracefully terminated itself.
- `DROPPED`: External influence resulted in the executing server's shutdown, and the task did not complete in a graceful way.
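The statuses above can be modelled as an enumeration. The sketch below is not CodeChecker's actual type (only the status names are taken from the list above); it also marks which states are terminal:

```python
from enum import Enum, auto


class TaskStatus(Enum):
    """Illustrative model of the task statuses described above."""
    ALLOCATED = auto()
    ENQUEUED = auto()
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()
    CANCELLED = auto()
    DROPPED = auto()


# COMPLETED and FAILED are the normal termination states;
# CANCELLED and DROPPED are the "abnormal" ones, caused by external influence.
TERMINAL = {TaskStatus.COMPLETED, TaskStatus.FAILED,
            TaskStatus.CANCELLED, TaskStatus.DROPPED}


def is_finished(status: TaskStatus) -> bool:
    """Whether the task will no longer change state."""
    return status in TERMINAL
```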
Tasks are generally spawned by API handlers, executed in the control flow of a Thrift RPC function.
The workflow of a task's lifecycle is as follows:
- An API request arrives (later, this might be extended with a cron-like scheduler) which exercises an endpoint that results in the need for a task.
- (Optionally) some conformance checks are executed on the input, in order to not even create the task if the input is ill-formed.
- A task token is `ALLOCATED`: the `BackgroundTask` record is written into the database, and now we have a unique identifier for the task.
- The task is pushed to a shared, synchronised task queue of the CodeChecker server, resulting in the `ENQUEUED` status.
  - `AbstractTask` subclasses MUST be `pickle`-able and reasonably small.
  - The library offers means to store additional large data on the file system, in a temporary directory specific to the task.
- The task token is returned to the user via the RPC API call, and the API worker is free to respond to other requests.
- The API handler exits and the Thrift RPC connection is terminated.
- In a loop with some frequency, the user exercises the `getTaskInfo()` API (executed in the context of any API worker process, synchronised over the database) to query whether the task was completed, if the user wishes to receive this information.
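The allocate → enqueue → execute → poll flow above can be sketched with a toy in-memory stand-in. `ToyTaskManager` is a hypothetical illustration, not the real `TaskManager`, which synchronises statuses through the database and uses real background worker processes:

```python
import queue
import secrets


class ToyTaskManager:
    """Toy model of the allocate -> enqueue -> run -> poll lifecycle."""

    def __init__(self):
        self._statuses = {}          # token -> status (a DB table in reality).
        self._queue = queue.Queue()  # Shared, synchronised task queue.

    def allocate(self) -> str:
        token = secrets.token_hex(16)
        self._statuses[token] = "ALLOCATED"
        return token

    def push(self, token: str, work) -> None:
        self._statuses[token] = "ENQUEUED"
        self._queue.put((token, work))

    def get_task_info(self, token: str) -> str:
        """What a client polling the API would observe."""
        return self._statuses[token]

    def run_one(self) -> None:
        """What one background worker iteration would do."""
        token, work = self._queue.get()
        self._statuses[token] = "RUNNING"
        try:
            work()
            self._statuses[token] = "COMPLETED"
        except Exception:
            self._statuses[token] = "FAILED"
```

Note how the API-facing side only ever touches the status table: once the token is handed out, the client learns about progress purely by polling, never by holding a connection open to the worker.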
The API request dispatching of the CodeChecker server has a `TaskManager` instance, which should be passed to the API handler implementation if not already available.
Then, you can use this `TaskManager` object to perform the necessary actions to enqueue the execution of a task:
```python
from pathlib import Path

from ..profiler import timeit
from ..task_executors.task_manager import TaskManager
from .common import exc_to_thrift_reqfail


class MyThriftEndpointHandler:
    def __init__(self, task_manager: TaskManager):
        self._task_manager = task_manager

    @exc_to_thrift_reqfail
    @timeit
    def apiRequestThatResultsInATask(self, arg1, arg2, large_arg: str, ...) \
            -> str:  # Return the task token!
        # Conformance checks and assertions on the input's validity.
        if invalid_input(arg1, arg2):
            raise ValueError("Bad request!")

        # Allocate the task token.
        tok: str = self._task_manager.allocate_task_record(
            # The task's "Kind": a simple string identifier which should NOT
            # depend on user input!
            # Used in filters and to quickly identify the "type" for a task
            # record.
            "MyThriftEndpointHandler::apiRequestThatResultsInATask()",
            # The task's "Summary": an arbitrary string that is used visually
            # to describe the executing task. This can be anything, even
            # spliced together from user input.
            # This is not used in the filters.
            "This is a task that was spawned from the API!",
            # The task's "User": the name of the user who is the actor which
            # caused the execution of the task.
            # The status of the task may only be queried by the relevant
            # actor, people with access to the product, or SUPERUSERs.
            "user",
            # If the task is associated with a product, pass the ORM
            # `Product` object here. Otherwise, pass `None`.
            current_product_obj or None)

        # Large inputs to the task **MUST** be passed through the file system
        # in order not to crash the server.
        # **If** the task needs large inputs, they must go into a temporary
        # directory appropriately created by the task manager.
        data_dir: Path = self._task_manager.create_task_data(tok)

        # Create the files under `data_dir` ...
        with open(data_dir / "large.txt", 'w') as f:
            f.write(large_arg)

        # Instantiate the `MyTask` class (see later) which contains the
        # actual business logic of the task.
        #
        # Small input fields that are of trivial serialisable Python types
        # (scalars, strings, etc., but not file descriptors or network
        # connections) can be passed directly to the task object's
        # constructor.
        task = MyTask(tok, data_dir, arg1, arg2)

        # Enqueue the task, at which point it may start immediately
        # executing, depending on server load.
        self._task_manager.push_task(task)

        return tok
```

The business logic of tasks is implemented by subclassing the `AbstractTask` class, providing an appropriate constructor, and overriding the `_implementation()` method.
- Once a `Task` instance is pushed into the server's task queue by `TaskManager::push_task()`, one of the background workers of the server will awaken and pop the task from the queue. The size of the queue is limited, hence why only small arguments may be present in the state of the `Task` object!
- This popped object is reconstructed by the standard Python library `pickle`, hence why only trivial scalar-like objects may be present in the state of the `Task` object!
- The executor starts running the custom `_implementation()` method, after setting the task's status to `RUNNING`.
- The implementation does its thing, periodically calling `task_manager.heartbeat()` to update the progress timestamp of the task, and, if appropriate, checking with `task_manager.should_cancel()` whether the admins requested the task to cancel or the server is shutting down.
- If `should_cancel()` returned `True`, the task does some appropriate clean-up, and exits by raising the special `TaskCancelHonoured` exception, indicating that it responded to the request. (At this point, the status becomes either `CANCELLED` or `DROPPED`, depending on the circumstances of the service.) Otherwise, or if the task is for some reason not cancellable without causing damage, the task executes its logic.
- If the task's `_implementation()` method exits cleanly, it reaches the `COMPLETED` status; otherwise, if any exception escapes from the `_implementation()` method, the task becomes `FAILED`, and exception information is logged into the `BackgroundTask.comments` column of the database.
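The terminal-status rules described above can be condensed into a sketch of what a worker does around the task body. This is a simplified model: `TaskCancelHonoured`, the status names, and the `CANCELLED`-vs-`DROPPED` distinction are taken from this document, while the function shape and parameters are illustrative assumptions:

```python
class TaskCancelHonoured(Exception):
    """Sketch of the exception a task raises to signal it honoured a
    cancel request. (In reality, this is imported from the executor
    library, not defined by the task author.)"""


def execute(run, set_status, add_comment, shutting_down=False):
    """Toy model of one executor pass: run a task body, record its fate.

    `run` stands in for the task's `_implementation()`; `set_status` and
    `add_comment` stand in for database writes.
    """
    set_status("RUNNING")
    try:
        run()
        set_status("COMPLETED")
    except TaskCancelHonoured:
        # Admin-requested cancel -> CANCELLED;
        # cancel caused by server shutdown -> DROPPED.
        set_status("DROPPED" if shutting_down else "CANCELLED")
    except Exception as ex:
        # Any other escaping exception marks the task FAILED,
        # with the details logged into the comments column.
        add_comment(f"Uncaught exception: {ex!r}")
        set_status("FAILED")
```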
Caution! Tasks, executing in a separate background process (one of the many processes spawned by a CodeChecker server), no longer have the ability to synchronously communicate with the user! This also includes the lack of ability to "return" a value: tasks only exercise side effects, but do not calculate a "result".
```python
from pathlib import Path

from ..task_executors.abstract_task import AbstractTask
from ..task_executors.task_manager import TaskManager


class MyTask(AbstractTask):
    def __init__(self, token: str, data_dir: Path,
                 arg1, arg2):  # Note: No large_arg!
        # If the task does not use a temporary data directory, `data_dir`
        # can be omitted, and `None` may be passed instead!
        super().__init__(token, data_dir)
        self.arg1 = arg1
        self.arg2 = arg2

    def _implementation(self, tm: TaskManager) -> None:
        # Tasks do not have a result value!

        # First, obtain the rest of the input (e.g., `large_arg`),
        # if any is needed.
        with open(self.data_path / "large.txt", 'r') as f:
            large_arg: str = f.read()
        # Exceptions raised above, e.g., the lack of the file,
        # automatically turn the task into the FAILED state.

        # Let's assume the task does something in an iteration ...
        for i in range(0, int(self.arg1) + int(self.arg2)):
            tm.heartbeat()  # Indicate some progress ...
            element = large_arg.split('\n')[i]

            if tm.should_cancel(self):
                # A shutdown was requested of the running task.
                # Perform some cleanup logic ...

                # Maybe have some customised log?
                tm.add_comment(self,
                               # The body of the comment.
                               "Oh no, we are shutting down ...!\n"
                               f"But only processed {i + 1} entries!",
                               # The actor entity associated with the comment.
                               "SYSTEM?")

                raise TaskCancelHonoured(self)

            # Actually process the step ...
            foo(element)
```

At any point following the `ALLOCATED` status, but most likely in the `ENQUEUED` and `RUNNING` statuses, a `SUPERUSER` may issue a `cancelTask()` order.
This will set `BackgroundTask.cancel_flag`, and the task is expected (although not required!) to poll its own `should_cancel()` status internally at checkpoints, and terminate gracefully in response to this request. This is done by `_implementation()` exiting by raising a `TaskCancelHonoured` exception.
(If the task does not raise one, it will be allowed to conclude normally, or fail in some other manner.)
Tasks cancelled gracefully will have the `CANCELLED` status.
For example, a background task that performs an action over a set of input files generally should be implemented like this:
```python
    def _implementation(self, tm: TaskManager) -> None:
        for file in INPUTS:
            if tm.should_cancel(self):
                ROLLBACK()
                raise TaskCancelHonoured(self)

            DO_LOGIC(file)
```

Alternatively, at any point in this life cycle, the server might receive the command to terminate itself (kill signals `SIGINT`, `SIGTERM`; alternatively caused by `CodeChecker server --stop`). Following the termination of API workers, the background workers will also shut down one by one.
At this point, the default behaviour is to cause a special cancel event which tasks currently `RUNNING` may still gracefully honour, as if it was a `SUPERUSER`'s single-task cancel request. All other tasks that have not started executing yet and are in the `ALLOCATED` or `ENQUEUED` status will never start.
All tasks not in a normal termination state will be set to the `DROPPED` status, with the `comments` field containing a log about the specifics of which state the task was dropped in, and why. (Together, `CANCELLED` and `DROPPED` are the "abnormal termination states", indicating that the task terminated due to some external influence.)
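The final sweep over unfinished tasks can be illustrated with a toy model against an in-memory status table. `drop_unfinished` is a hypothetical helper; the real server records this state in the `BackgroundTask` database table:

```python
def drop_unfinished(statuses: dict, comments: dict, reason: str) -> None:
    """Toy model of the shutdown sweep: every task not already in a
    termination state is marked DROPPED, with an explanatory comment
    noting which state it was dropped in, and why."""
    terminal = {"COMPLETED", "FAILED", "CANCELLED", "DROPPED"}
    for token, status in statuses.items():
        if status not in terminal:
            comments[token] = f"Dropped while {status}: {reason}"
            statuses[token] = "DROPPED"
```

Note that already-terminated tasks are left untouched: a task that managed to reach `COMPLETED` or `FAILED` before the shutdown keeps its real outcome.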
In a client, call the task-generating API endpoint normally.
It should return a `str`: the token identifier of the task.
This token can be awaited (polled) programmatically using a library function:
```python
from codechecker_client import client as libclient
from codechecker_client.task_client import await_task_termination


def main(...) -> int:
    client = setup_client(server_url or product_url)

    tok: str = client.apiRequestThatResultsInATask(16, 32, large_arg_str)

    prot, host, port = split_server_url(server_url)
    task_client = libclient.setup_task_client(prot, host, port)
    status: str = await_task_termination(LOG, tok,
                                         task_api_client=task_client)

    if status == "COMPLETED":
        return 0

    LOG.error("The execution of the task failed!\n%s",
              task_client.getTaskInfo(tok).comments)
    return 1
```

In simpler wrapper scripts, alternatively, `CodeChecker cmd serverside-tasks --token TOK --await` may be used to block execution until a task terminates (one way or another).