"""Agent skills manager / 智能体技能管理器。 负责扫描全局 skills 库、生成可用清单,并同步到工作区 skills/。 """ from __future__ import annotations import re import shutil from pathlib import Path from typing import Dict, List, Optional, Sequence from config import AGENT_SKILLS_DIR, WORKSPACE_SKILLS_DIRNAME from utils.logger import setup_logger logger = setup_logger(__name__) SKILL_FILE_NAME = "SKILL.md" SKILL_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]*$") def ensure_agent_skills_dir(base_dir: Optional[str] = None) -> Path: """Ensure the global skills directory exists / 确保全局技能目录存在。""" root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve() root.mkdir(parents=True, exist_ok=True) return root def ensure_workspace_skills_dir(project_path: str | Path) -> Path: """Ensure workspace skills directory exists / 确保工作区 skills 目录存在。""" root = Path(project_path).expanduser().resolve() skills_dir = (root / WORKSPACE_SKILLS_DIRNAME).resolve() skills_dir.mkdir(parents=True, exist_ok=True) return skills_dir def _parse_frontmatter(text: str) -> Dict[str, str]: """Parse simple YAML frontmatter / 解析简单 YAML 头信息。""" lines = text.splitlines() if not lines or lines[0].strip() != "---": return {} end_idx = None for idx in range(1, len(lines)): if lines[idx].strip() == "---": end_idx = idx break if end_idx is None: return {} meta: Dict[str, str] = {} for line in lines[1:end_idx]: if ":" not in line: continue key, value = line.split(":", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key and value: meta[key] = value return meta def _is_valid_skill_id(name: str) -> bool: """Validate skill id / 校验技能目录名。""" return bool(name and SKILL_ID_PATTERN.match(name)) def get_skills_catalog(base_dir: Optional[str] = None) -> List[Dict[str, str]]: """List available skills from global library / 扫描全局技能库。""" root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve() if not root.exists() or not root.is_dir(): return [] catalog: List[Dict[str, str]] = [] for child in sorted(root.iterdir()): if not child.is_dir(): continue if not _is_valid_skill_id(child.name): continue skill_file = child / SKILL_FILE_NAME if not skill_file.exists(): continue meta: Dict[str, str] = {} try: meta = _parse_frontmatter(skill_file.read_text(encoding="utf-8")) except Exception: meta = {} label = meta.get("name") or child.name description = meta.get("description") or "" catalog.append({ "id": child.name, "label": label, "description": description, }) return catalog def resolve_enabled_skills( enabled_skills: Optional[Sequence[str]], catalog: Sequence[Dict[str, str]], ) -> List[str]: """Resolve enabled skills list / 解析启用技能列表。""" catalog_ids = [item.get("id", "") for item in catalog if item.get("id")] if enabled_skills is None: return catalog_ids if not isinstance(enabled_skills, (list, tuple)): return catalog_ids allowed = set(catalog_ids) seen = set() resolved: List[str] = [] for item in enabled_skills: if not isinstance(item, str): continue skill_id = item.strip() if not skill_id or skill_id in seen or skill_id not in allowed: continue resolved.append(skill_id) seen.add(skill_id) return resolved def build_skills_list( catalog: Sequence[Dict[str, str]], enabled_skill_ids: Sequence[str], ) -> List[str]: """Build skills list lines / 生成 skills 列表行。""" if not enabled_skill_ids: return [] lookup = {item.get("id"): item for item in catalog if item.get("id")} lines: List[str] = [] for skill_id in enabled_skill_ids: meta = lookup.get(skill_id) if not meta: continue description = (meta.get("description") or "").strip() if description: lines.append(f"skills/{skill_id}:{description}") else: lines.append(f"skills/{skill_id}") return lines def merge_enabled_skills( enabled_skill_ids: Optional[Sequence[str]], catalog: Sequence[Dict[str, str]], catalog_snapshot: Optional[Sequence[str]] = None, ) -> List[str]: """Merge enabled skills with new catalog items / 合并启用列表并默认开启新增技能。""" catalog_ids = [item.get("id") for item in catalog if item.get("id")] if enabled_skill_ids is None: base = list(catalog_ids) else: base_set = {item for item in enabled_skill_ids if isinstance(item, str)} base = [item for item in catalog_ids if item in base_set] if catalog_snapshot: snapshot_set = {item for item in catalog_snapshot if isinstance(item, str)} new_items = [item for item in catalog_ids if item not in snapshot_set] for item in new_items: if item not in base: base.append(item) return base def build_skills_prompt(template: str, skills_list: Sequence[str]) -> str: """Build skills prompt from template / 根据模板生成 skills 提示。""" if not template: return "" list_block = "\n".join(skills_list) if skills_list else "" content = template if list_block: content = content.replace("{skills_list}", list_block) content = re.sub(r"\[skills_empty\].*?\[/skills_empty\]\n?", "", content, flags=re.S) else: content = content.replace("{skills_list}", "") content = content.replace("[skills_empty]", "").replace("[/skills_empty]", "") return content.strip() def sync_workspace_skills( project_path: str | Path, enabled_skills: Optional[Sequence[str]] = None, base_dir: Optional[str] = None, ) -> Dict[str, object]: """Sync global skills into workspace / 将全局 skills 同步到工作区。""" root = Path(project_path).expanduser().resolve() skills_dir = (root / WORKSPACE_SKILLS_DIRNAME).resolve() try: skills_dir.relative_to(root) except Exception: return {"success": False, "error": "skills 目录不在项目路径内"} ensure_agent_skills_dir(base_dir) catalog = get_skills_catalog(base_dir) resolved = resolve_enabled_skills(enabled_skills, catalog) try: if skills_dir.exists(): shutil.rmtree(skills_dir) skills_dir.mkdir(parents=True, exist_ok=True) global_root = Path(base_dir or AGENT_SKILLS_DIR).expanduser().resolve() for skill_id in resolved: src = global_root / skill_id if not src.exists() or not src.is_dir(): continue shutil.copytree(src, skills_dir / skill_id) return { "success": True, "copied": list(resolved), "available": [item.get("id") for item in catalog], "target": str(skills_dir), } except Exception as exc: logger.error("同步 skills 失败: %s", exc, exc_info=True) return {"success": False, "error": str(exc), "target": str(skills_dir)}