diff --git a/frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/WorkLoadManager.java b/frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/WorkLoadManager.java index 4bcfafe38..ba445bc9e 100644 --- a/frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/WorkLoadManager.java +++ b/frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/WorkLoadManager.java @@ -147,7 +147,7 @@ private CompletableFuture shutdownServerThread( Process workerProcess = lifecycle.getProcess(); if (workerProcess.isAlive()) { boolean workerDestroyed = false; - workerProcess.destroyForcibly(); + workerProcess.destroy(); // sends SIGTERM to workerProcess try { workerDestroyed = workerProcess.waitFor( diff --git a/mms/model_service_worker.py b/mms/model_service_worker.py index 164d463b1..0017fa41d 100644 --- a/mms/model_service_worker.py +++ b/mms/model_service_worker.py @@ -186,6 +186,25 @@ def start_worker(self, cl_socket): cl_socket.close() sys.exit(0) + def server_worker_sigterm_handler(self, signum, frame): + # Frontend Process.destroy() sends sigterm + logging.info("PID=%s, received signal %s server worker", os.getpid(), signum) + for p in multiprocessing.active_children(): + logging.info("PID=%s, Killing child %s", os.getpid(), p.pid) + p.terminate() + + if self.sock_type == 'unix' and os.path.exists(self.sock_name): + os.remove(self.sock_name) + + logging.info("PID=%s, Sending self kill signal", os.getpid()) + os.kill(os.getpid(), 9) + + def sigchld_handler(self, signum, frame): + # This is to handle zombie processes. + # Calling `active_children` has the side effect of `joining` any processes which have already finished + val = len(multiprocessing.active_children()) + + def run_server(self): """ Run the backend worker process and listen on a socket @@ -200,6 +219,8 @@ def run_server(self): logging.info("[PID] %d", os.getpid()) logging.info("MMS worker started.") logging.info("Python runtime: %s", platform.python_version()) + signal.signal(signal.SIGCHLD, self.sigchld_handler) + signal.signal(signal.SIGTERM, self.server_worker_sigterm_handler) while True: if self.service is None and self.preload is True: # Lazy loading the models diff --git a/tests/performance/agents/metrics/__init__.py b/tests/performance/agents/metrics/__init__.py index 9d7bf7eb2..756051a9c 100644 --- a/tests/performance/agents/metrics/__init__.py +++ b/tests/performance/agents/metrics/__init__.py @@ -92,7 +92,7 @@ def get_metrics(server_process, child_processes, logger): """ result = {} children.update(child_processes) - logger.debug("children : {0}".format(",".join([str(c.pid) for c in children]))) + logger.info("children : {0}".format(",".join([str(c.pid) for c in children]))) def update_metric(metric_name, proc_type, stats): stats = list(filter(lambda x: isinstance(x, (float, int)), stats)) @@ -125,10 +125,12 @@ def update_metric(metric_name, proc_type, stats): processes_stats.append({'type': ProcessType.WORKER, 'stats': child.as_dict()}) else: reclaimed_pids.append(child) - logger.debug('child {0} no longer available'.format(child.pid)) + logger.info('child {0} no longer available'.format(child.pid)) except (NoSuchProcess, ZombieProcess): reclaimed_pids.append(child) - logger.debug('child {0} no longer available'.format(child.pid)) + logger.info('child {0} no longer available'.format(child.pid)) + except psutil.AccessDenied as e: + logger.info('child {0} threw access denied {1}'.format(child.pid, str(e))) for p in reclaimed_pids: children.remove(p)