agent-Specialization/modules/container_monitor.py

162 lines
4.1 KiB
Python

"""Collect resource metrics for Docker-based user containers."""
from __future__ import annotations
import json
import re
import shutil
import subprocess
import time
from typing import Dict, Optional, Tuple
# Byte multipliers keyed by lower-cased unit suffix.
# Two-letter suffixes ("kb", "mb", ...) are decimal powers of 1000;
# "i" suffixes ("kib", "mib", ...) are binary powers of 1024.
SIZE_UNITS = {
    "b": 1,
    "kb": 10 ** 3,
    "kib": 2 ** 10,
    "mb": 10 ** 6,
    "mib": 2 ** 20,
    "gb": 10 ** 9,
    "gib": 2 ** 30,
    "tb": 10 ** 12,
    "tib": 2 ** 40,
}
def _parse_size(value: str) -> Optional[int]:
if not value:
return None
value = value.strip()
if value in {"", "0"}:
return 0
match = re.match(r"([\d\.]+)\s*([a-zA-Z]+)", value)
if not match:
try:
return int(float(value))
except ValueError:
return None
number = float(match.group(1))
unit = match.group(2).lower()
multiplier = SIZE_UNITS.get(unit, 1)
return int(number * multiplier)
def _parse_pair(value: str) -> Tuple[Optional[int], Optional[int]]:
if not value:
return (None, None)
parts = [part.strip() for part in value.split("/", 1)]
if len(parts) == 1:
return _parse_size(parts[0]), None
return _parse_size(parts[0]), _parse_size(parts[1])
def _parse_percent(value: str) -> Optional[float]:
if not value:
return None
value = value.strip().rstrip("%")
try:
return float(value)
except ValueError:
return None
def collect_stats(container_name: str, docker_bin: Optional[str] = None) -> Optional[Dict]:
    """Return docker stats metrics for a running container.

    Runs ``docker stats <container_name> --no-stream --format "{{json .}}"``
    with a 5 second timeout and parses the last line of its output.

    Args:
        container_name: Name passed to ``docker stats``.
        docker_bin: Path to the docker executable. When omitted, it is
            looked up on ``PATH`` with ``shutil.which``.

    Returns:
        A dict with ``timestamp``, ``cpu_percent``, ``memory``, ``net_io``,
        ``block_io``, ``pids`` and ``raw`` (the decoded JSON object), or
        ``None`` when docker is not found, the command fails, times out or
        exits non-zero, or its output is empty or not a JSON object.
        Individual fields are ``None`` when they cannot be parsed.
    """
    docker_bin = docker_bin or shutil.which("docker")
    if not docker_bin:
        return None
    cmd = [
        docker_bin,
        "stats",
        container_name,
        "--no-stream",
        "--format",
        "{{json .}}",
    ]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=False,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        # Covers a missing/non-executable binary as well as the timeout.
        return None
    if result.returncode != 0:
        return None
    raw_output = (result.stdout or "").strip()
    if not raw_output:
        return None
    try:
        # Only the final line is decoded; anything printed before it is ignored.
        data = json.loads(raw_output.splitlines()[-1])
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        # Valid JSON that is not an object (null, list, ...) has no fields to read.
        return None

    mem_used, mem_limit = _parse_pair(data.get("MemUsage"))
    net_rx, net_tx = _parse_pair(data.get("NetIO"))
    block_read, block_write = _parse_pair(data.get("BlockIO"))

    # The PID count may be a non-numeric placeholder (e.g. "--"); report it
    # as None instead of letting int() raise.
    pids_raw = data.get("PIDs")
    try:
        pids = int(pids_raw) if pids_raw else None
    except (TypeError, ValueError):
        pids = None

    return {
        "timestamp": time.time(),
        "cpu_percent": _parse_percent(data.get("CPUPerc")),
        "memory": {
            "used_bytes": mem_used,
            "limit_bytes": mem_limit,
            "percent": _parse_percent(data.get("MemPerc")),
            "raw": data.get("MemUsage"),
        },
        "net_io": {
            "rx_bytes": net_rx,
            "tx_bytes": net_tx,
            "raw": data.get("NetIO"),
        },
        "block_io": {
            "read_bytes": block_read,
            "write_bytes": block_write,
            "raw": data.get("BlockIO"),
        },
        "pids": pids,
        "raw": data,
    }
def inspect_state(container_name: str, docker_bin: Optional[str] = None) -> Optional[Dict]:
    """Return selected fields of a container's ``.State`` from ``docker inspect``.

    Runs ``docker inspect -f "{{json .State}}" <container_name>`` with a
    5 second timeout and decodes its output as JSON.

    Args:
        container_name: Name passed to ``docker inspect``.
        docker_bin: Path to the docker executable. When omitted, it is
            looked up on ``PATH`` with ``shutil.which``.

    Returns:
        A dict with ``status``, ``running``, ``started_at``, ``finished_at``,
        ``pid``, ``error`` and ``exit_code`` (each ``None`` if absent from
        the state object), or ``None`` when docker is not found, the command
        fails, times out or exits non-zero, or its output is empty or not a
        JSON object.
    """
    docker_bin = docker_bin or shutil.which("docker")
    if not docker_bin:
        return None
    cmd = [
        docker_bin,
        "inspect",
        "-f",
        "{{json .State}}",
        container_name,
    ]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=False,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        # Covers a missing/non-executable binary as well as the timeout.
        return None
    if result.returncode != 0:
        return None
    raw = (result.stdout or "").strip()
    if not raw:
        return None
    try:
        state = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(state, dict):
        # e.g. "null": valid JSON, but there are no state fields to read.
        return None
    return {
        "status": state.get("Status"),
        "running": state.get("Running"),
        "started_at": state.get("StartedAt"),
        "finished_at": state.get("FinishedAt"),
        "pid": state.get("Pid"),
        "error": state.get("Error"),
        "exit_code": state.get("ExitCode"),
    }