#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Main program of the content-automation agent.

Pipeline: crawl a page -> extract its copy -> rewrite the copy ->
generate a voice-over or a video.
"""

import asyncio
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple

# Project-local modules
from src.crawler.web_crawler import WebCrawler
from src.extractor.content_extractor import ContentExtractor
from src.rewriter.content_rewriter import ContentRewriter
from src.voice_generator.voice_generator import create_voice_generator
from src.video_generator.video_generator import create_video_generator
from src.utils.config_loader import ConfigLoader
from src.utils.logger import setup_logger

# Output formats understood by ContentAutomationAgent.process_url.
SUPPORTED_FORMATS = ("voice", "video", "text")


class ContentAutomationAgent:
    """Run the crawl -> extract -> rewrite -> generate pipeline."""

    def __init__(self, config_path: str = "config/config.yaml"):
        """
        Load the configuration and build every pipeline component.

        Args:
            config_path: Path of the configuration file handed to
                ``ConfigLoader.load_config``.
        """
        self.config = ConfigLoader.load_config(config_path)
        self.logger = setup_logger("ContentAutomationAgent", self.config)

        # Every component receives the same loaded configuration.
        self.crawler = WebCrawler(self.config)
        self.extractor = ContentExtractor(self.config)
        self.rewriter = ContentRewriter(self.config)
        self.voice_generator = create_voice_generator(self.config)
        self.video_generator = create_video_generator(self.config)

        self.logger.info("内容自动化创作Agent初始化完成")

    async def process_url(self, url: str, output_format: str = "voice") -> Dict[str, Any]:
        """
        Run the complete pipeline for a single URL.

        Args:
            url: Target URL.
            output_format: ``"voice"``, ``"video"`` or ``"text"``. Only
                ``"voice"`` and ``"video"`` produce an output file; any
                other value yields the texts only.

        Returns:
            Dict with the keys ``url``, ``original_content``,
            ``rewritten_content`` and ``output_files``, plus
            ``voice_file`` or ``video_file`` when one was generated.

        Raises:
            ValueError: If the extractor returns nothing usable.
            Exception: Any error raised by a pipeline step is logged and
                re-raised unchanged.
        """
        self.logger.info(f"开始处理URL: {url}")

        try:
            # Step 1: crawl the page.
            self.logger.info("步骤1: 爬取网页内容...")
            raw_html = await self.crawler.crawl(url)

            # Step 2: extract the copy.
            self.logger.info("步骤2: 提取文案内容...")
            extracted_content = self.extractor.extract(raw_html, url)

            if not extracted_content:
                raise ValueError("未能从网页中提取有效内容")

            # Step 3: rewrite the copy.
            self.logger.info("步骤3: 智能改稿...")
            rewritten_content = self.rewriter.rewrite(extracted_content)

            # Step 4: generate the requested media, if any.
            result = {
                "url": url,
                "original_content": extracted_content,
                "rewritten_content": rewritten_content,
                "output_files": [],
            }

            if output_format == "voice":
                self.logger.info("步骤4: 生成配音...")
                voice_file = await self.voice_generator.generate(rewritten_content)
                result["output_files"].append(voice_file)
                result["voice_file"] = voice_file
            elif output_format == "video":
                self.logger.info("步骤4: 生成视频...")
                video_file = await self.video_generator.generate(rewritten_content)
                result["output_files"].append(video_file)
                result["video_file"] = video_file
            elif output_format != "text":
                # Keep the lenient text-only fallback, but make it visible
                # instead of silently ignoring a misspelled format.
                self.logger.warning(f"未知的输出格式 '{output_format}',仅生成文本")

            self.logger.info(f"处理完成: {url}")
            return result

        except Exception as e:
            self.logger.error(f"处理URL失败 {url}: {str(e)}")
            raise

    async def process_batch(self, urls: list, output_format: str = "voice") -> list:
        """
        Process several URLs concurrently.

        Args:
            urls: URLs to process.
            output_format: Output format passed to ``process_url``.

        Returns:
            One entry per URL, in input order: the result dict of
            ``process_url``, or the exception instance it raised.
        """
        self.logger.info(f"开始批量处理 {len(urls)} 个URL")

        results = await asyncio.gather(
            *(self.process_url(url, output_format) for url in urls),
            return_exceptions=True,
        )

        # gather() may hand back BaseException instances (for example
        # CancelledError), which are not Exception subclasses.
        failed = sum(1 for r in results if isinstance(r, BaseException))
        successful = len(results) - failed

        self.logger.info(f"批量处理完成: 成功 {successful}, 失败 {failed}")

        return results

    def generate_report(self, results: list) -> str:
        """
        Write a plain-text report for a batch run.

        Args:
            results: Entries as returned by ``process_batch``: result
                dicts or exception instances.

        Returns:
            Path of the written report file, as a string.
        """
        report_content = [
            "内容自动化创作Agent处理报告",
            "=" * 50,
            "",
        ]

        for i, result in enumerate(results, 1):
            if isinstance(result, BaseException):
                report_content.append(f"任务 {i}: 失败 - {str(result)}")
            else:
                # str() keeps the join working if a generator returns Path
                # objects instead of plain strings.
                output_files = ', '.join(str(f) for f in result['output_files'])
                report_content.append(f"任务 {i}: 成功")
                report_content.append(f" URL: {result['url']}")
                report_content.append(f" 原始内容长度: {len(result['original_content'])} 字符")
                report_content.append(f" 改写后长度: {len(result['rewritten_content'])} 字符")
                report_content.append(f" 输出文件: {output_files}")
            report_content.append("")

        # Name the file after wall-clock time. The event-loop clock is
        # monotonic (not a date) and needs an event loop to exist.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        report_path = Path(self.config['paths']['output']) / f"report_{timestamp}.txt"
        report_path.parent.mkdir(parents=True, exist_ok=True)

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report_content))

        self.logger.info(f"报告已生成: {report_path}")
        return str(report_path)


def _parse_cli_args(argv: List[str]) -> Tuple[List[str], str]:
    """
    Split command-line arguments into URLs and the output format.

    Accepts both ``--format=value`` and ``--format value``. Other
    arguments starting with ``--`` are ignored; everything else is a URL.

    Args:
        argv: Arguments without the program name.

    Returns:
        ``(urls, output_format)``; the format defaults to ``"voice"``.
    """
    urls: List[str] = []
    output_format = "voice"

    args = iter(argv)
    for arg in args:
        if arg.startswith("--format="):
            output_format = arg.partition("=")[2]
        elif arg == "--format":
            # Consume the value here so it is not mistaken for a URL.
            value = next(args, None)
            if value is not None:
                output_format = value
        elif not arg.startswith("--"):
            urls.append(arg)

    return urls, output_format


async def main():
    """Command-line entry point: parse arguments and run the agent."""
    # Check the command-line arguments.
    if len(sys.argv) < 2:
        print("用法: python main.py <url1> [url2] ... [选项]")
        print("选项:")
        print(" --format voice 生成配音文件(默认)")
        print(" --format video 生成视频文件")
        print(" --format text 仅生成文本")
        sys.exit(1)

    urls, output_format = _parse_cli_args(sys.argv[1:])

    if not urls:
        print("错误: 请提供至少一个URL")
        sys.exit(1)

    if output_format not in SUPPORTED_FORMATS:
        print(f"错误: 不支持的输出格式 '{output_format}' (可选: {', '.join(SUPPORTED_FORMATS)})")
        sys.exit(1)

    # Build the agent.
    try:
        agent = ContentAutomationAgent()
    except Exception as e:
        print(f"初始化Agent失败: {e}")
        sys.exit(1)

    # Process the URLs.
    try:
        if len(urls) == 1:
            # Single URL: print the generated files directly.
            result = await agent.process_url(urls[0], output_format)
            print("处理完成!")
            print(f"输出文件: {result['output_files']}")
        else:
            # Several URLs: run them as a batch and write a report.
            results = await agent.process_batch(urls, output_format)
            report_path = agent.generate_report(results)
            print("批量处理完成!")
            print(f"详细报告: {report_path}")

    except Exception as e:
        print(f"处理失败: {e}")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())