agent-Specialization/modules/skills_manager.py

212 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Agent skills manager / 智能体技能管理器。
负责扫描全局 skills 库、生成可用清单,并同步到工作区 skills/。
"""
from __future__ import annotations
import re
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Sequence
from config import AGENT_SKILLS_DIR, WORKSPACE_SKILLS_DIRNAME
from utils.logger import setup_logger
logger = setup_logger(__name__)
SKILL_FILE_NAME = "SKILL.md"
SKILL_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]*$")
def ensure_agent_skills_dir(base_dir: Optional[str] = None) -> Path:
"""Ensure the global skills directory exists / 确保全局技能目录存在。"""
root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve()
root.mkdir(parents=True, exist_ok=True)
return root
def ensure_workspace_skills_dir(project_path: str | Path) -> Path:
"""Ensure workspace skills directory exists / 确保工作区 skills 目录存在。"""
root = Path(project_path).expanduser().resolve()
skills_dir = (root / WORKSPACE_SKILLS_DIRNAME).resolve()
skills_dir.mkdir(parents=True, exist_ok=True)
return skills_dir
def _parse_frontmatter(text: str) -> Dict[str, str]:
"""Parse simple YAML frontmatter / 解析简单 YAML 头信息。"""
lines = text.splitlines()
if not lines or lines[0].strip() != "---":
return {}
end_idx = None
for idx in range(1, len(lines)):
if lines[idx].strip() == "---":
end_idx = idx
break
if end_idx is None:
return {}
meta: Dict[str, str] = {}
for line in lines[1:end_idx]:
if ":" not in line:
continue
key, value = line.split(":", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and value:
meta[key] = value
return meta
def _is_valid_skill_id(name: str) -> bool:
"""Validate skill id / 校验技能目录名。"""
return bool(name and SKILL_ID_PATTERN.match(name))
def get_skills_catalog(base_dir: Optional[str] = None) -> List[Dict[str, str]]:
"""List available skills from global library / 扫描全局技能库。"""
root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve()
if not root.exists() or not root.is_dir():
return []
catalog: List[Dict[str, str]] = []
for child in sorted(root.iterdir()):
if not child.is_dir():
continue
if not _is_valid_skill_id(child.name):
continue
skill_file = child / SKILL_FILE_NAME
if not skill_file.exists():
continue
meta: Dict[str, str] = {}
try:
meta = _parse_frontmatter(skill_file.read_text(encoding="utf-8"))
except Exception:
meta = {}
label = meta.get("name") or child.name
description = meta.get("description") or ""
catalog.append({
"id": child.name,
"label": label,
"description": description,
})
return catalog
def resolve_enabled_skills(
enabled_skills: Optional[Sequence[str]],
catalog: Sequence[Dict[str, str]],
) -> List[str]:
"""Resolve enabled skills list / 解析启用技能列表。"""
catalog_ids = [item.get("id", "") for item in catalog if item.get("id")]
if enabled_skills is None:
return catalog_ids
if not isinstance(enabled_skills, (list, tuple)):
return catalog_ids
allowed = set(catalog_ids)
seen = set()
resolved: List[str] = []
for item in enabled_skills:
if not isinstance(item, str):
continue
skill_id = item.strip()
if not skill_id or skill_id in seen or skill_id not in allowed:
continue
resolved.append(skill_id)
seen.add(skill_id)
return resolved
def build_skills_list(
catalog: Sequence[Dict[str, str]],
enabled_skill_ids: Sequence[str],
) -> List[str]:
"""Build skills list lines / 生成 skills 列表行。"""
if not enabled_skill_ids:
return []
lookup = {item.get("id"): item for item in catalog if item.get("id")}
lines: List[str] = []
for skill_id in enabled_skill_ids:
meta = lookup.get(skill_id)
if not meta:
continue
description = (meta.get("description") or "").strip()
if description:
lines.append(f"skills/{skill_id}{description}")
else:
lines.append(f"skills/{skill_id}")
return lines
def merge_enabled_skills(
enabled_skill_ids: Optional[Sequence[str]],
catalog: Sequence[Dict[str, str]],
catalog_snapshot: Optional[Sequence[str]] = None,
) -> List[str]:
"""Merge enabled skills with new catalog items / 合并启用列表并默认开启新增技能。"""
catalog_ids = [item.get("id") for item in catalog if item.get("id")]
if enabled_skill_ids is None:
base = list(catalog_ids)
else:
base_set = {item for item in enabled_skill_ids if isinstance(item, str)}
base = [item for item in catalog_ids if item in base_set]
if catalog_snapshot:
snapshot_set = {item for item in catalog_snapshot if isinstance(item, str)}
new_items = [item for item in catalog_ids if item not in snapshot_set]
for item in new_items:
if item not in base:
base.append(item)
return base
def build_skills_prompt(template: str, skills_list: Sequence[str]) -> str:
"""Build skills prompt from template / 根据模板生成 skills 提示。"""
if not template:
return ""
list_block = "\n".join(skills_list) if skills_list else ""
content = template
if list_block:
content = content.replace("{skills_list}", list_block)
content = re.sub(r"\[skills_empty\].*?\[/skills_empty\]\n?", "", content, flags=re.S)
else:
content = content.replace("{skills_list}", "")
content = content.replace("[skills_empty]", "").replace("[/skills_empty]", "")
return content.strip()
def sync_workspace_skills(
project_path: str | Path,
enabled_skills: Optional[Sequence[str]] = None,
base_dir: Optional[str] = None,
) -> Dict[str, object]:
"""Sync global skills into workspace / 将全局 skills 同步到工作区。"""
root = Path(project_path).expanduser().resolve()
skills_dir = (root / WORKSPACE_SKILLS_DIRNAME).resolve()
try:
skills_dir.relative_to(root)
except Exception:
return {"success": False, "error": "skills 目录不在项目路径内"}
ensure_agent_skills_dir(base_dir)
catalog = get_skills_catalog(base_dir)
resolved = resolve_enabled_skills(enabled_skills, catalog)
try:
if skills_dir.exists():
shutil.rmtree(skills_dir)
skills_dir.mkdir(parents=True, exist_ok=True)
global_root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve()
for skill_id in resolved:
src = global_root / skill_id
if not src.exists() or not src.is_dir():
continue
shutil.copytree(src, skills_dir / skill_id)
return {
"success": True,
"copied": list(resolved),
"available": [item.get("id") for item in catalog],
"target": str(skills_dir),
}
except Exception as exc:
logger.error("同步 skills 失败: %s", exc, exc_info=True)
return {"success": False, "error": str(exc), "target": str(skills_dir)}