diff --git a/fastdeploy/worker/xpu_memory_monitor.py b/fastdeploy/worker/xpu_memory_monitor.py
new file mode 100644
index 00000000000..2af7a711404
--- /dev/null
+++ b/fastdeploy/worker/xpu_memory_monitor.py
@@ -0,0 +1,96 @@
+import atexit
+import logging
+import os
+import threading
+import time
+from datetime import datetime
+
+from fastdeploy.model_executor.ops.xpu import (
+    xpu_get_free_global_memory,
+    xpu_get_total_global_memory,
+    xpu_get_used_global_memory,
+)
+
+
+class XpuMemoryMonitor:
+    """
+    Independent XPU memory monitor that only writes to a separate log file.
+
+    Args:
+        device_ids (list[int]): List of device ids to monitor.
+        log_path (str): Log file path. Defaults to './log/default.xpu.log'.
+        interval (float): Logging interval in seconds. Defaults to 5.
+    """
+
+    def __init__(self, device_ids, log_path="./log/default.xpu.log", interval=5):
+        self.device_ids = [int(d) for d in device_ids]
+        self.log_path = log_path
+        self.interval = interval
+        self.stop_flag = False
+        self.thread = None
+
+        self.logger = logging.getLogger("XPU_Monitor")
+        self.logger.setLevel(logging.INFO)
+        ch = logging.StreamHandler()
+        ch.setLevel(logging.INFO)
+        formatter = logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")
+        ch.setFormatter(formatter)
+        self.logger.addHandler(ch)
+
+        os.makedirs(os.path.dirname(self.log_path), exist_ok=True)
+
+        if not os.path.exists(self.log_path):
+            with open(self.log_path, "w") as f:
+                f.write("index,utilization_gpu,memory_total,memory_used,memory_free,timestamp\n")
+
+        atexit.register(self.stop)
+
+    def _to_number(self, x):
+        """Safely convert tensor or other types to float."""
+        try:
+            if hasattr(x, "item"):
+                return float(x.item())
+            elif hasattr(x, "__float__"):
+                return float(x)
+            elif hasattr(x, "__array__"):
+                import numpy as np
+
+                return float(np.array(x).flatten()[0])
+            else:
+                return float(x)
+        except Exception:
+            return 0.0
+
+    def _monitor_loop(self):
+        """Background thread function for periodic XPU memory monitoring."""
+        while not self.stop_flag:
+            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            for device_id in self.device_ids:
+                try:
+                    total = self._to_number(xpu_get_total_global_memory(device_id)) / 1024**2
+                    used = self._to_number(xpu_get_used_global_memory(device_id)) / 1024**2
+                    free = self._to_number(xpu_get_free_global_memory(device_id)) / 1024**2
+                    util = int((used / total) * 100) if total > 0 else 0
+
+                    line = f"{device_id},{util},{int(total)},{int(used)},{int(free)},{ts}\n"
+                    with open(self.log_path, "a") as f:
+                        f.write(line)
+                except Exception:
+                    with open(self.log_path, "a") as f:
+                        f.write(f"{device_id},error,0,0,0,{ts}\n")
+            time.sleep(self.interval)
+
+    def start(self):
+        """Start monitoring in a background daemon thread."""
+        if self.thread and self.thread.is_alive():
+            return
+        self.stop_flag = False
+        self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
+        self.thread.start()
+
+    def stop(self):
+        """Stop the monitoring thread."""
+        if getattr(self, "stop_flag", False) is False:
+            self.stop_flag = True
+        if self.thread and self.thread.is_alive():
+            self.thread.join(timeout=1)
diff --git a/fastdeploy/worker/xpu_worker.py b/fastdeploy/worker/xpu_worker.py
index 0f84b1db086..f213d56bc1f 100644
--- a/fastdeploy/worker/xpu_worker.py
+++ b/fastdeploy/worker/xpu_worker.py
@@ -27,6 +27,7 @@
 from fastdeploy.utils import get_logger, set_random_seed
 from fastdeploy.worker.output import ModelRunnerOutput
 from fastdeploy.worker.worker_base import WorkerBase
+from fastdeploy.worker.xpu_memory_monitor import XpuMemoryMonitor
 from fastdeploy.worker.xpu_model_runner import XPUModelRunner
 
 logger = get_logger("xpu_worker", "xpu_worker.log")
@@ -67,6 +68,8 @@ def init_device(self):
 
             gc.collect()
             paddle.device.xpu.empty_cache()
+            self.monitor = XpuMemoryMonitor(device_ids=self.device_ids)
+            self.monitor.start()
         else:
             raise RuntimeError(f"Not support device type: {self.device_config.device}")
 
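For reviewers who want to exercise the monitor outside the worker, here is a minimal standalone sketch. The device ids, log path, and the sleep stand-in for a workload are illustrative only and not part of this diff; the constructor signature, `start()`/`stop()`, the atexit hook, and the CSV columns come from the new module above.

```python
import time

from fastdeploy.worker.xpu_memory_monitor import XpuMemoryMonitor

# Sample XPU device 0 every 2 seconds; each row appended to the log is
# index,utilization_gpu,memory_total,memory_used,memory_free,timestamp (memory in MiB).
# The log path here is an example, not the default used by the PR.
monitor = XpuMemoryMonitor(device_ids=[0], log_path="./log/example.xpu.log", interval=2)
monitor.start()

time.sleep(10)  # stand-in for the workload being profiled

# stop() is also registered via atexit in __init__, so the explicit call is optional.
monitor.stop()
```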