Skip to content

Commit 067be88

Browse files
author
qw86972190
committed
[XPU] Add XPU memory monitor module
1 parent f69c9cd commit 067be88

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import os
2+
import time
3+
import threading
4+
import atexit
5+
from datetime import datetime
6+
import logging
7+
8+
from fastdeploy.model_executor.ops.xpu import (
9+
xpu_get_total_global_memory,
10+
xpu_get_used_global_memory,
11+
xpu_get_free_global_memory,
12+
)
13+
14+
15+
class XpuMemoryMonitor:
    """Independent XPU memory monitor that appends samples to a separate CSV log file.

    A background daemon thread periodically samples total/used/free global
    memory for each monitored device and appends one CSV row per device to
    ``log_path``. ``stop`` is registered with ``atexit`` so the thread is
    shut down automatically at interpreter exit.

    Args:
        device_ids (list[int]): List of device ids to monitor.
        log_path (str): Log file path. Defaults to './log/default.xpu.log'.
        interval (float): Logging interval in seconds. Defaults to 5.
    """

    def __init__(self, device_ids, log_path="./log/default.xpu.log", interval=5):
        self.device_ids = [int(d) for d in device_ids]
        self.log_path = log_path
        self.interval = interval
        self.stop_flag = False
        self.thread = None

        self.logger = logging.getLogger("XPU_Monitor")
        self.logger.setLevel(logging.INFO)
        # logging.getLogger returns a shared singleton; guard against
        # stacking a new StreamHandler (and duplicating every log line)
        # each time a monitor is instantiated.
        if not self.logger.handlers:
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            ch.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s"))
            self.logger.addHandler(ch)

        # os.path.dirname() is "" for a bare filename, and makedirs("")
        # raises FileNotFoundError — only create the directory when present.
        log_dir = os.path.dirname(self.log_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        # Write the CSV header once; subsequent runs append to the same file.
        if not os.path.exists(self.log_path):
            with open(self.log_path, "w") as f:
                f.write("index,utilization_gpu,memory_total,memory_used,memory_free,timestamp\n")

        atexit.register(self.stop)

    def _to_number(self, x):
        """Safely convert a tensor or other numeric-like value to float.

        Returns:
            float: The converted value, or 0.0 if conversion fails.
        """
        try:
            if hasattr(x, "item"):  # 0-d tensor / numpy scalar
                return float(x.item())
            elif hasattr(x, "__float__"):
                return float(x)
            elif hasattr(x, "__array__"):
                import numpy as np

                return float(np.array(x).flatten()[0])
            else:
                return float(x)
        except Exception:
            return 0.0

    def _monitor_loop(self):
        """Background thread body: sample each device and append CSV rows."""
        while not self.stop_flag:
            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            for device_id in self.device_ids:
                try:
                    # Raw values are bytes; convert to MiB for the log.
                    total = self._to_number(xpu_get_total_global_memory(device_id)) / 1024**2
                    used = self._to_number(xpu_get_used_global_memory(device_id)) / 1024**2
                    free = self._to_number(xpu_get_free_global_memory(device_id)) / 1024**2
                    util = int((used / total) * 100) if total > 0 else 0

                    line = f"{device_id},{util},{int(total)},{int(used)},{int(free)},{ts}\n"
                    with open(self.log_path, "a") as f:
                        f.write(line)
                except Exception as e:
                    # Record the failure in the CSV (keeps row cadence) and on
                    # the logger so the cause is not silently lost.
                    self.logger.warning("XPU memory query failed for device %s: %s", device_id, e)
                    with open(self.log_path, "a") as f:
                        f.write(f"{device_id},error,0,0,0,{ts}\n")
            time.sleep(self.interval)

    def start(self):
        """Start monitoring in a background daemon thread (idempotent)."""
        if self.thread and self.thread.is_alive():
            return
        self.stop_flag = False
        self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the monitoring thread; safe to call multiple times.

        Uses getattr so it is safe even if __init__ did not complete
        (stop is registered with atexit).
        """
        self.stop_flag = True
        thread = getattr(self, "thread", None)
        if thread and thread.is_alive():
            thread.join(timeout=1)

fastdeploy/worker/xpu_worker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from fastdeploy.worker.output import ModelRunnerOutput
2929
from fastdeploy.worker.worker_base import WorkerBase
3030
from fastdeploy.worker.xpu_model_runner import XPUModelRunner
31+
from fastdeploy.worker.xpu_memory_monitor import XpuMemoryMonitor
3132

3233
logger = get_logger("xpu_worker", "xpu_worker.log")
3334

@@ -67,6 +68,8 @@ def init_device(self):
6768

6869
gc.collect()
6970
paddle.device.xpu.empty_cache()
71+
self.monitor = XpuMemoryMonitor(device_ids=self.device_ids)
72+
self.monitor.start()
7073
else:
7174
raise RuntimeError(f"Not support device type: {self.device_config.device}")
7275

0 commit comments

Comments
 (0)