96 changes: 96 additions & 0 deletions fastdeploy/worker/xpu_memory_monitor.py
@@ -0,0 +1,96 @@
import atexit
import logging
import os
import threading
import time
from datetime import datetime

from fastdeploy.model_executor.ops.xpu import (
xpu_get_free_global_memory,
xpu_get_total_global_memory,
xpu_get_used_global_memory,
)


class XpuMemoryMonitor:
"""
Independent XPU memory monitor that only writes to a separate log file.

Args:
device_ids (list[int]): List of device ids to monitor.
log_path (str): Log file path. Defaults to './default.xpu.log'.
interval (float): Logging interval in seconds. Defaults to 5.
"""

def __init__(self, device_ids, log_path="./log/default.xpu.log", interval=5):
self.device_ids = [int(d) for d in device_ids]
self.log_path = log_path
self.interval = interval
self.stop_flag = False
self.thread = None

        self.logger = logging.getLogger("XPU_Monitor")
        self.logger.setLevel(logging.INFO)
        # Guard against stacking duplicate handlers when several monitors are
        # created in the same process.
        if not self.logger.handlers:
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # os.path.dirname() returns "" for a bare filename; makedirs("") raises.
        log_dir = os.path.dirname(self.log_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

if not os.path.exists(self.log_path):
with open(self.log_path, "w") as f:
f.write("index,utilization_gpu,memory_total,memory_used,memory_free,timestamp\n")

atexit.register(self.stop)

def _to_number(self, x):
"""Safely convert tensor or other types to float."""
try:
if hasattr(x, "item"):
return float(x.item())
elif hasattr(x, "__float__"):
return float(x)
elif hasattr(x, "__array__"):
import numpy as np

return float(np.array(x).flatten()[0])
else:
return float(x)
except Exception:
return 0.0

def _monitor_loop(self):
"""Background thread function for periodic XPU memory monitoring."""
while not self.stop_flag:
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for device_id in self.device_ids:
try:
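                    # Convert to MiB (assuming the XPU ops report bytes).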
total = self._to_number(xpu_get_total_global_memory(device_id)) / 1024**2
used = self._to_number(xpu_get_used_global_memory(device_id)) / 1024**2
free = self._to_number(xpu_get_free_global_memory(device_id)) / 1024**2
util = int((used / total) * 100) if total > 0 else 0

line = f"{device_id},{util},{int(total)},{int(used)},{int(free)},{ts}\n"
with open(self.log_path, "a") as f:
f.write(line)
                except Exception as e:
                    # Keep one CSV row per failed sample so gaps stay visible.
                    self.logger.warning("Failed to query XPU %s memory: %s", device_id, e)
                    with open(self.log_path, "a") as f:
                        f.write(f"{device_id},error,0,0,0,{ts}\n")
time.sleep(self.interval)

def start(self):
"""Start monitoring in a background daemon thread."""
if self.thread and self.thread.is_alive():
return
self.stop_flag = False
self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.thread.start()

    def stop(self):
        """Stop the monitoring thread (idempotent; also registered via atexit)."""
        if getattr(self, "stop_flag", False):
            return
        self.stop_flag = True
        # getattr() keeps the atexit hook safe if __init__ failed partway.
        thread = getattr(self, "thread", None)
        if thread and thread.is_alive():
            thread.join(timeout=1)
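
To exercise the monitor outside the worker, here is a minimal standalone sketch; the device list, log path, and interval are illustrative, and it assumes the `fastdeploy.model_executor.ops.xpu` memory ops are available on the machine:

```python
# Hypothetical standalone usage of XpuMemoryMonitor (values are illustrative).
from fastdeploy.worker.xpu_memory_monitor import XpuMemoryMonitor

monitor = XpuMemoryMonitor(device_ids=[0], log_path="./log/demo.xpu.log", interval=2)
monitor.start()

# ... run an XPU workload here; one CSV row per device is appended every 2 s ...

monitor.stop()  # optional: stop() is also registered via atexit
```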
3 changes: 3 additions & 0 deletions fastdeploy/worker/xpu_worker.py
@@ -27,6 +27,7 @@
from fastdeploy.utils import get_logger, set_random_seed
from fastdeploy.worker.output import ModelRunnerOutput
from fastdeploy.worker.worker_base import WorkerBase
from fastdeploy.worker.xpu_memory_monitor import XpuMemoryMonitor
from fastdeploy.worker.xpu_model_runner import XPUModelRunner

logger = get_logger("xpu_worker", "xpu_worker.log")
@@ -67,6 +68,8 @@ def init_device(self):

gc.collect()
paddle.device.xpu.empty_cache()
self.monitor = XpuMemoryMonitor(device_ids=self.device_ids)
self.monitor.start()
else:
raise RuntimeError(f"Not support device type: {self.device_config.device}")

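Since the monitor emits plain CSV, the output is easy to sanity-check. A small sketch, assuming the default log path and that the ops report bytes (so the memory columns are MiB):

```python
# Sketch: print the most recent sample per device from the monitor's CSV log.
import csv

latest = {}
with open("./log/default.xpu.log") as f:
    for row in csv.DictReader(f):
        if row["utilization_gpu"] != "error":  # skip failed-sample rows
            latest[row["index"]] = row

for device_id, row in latest.items():
    print(
        f"XPU {device_id}: {row['utilization_gpu']}% util, "
        f"{row['memory_used']}/{row['memory_total']} MiB used at {row['timestamp']}"
    )
```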