212 lines
7.3 KiB
Python
212 lines
7.3 KiB
Python
"""Agent skills manager / 智能体技能管理器。
|
||
|
||
负责扫描全局 skills 库、生成可用清单,并同步到工作区 skills/。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
import shutil
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Sequence
|
||
|
||
from config import AGENT_SKILLS_DIR, WORKSPACE_SKILLS_DIRNAME
|
||
from utils.logger import setup_logger
|
||
|
||
logger = setup_logger(__name__)
|
||
|
||
SKILL_FILE_NAME = "SKILL.md"
|
||
SKILL_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]*$")
|
||
|
||
|
||
def ensure_agent_skills_dir(base_dir: Optional[str] = None) -> Path:
|
||
"""Ensure the global skills directory exists / 确保全局技能目录存在。"""
|
||
root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve()
|
||
root.mkdir(parents=True, exist_ok=True)
|
||
return root
|
||
|
||
|
||
def ensure_workspace_skills_dir(project_path: str | Path) -> Path:
|
||
"""Ensure workspace skills directory exists / 确保工作区 skills 目录存在。"""
|
||
root = Path(project_path).expanduser().resolve()
|
||
skills_dir = (root / WORKSPACE_SKILLS_DIRNAME).resolve()
|
||
skills_dir.mkdir(parents=True, exist_ok=True)
|
||
return skills_dir
|
||
|
||
|
||
def _parse_frontmatter(text: str) -> Dict[str, str]:
|
||
"""Parse simple YAML frontmatter / 解析简单 YAML 头信息。"""
|
||
lines = text.splitlines()
|
||
if not lines or lines[0].strip() != "---":
|
||
return {}
|
||
end_idx = None
|
||
for idx in range(1, len(lines)):
|
||
if lines[idx].strip() == "---":
|
||
end_idx = idx
|
||
break
|
||
if end_idx is None:
|
||
return {}
|
||
meta: Dict[str, str] = {}
|
||
for line in lines[1:end_idx]:
|
||
if ":" not in line:
|
||
continue
|
||
key, value = line.split(":", 1)
|
||
key = key.strip()
|
||
value = value.strip().strip('"').strip("'")
|
||
if key and value:
|
||
meta[key] = value
|
||
return meta
|
||
|
||
|
||
def _is_valid_skill_id(name: str) -> bool:
|
||
"""Validate skill id / 校验技能目录名。"""
|
||
return bool(name and SKILL_ID_PATTERN.match(name))
|
||
|
||
|
||
def get_skills_catalog(base_dir: Optional[str] = None) -> List[Dict[str, str]]:
|
||
"""List available skills from global library / 扫描全局技能库。"""
|
||
root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve()
|
||
if not root.exists() or not root.is_dir():
|
||
return []
|
||
catalog: List[Dict[str, str]] = []
|
||
for child in sorted(root.iterdir()):
|
||
if not child.is_dir():
|
||
continue
|
||
if not _is_valid_skill_id(child.name):
|
||
continue
|
||
skill_file = child / SKILL_FILE_NAME
|
||
if not skill_file.exists():
|
||
continue
|
||
meta: Dict[str, str] = {}
|
||
try:
|
||
meta = _parse_frontmatter(skill_file.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
meta = {}
|
||
label = meta.get("name") or child.name
|
||
description = meta.get("description") or ""
|
||
catalog.append({
|
||
"id": child.name,
|
||
"label": label,
|
||
"description": description,
|
||
})
|
||
return catalog
|
||
|
||
|
||
def resolve_enabled_skills(
|
||
enabled_skills: Optional[Sequence[str]],
|
||
catalog: Sequence[Dict[str, str]],
|
||
) -> List[str]:
|
||
"""Resolve enabled skills list / 解析启用技能列表。"""
|
||
catalog_ids = [item.get("id", "") for item in catalog if item.get("id")]
|
||
if enabled_skills is None:
|
||
return catalog_ids
|
||
if not isinstance(enabled_skills, (list, tuple)):
|
||
return catalog_ids
|
||
allowed = set(catalog_ids)
|
||
seen = set()
|
||
resolved: List[str] = []
|
||
for item in enabled_skills:
|
||
if not isinstance(item, str):
|
||
continue
|
||
skill_id = item.strip()
|
||
if not skill_id or skill_id in seen or skill_id not in allowed:
|
||
continue
|
||
resolved.append(skill_id)
|
||
seen.add(skill_id)
|
||
return resolved
|
||
|
||
|
||
def build_skills_list(
|
||
catalog: Sequence[Dict[str, str]],
|
||
enabled_skill_ids: Sequence[str],
|
||
) -> List[str]:
|
||
"""Build skills list lines / 生成 skills 列表行。"""
|
||
if not enabled_skill_ids:
|
||
return []
|
||
lookup = {item.get("id"): item for item in catalog if item.get("id")}
|
||
lines: List[str] = []
|
||
for skill_id in enabled_skill_ids:
|
||
meta = lookup.get(skill_id)
|
||
if not meta:
|
||
continue
|
||
description = (meta.get("description") or "").strip()
|
||
if description:
|
||
lines.append(f"skills/{skill_id}:{description}")
|
||
else:
|
||
lines.append(f"skills/{skill_id}")
|
||
return lines
|
||
|
||
|
||
def merge_enabled_skills(
|
||
enabled_skill_ids: Optional[Sequence[str]],
|
||
catalog: Sequence[Dict[str, str]],
|
||
catalog_snapshot: Optional[Sequence[str]] = None,
|
||
) -> List[str]:
|
||
"""Merge enabled skills with new catalog items / 合并启用列表并默认开启新增技能。"""
|
||
catalog_ids = [item.get("id") for item in catalog if item.get("id")]
|
||
if enabled_skill_ids is None:
|
||
base = list(catalog_ids)
|
||
else:
|
||
base_set = {item for item in enabled_skill_ids if isinstance(item, str)}
|
||
base = [item for item in catalog_ids if item in base_set]
|
||
if catalog_snapshot:
|
||
snapshot_set = {item for item in catalog_snapshot if isinstance(item, str)}
|
||
new_items = [item for item in catalog_ids if item not in snapshot_set]
|
||
for item in new_items:
|
||
if item not in base:
|
||
base.append(item)
|
||
return base
|
||
|
||
|
||
def build_skills_prompt(template: str, skills_list: Sequence[str]) -> str:
|
||
"""Build skills prompt from template / 根据模板生成 skills 提示。"""
|
||
if not template:
|
||
return ""
|
||
list_block = "\n".join(skills_list) if skills_list else ""
|
||
content = template
|
||
if list_block:
|
||
content = content.replace("{skills_list}", list_block)
|
||
content = re.sub(r"\[skills_empty\].*?\[/skills_empty\]\n?", "", content, flags=re.S)
|
||
else:
|
||
content = content.replace("{skills_list}", "")
|
||
content = content.replace("[skills_empty]", "").replace("[/skills_empty]", "")
|
||
return content.strip()
|
||
|
||
|
||
def sync_workspace_skills(
|
||
project_path: str | Path,
|
||
enabled_skills: Optional[Sequence[str]] = None,
|
||
base_dir: Optional[str] = None,
|
||
) -> Dict[str, object]:
|
||
"""Sync global skills into workspace / 将全局 skills 同步到工作区。"""
|
||
root = Path(project_path).expanduser().resolve()
|
||
skills_dir = (root / WORKSPACE_SKILLS_DIRNAME).resolve()
|
||
try:
|
||
skills_dir.relative_to(root)
|
||
except Exception:
|
||
return {"success": False, "error": "skills 目录不在项目路径内"}
|
||
|
||
ensure_agent_skills_dir(base_dir)
|
||
catalog = get_skills_catalog(base_dir)
|
||
resolved = resolve_enabled_skills(enabled_skills, catalog)
|
||
|
||
try:
|
||
if skills_dir.exists():
|
||
shutil.rmtree(skills_dir)
|
||
skills_dir.mkdir(parents=True, exist_ok=True)
|
||
global_root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve()
|
||
for skill_id in resolved:
|
||
src = global_root / skill_id
|
||
if not src.exists() or not src.is_dir():
|
||
continue
|
||
shutil.copytree(src, skills_dir / skill_id)
|
||
return {
|
||
"success": True,
|
||
"copied": list(resolved),
|
||
"available": [item.get("id") for item in catalog],
|
||
"target": str(skills_dir),
|
||
}
|
||
except Exception as exc:
|
||
logger.error("同步 skills 失败: %s", exc, exc_info=True)
|
||
return {"success": False, "error": str(exc), "target": str(skills_dir)}
|