Spaces:
Runtime error
Runtime error
| import asyncio | |
| import functools | |
| import hashlib | |
| import importlib | |
| import json | |
| import os | |
| import shutil | |
| import tempfile | |
| import sys | |
| import traceback | |
| import uuid | |
| import subprocess | |
| import threading | |
| from contextlib import asynccontextmanager | |
| from copy import deepcopy | |
| from datetime import datetime | |
| from typing import Optional, Tuple | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| import gradio as gr | |
| from pdf2image import convert_from_path | |
| from pptx import Presentation as PptxPresentation | |
| sys.path.append("./") | |
| import pptagent.induct as induct | |
| import pptagent.pptgen as pptgen | |
| from pptagent.document import Document | |
| from pptagent.model_utils import ModelManager, parse_pdf | |
| from pptagent.multimodal import ImageLabler | |
| from pptagent.presentation import Presentation | |
| from pptagent.utils import Config, get_logger, package_join, pjoin, ppt_to_images_async | |
async def run_blocking(func, *args, **kw):
    """Run a blocking callable in the running loop's default executor.

    The arguments are bound to ``func`` up front because ``run_in_executor``
    only forwards positional arguments to the callable.

    Returns:
        Whatever ``func(*args, **kw)`` returns.
    """
    bound_call = functools.partial(func, *args, **kw)
    current_loop = asyncio.get_running_loop()
    return await current_loop.run_in_executor(None, bound_call)
async def run_cmd(cmd: list[str]):
    """Run an external command without a shell and return its stdout.

    Args:
        cmd: Program followed by its arguments.

    Returns:
        The raw bytes the process wrote to standard output.

    Raises:
        RuntimeError: If the process exits with a non-zero status. The message
            is the command line, a newline, then the captured stderr.
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        # External tools may emit bytes that are not valid UTF-8; a strict
        # decode would replace the real failure with a UnicodeDecodeError.
        details = stderr.decode(errors="replace")
        command_line = " ".join(map(str, cmd))
        raise RuntimeError(f"{command_line}\n{details}")
    return stdout
# Constants
# Debug mode is enabled when the script is started without extra CLI arguments.
DEBUG = len(sys.argv) == 1
RUNS_DIR = package_join("runs")
STAGES = ["PPT Parsing", "PDF Parsing", "PPT Analysis", "PPT Generation", "Success!"]

# Scratch directory for files that are handed back to Gradio
GRADIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "gradio_ppt_agent")
os.makedirs(GRADIO_TEMP_DIR, exist_ok=True)

# Global state shared by the request handlers
ppt_video_progress_store: dict[str, dict] = {}
progress_store: dict[str, dict] = {}
models = None  # Stays None until init_models() assigns it
logger = get_logger(__name__)
executor = ThreadPoolExecutor(max_workers=2)
# Default template configuration: paths are relative to this file's directory.
DEFAULT_TEMPLATES = [
    {
        "name": template_name,
        "path": f"templates/{template_name}.pptx",
        "preview": f"templates/previews/{template_name}.jpg",
    }
    for template_name in ("Template1", "Template2", "Template3")
]
# Helper: list the default templates that are actually present on disk
def get_default_templates():
    """Return the default templates whose files exist.

    Each entry of ``DEFAULT_TEMPLATES`` is resolved against the directory of
    this file. An entry is kept only when both its template file and its
    preview image exist.

    Returns:
        A list of dicts with ``name``, ``path`` and ``preview`` keys, where the
        two paths are joined with this file's directory.
    """
    here = os.path.dirname(__file__)
    resolved = (
        {
            "name": entry["name"],
            "path": os.path.join(here, entry["path"]),
            "preview": os.path.join(here, entry["preview"]),
        }
        for entry in DEFAULT_TEMPLATES
    )
    return [
        item
        for item in resolved
        if os.path.exists(item["path"]) and os.path.exists(item["preview"])
    ]
# Callback: react to a change of the template radio selection
def select_template(selected_template_name):
    """Map the selected radio value to component updates and a template path.

    Returns:
        A 3-tuple: a visibility update for the first bound output, a visibility
        update for the second bound output, and the chosen template path.
        For "Upload Custom" or an unknown name the tuple is
        (visible=True, visible=False, None).
    """
    if selected_template_name != "Upload Custom":
        chosen = next(
            (
                entry
                for entry in get_default_templates()
                if entry["name"] == selected_template_name
            ),
            None,
        )
        if chosen is not None:
            return gr.update(visible=False), gr.update(visible=True), chosen["path"]
    # Custom upload requested, or the named template is not available.
    return gr.update(visible=True), gr.update(visible=False), None
# Helper: build the template selection widgets
def create_template_selection():
    """Build the template selection widgets inside a row/column layout.

    Returns:
        The tuple ``(template_radio, template_preview, custom_upload,
        selected_template_display)``.
    """
    # NOTE(review): the nesting below was reconstructed from a flattened
    # listing; confirm that all four components belong inside the column.
    templates = get_default_templates()
    radio_choices = [entry["name"] for entry in templates]
    radio_choices.append("Upload Custom")
    gallery_items = [[entry["preview"], entry["name"]] for entry in templates]

    with gr.Row(), gr.Column():
        gr.Markdown("### Choose Template")
        # Radio buttons: one per available template plus the custom option
        template_radio = gr.Radio(
            choices=radio_choices,
            value="Upload Custom",
            label="Select Template Type",
        )
        # Preview gallery for the default templates
        template_preview = gr.Gallery(
            value=gallery_items,
            label="Template Previews",
            columns=2,
            rows=2,
            height="auto",
            visible=True,
        )
        # Upload area for a custom template
        custom_upload = gr.File(
            label="Upload Custom PPT Template",
            file_types=[".pptx"],
            type="filepath",
            visible=True,
        )
        # Read-only display of the selected template (hidden initially)
        selected_template_display = gr.Textbox(
            label="Selected Template",
            interactive=False,
            visible=False,
        )
    return template_radio, template_preview, custom_upload, selected_template_display
def copy_to_gradio_safe_path(source_path: str, filename: Optional[str] = None) -> str:
    """Copy a file into the Gradio temp directory under a unique name.

    Args:
        source_path: Path to the source file.
        filename: Optional file name to use for the copy; defaults to the
            source file's own name. Only its final path component is used.

    Returns:
        Path to the copied file inside ``GRADIO_TEMP_DIR``.

    Raises:
        FileNotFoundError: If ``source_path`` does not exist.
    """
    if not os.path.exists(source_path):
        raise FileNotFoundError(f"Source file not found: {source_path}")

    # basename() keeps a caller-supplied name such as "../x" inside the temp dir.
    base_name = os.path.basename(source_path if filename is None else filename)
    stem, ext = os.path.splitext(base_name)

    # The timestamp has one-second resolution, so two copies made within the
    # same second would overwrite each other; the random tag prevents that.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    unique_tag = uuid.uuid4().hex[:8]
    safe_filename = f"{stem}_{timestamp}_{unique_tag}{ext}"

    # Recreate the directory in case it was removed after import time.
    os.makedirs(GRADIO_TEMP_DIR, exist_ok=True)
    safe_path = os.path.join(GRADIO_TEMP_DIR, safe_filename)
    shutil.copy2(source_path, safe_path)
    return safe_path
# Initialize models with custom configuration
def init_models(api_key: Optional[str] = None, api_base: Optional[str] = None,
                language_model: Optional[str] = None,
                vision_model: Optional[str] = None,
                text_model: Optional[str] = None):
    """Create the model manager, test its connections and publish it.

    Non-empty arguments are also exported as environment variables
    (``OPENAI_API_KEY``, ``API_BASE``, ``LANGUAGE_MODEL``, ``VISION_MODEL``,
    ``TEXT_MODEL``).

    The module-level ``models`` is assigned only after the connection test
    succeeds. On any failure it is reset to ``None`` so that the generation
    handlers, which check ``models is None``, refuse to run with an
    unverified configuration.

    Returns:
        A status string for the UI: a success message, or a failure message
        that includes the error text.
    """
    global models
    try:
        # Set environment variables if provided
        env_overrides = {
            "OPENAI_API_KEY": api_key,
            "API_BASE": api_base,
            "LANGUAGE_MODEL": language_model,
            "VISION_MODEL": vision_model,
            "TEXT_MODEL": text_model,
        }
        for variable, value in env_overrides.items():
            if value:
                os.environ[variable] = value

        # Build a candidate first; it becomes global only once verified.
        candidate = ModelManager(
            api_base=api_base,
            api_key=api_key,
            language_model_name=language_model,
            vision_model_name=vision_model,
            text_model_name=text_model,
        )

        # Reuse this thread's event loop, creating one if none is available.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # An explicit raise instead of assert: asserts are stripped under -O.
        if not loop.run_until_complete(candidate.test_connections()):
            raise RuntimeError("Model connection test failed")

        models = candidate
        logger.info("Models initialized successfully")
        return "✅ Models initialized successfully"
    except Exception as e:
        models = None
        error_msg = f"❌ Model initialization failed: {e}"
        logger.error(error_msg)
        return error_msg
class GradioProgressManager:
    """Track progress through a fixed list of named stages.

    Each ``report_progress`` call advances one stage and reports a percentage
    and a status string to the optional ``progress_callback(progress, status)``.
    """

    def __init__(self, task_id: str, stages: list[str], progress_callback=None):
        self.task_id = task_id
        self.stages = stages
        self.progress_callback = progress_callback
        self.failed = False
        self.current_stage = 0  # number of stages completed so far
        self.total_stages = len(stages)

    def _stage_name(self, index: int) -> str:
        """Return the stage name at ``index``, clamped to the valid range."""
        if not self.stages:
            return "Unknown stage"
        return self.stages[min(max(index, 0), len(self.stages) - 1)]

    async def report_progress(self):
        """Advance to the next stage and report it.

        Calls made after the final stage keep reporting the final stage at
        100% instead of indexing past the end of the stage list.

        Returns:
            ``(progress, status)`` where ``progress`` is an int percentage.
        """
        self.current_stage = min(self.current_stage + 1, self.total_stages)
        if self.total_stages:
            progress = int((self.current_stage / self.total_stages) * 100)
        else:
            # No stages configured: avoid dividing by zero.
            progress = 100
        status = f"Stage: {self._stage_name(self.current_stage - 1)}"
        if self.progress_callback:
            self.progress_callback(progress, status)
        return progress, status

    async def fail_stage(self, error_message: str):
        """Mark the task as failed at the stage currently in progress.

        Reports 100% with an error status to the callback, sets ``failed`` and
        logs the error.

        Returns:
            The error status string.
        """
        # current_stage equals total_stages once everything has completed, so
        # the lookup is clamped to the last stage.
        error_status = f"{self._stage_name(self.current_stage)} Error: {error_message}"
        if self.progress_callback:
            self.progress_callback(100, error_status)
        self.failed = True
        logger.error(f"{self.task_id}: {error_status}")
        return error_status
def _cache_upload_by_md5(source_file: str, category: str, target_name: str) -> str:
    """Store a file at RUNS_DIR/<category>/<md5>/<target_name> and return the md5."""
    with open(source_file, "rb") as fh:
        blob = fh.read()
    digest = hashlib.md5(blob).hexdigest()
    cache_dir = pjoin(RUNS_DIR, category, digest)
    target = pjoin(cache_dir, target_name)
    # Check the file rather than the directory so that a directory left behind
    # by an interrupted earlier write is repaired.
    if not os.path.exists(target):
        os.makedirs(cache_dir, exist_ok=True)
        with open(target, "wb") as fh:
            fh.write(blob)
    return digest


def generate_ppt(pptx_file, pdf_file, num_pages, progress=gr.Progress()):
    """Generate a PPT from a template and a PDF.

    Args:
        pptx_file: Path to the template .pptx, or None to record the task's
            template as "default_template".
        pdf_file: Path to the source PDF (required).
        num_pages: Requested number of slides.
        progress: Gradio progress tracker.

    Returns:
        ``(path, status)``: the path of the generated file copied to the
        Gradio temp directory (or None on failure) and a status message.
    """
    try:
        # Make sure models are initialized
        if models is None:
            return None, "❌ Please initialize models first in the Configuration tab"
        # Validate the inputs before creating any directories or files.
        if pdf_file is None:
            return None, "❌ Please provide a PDF file"

        # Create task ID
        task_id = datetime.now().strftime("20%y-%m-%d") + "/" + str(uuid.uuid4())
        logger.info(f"PPT generation task created: {task_id}")

        # Create directories
        os.makedirs(pjoin(RUNS_DIR, task_id), exist_ok=True)
        task = {
            # The UI slider may deliver a float; store a real int.
            "numberOfPages": int(num_pages),
            "pptx": "default_template",
        }

        # Handle PPT template and PDF (both cached by content hash)
        if pptx_file is not None:
            task["pptx"] = _cache_upload_by_md5(pptx_file, "pptx", "source.pptx")
        task["pdf"] = _cache_upload_by_md5(pdf_file, "pdf", "source.pdf")
        progress_store[task_id] = task

        # Progress callback: the pipeline reports percentages, Gradio wants 0..1
        def update_progress(prog, status):
            progress(prog / 100, desc=status)

        # Run the async pipeline on this thread's event loop
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        final_ppt_path = loop.run_until_complete(ppt_gen_async(task_id, update_progress))

        if final_ppt_path and os.path.exists(final_ppt_path):
            # Copy to Gradio-safe location
            safe_path = copy_to_gradio_safe_path(final_ppt_path, "generated_presentation.pptx")
            return safe_path, f"✅ PPT generated successfully! Task ID: {task_id}"
        return None, "❌ PPT generation failed"
    except Exception as e:
        logger.error(f"PPT generation error: {str(e)}")
        traceback.print_exc()
        return None, f"❌ Error: {str(e)}"
def generate_ppt_with_template_selection(selected_template_name, custom_pptx_file, pdf_file, num_pages, progress=gr.Progress()):
    """Resolve the template choice and delegate to ``generate_ppt``.

    A non-empty selection other than "Upload Custom" is looked up among the
    available default templates (None if it is not found); otherwise the
    uploaded custom file is used.

    Returns:
        The ``(path, status)`` pair produced by ``generate_ppt``, or
        ``(None, message)`` on error.
    """
    try:
        # Make sure models are initialized
        if models is None:
            return None, "❌ Please initialize models first in the Configuration tab"

        wants_default = bool(selected_template_name) and selected_template_name != "Upload Custom"
        if wants_default:
            # Use a default template
            pptx_file = next(
                (
                    entry["path"]
                    for entry in get_default_templates()
                    if entry["name"] == selected_template_name
                ),
                None,
            )
        else:
            # Use the uploaded custom template
            pptx_file = custom_pptx_file

        # Hand over to the original generate_ppt function
        return generate_ppt(pptx_file, pdf_file, num_pages, progress)
    except Exception as e:
        logger.error(f"PPT generation with template selection error: {str(e)}")
        return None, f"❌ Error: {str(e)}"
async def ppt_gen_async(task_id: str, progress_callback=None):
    """Run the PPT generation pipeline for a task stored in ``progress_store``.

    Steps: parse the template and render slide images, caption images, parse
    the PDF, refine the document, induce slide layouts, generate the deck.
    Intermediate results are cached as files in the template and PDF run
    directories and reused when present.

    Args:
        task_id: Key into ``progress_store``; also the run sub-directory.
        progress_callback: Optional ``callback(percent, status)``.

    Returns:
        Path of the generated ``final.pptx``, or None if any step failed
        (the error is logged and the traceback printed).
    """
    try:
        if DEBUG:
            importlib.reload(induct)
            importlib.reload(pptgen)
        task = progress_store[task_id]
        pptx_md5 = task["pptx"]
        pdf_md5 = task["pdf"]
        generation_config = Config(pjoin(RUNS_DIR, task_id))
        pptx_config = Config(pjoin(RUNS_DIR, "pptx", pptx_md5))
        with open(pjoin(generation_config.RUN_DIR, "task.json"), "w") as f:
            json.dump(task, f)
        progress_manager = GradioProgressManager(task_id, STAGES, progress_callback)
        parsedpdf_dir = pjoin(RUNS_DIR, "pdf", pdf_md5)
        ppt_image_folder = pjoin(pptx_config.RUN_DIR, "slide_images")
        if progress_callback:
            progress_callback(10, "Task initialized successfully")

        # PPT parsing
        presentation = Presentation.from_file(
            pjoin(pptx_config.RUN_DIR, "source.pptx"), pptx_config
        )
        # NOTE(review): the source listing lost its indentation. The clean-up
        # and renaming loops are placed inside this branch because they only
        # make sense on freshly rendered images — confirm against the
        # original file.
        if not os.path.exists(ppt_image_folder) or len(os.listdir(ppt_image_folder)) != len(presentation):
            await ppt_to_images_async(
                pjoin(pptx_config.RUN_DIR, "source.pptx"), ppt_image_folder
            )
            # Handle error slides: drop the images of slides listed in error_history
            for err_idx, _ in presentation.error_history:
                error_file = pjoin(ppt_image_folder, f"slide_{err_idx:04d}.jpg")
                if os.path.exists(error_file):
                    os.remove(error_file)
            # Rename slides so that image numbering follows slide_idx
            for i, slide in enumerate(presentation.slides, 1):
                slide.slide_idx = i
                old_path = pjoin(ppt_image_folder, f"slide_{slide.real_idx:04d}.jpg")
                new_path = pjoin(ppt_image_folder, f"slide_{slide.slide_idx:04d}.jpg")
                if os.path.exists(old_path):
                    os.rename(old_path, new_path)

        # Image labeling (cached in image_stats.json)
        labler = ImageLabler(presentation, pptx_config)
        stats_file = pjoin(pptx_config.RUN_DIR, "image_stats.json")
        if os.path.exists(stats_file):
            with open(stats_file, encoding="utf-8") as f:
                image_stats = json.load(f)
            labler.apply_stats(image_stats)
        else:
            await labler.caption_images_async(models.vision_model)
            with open(stats_file, "w", encoding="utf-8") as f:
                json.dump(labler.image_stats, f, ensure_ascii=False, indent=4)
        await progress_manager.report_progress()

        # PDF parsing (cached in source.md)
        source_md_path = pjoin(parsedpdf_dir, "source.md")
        if not os.path.exists(source_md_path):
            # Check if we have a PDF file
            pdf_file_path = pjoin(RUNS_DIR, "pdf", pdf_md5, "source.pdf")
            if os.path.exists(pdf_file_path):
                # NOTE(review): parse_pdf is called without await, as in the
                # original; confirm it is not a coroutine function.
                text_content = parse_pdf(
                    pdf_file_path,
                    parsedpdf_dir,
                    models.marker_model,
                )
            else:
                raise ValueError("No PDF file found")
        else:
            with open(source_md_path, encoding="utf-8") as f:
                text_content = f.read()
        await progress_manager.report_progress()

        # Document refine (cached in refined_doc.json)
        refined_doc_path = pjoin(parsedpdf_dir, "refined_doc.json")
        if not os.path.exists(refined_doc_path):
            source_doc = await Document.from_markdown_async(
                text_content,
                models.language_model,
                models.vision_model,
                parsedpdf_dir,
            )
            # ensure_ascii=False writes non-ASCII characters verbatim, so the
            # file needs an explicit UTF-8 encoding rather than the platform
            # default.
            with open(refined_doc_path, "w", encoding="utf-8") as f:
                json.dump(source_doc.to_dict(), f, ensure_ascii=False, indent=4)
        else:
            with open(refined_doc_path, encoding="utf-8") as f:
                source_doc_dict = json.load(f)
            source_doc = Document.from_dict(source_doc_dict, parsedpdf_dir)
        await progress_manager.report_progress()

        # Slide Induction (cached in slide_induction.json)
        slide_induction_path = pjoin(pptx_config.RUN_DIR, "slide_induction.json")
        if not os.path.exists(slide_induction_path):
            deepcopy(presentation).save(
                pjoin(pptx_config.RUN_DIR, "template.pptx"), layout_only=True
            )
            await ppt_to_images_async(
                pjoin(pptx_config.RUN_DIR, "template.pptx"),
                pjoin(pptx_config.RUN_DIR, "template_images"),
            )
            slide_inducter = induct.SlideInducterAsync(
                presentation,
                ppt_image_folder,
                pjoin(pptx_config.RUN_DIR, "template_images"),
                pptx_config,
                models.image_model,
                models.language_model,
                models.vision_model,
            )
            layout_induction = await slide_inducter.layout_induct()
            slide_induction = await slide_inducter.content_induct(layout_induction)
            with open(slide_induction_path, "w", encoding="utf-8") as f:
                json.dump(slide_induction, f, ensure_ascii=False, indent=4)
        else:
            with open(slide_induction_path, encoding="utf-8") as f:
                slide_induction = json.load(f)
        await progress_manager.report_progress()

        # PPT Generation
        ppt_agent = pptgen.PPTAgentAsync(
            models.text_model,
            models.language_model,
            models.vision_model,
            error_exit=False,
            retry_times=5,
        )
        ppt_agent.set_reference(
            config=generation_config,
            slide_induction=slide_induction,
            presentation=presentation,
        )
        prs, _ = await ppt_agent.generate_pres(
            source_doc=source_doc,
            num_slides=task["numberOfPages"],
        )
        final_path = pjoin(generation_config.RUN_DIR, "final.pptx")
        prs.save(final_path)
        logger.info(f"{task_id}: generation finished")
        await progress_manager.report_progress()
        return final_path
    except Exception as e:
        logger.error(f"PPT generation failed: {str(e)}")
        traceback.print_exc()
        return None
def ppt_to_video(ppt_file, progress=gr.Progress()):
    """Convert an uploaded PPT into a narrated video.

    Args:
        ppt_file: Path to the uploaded .pptx file.
        progress: Gradio progress tracker; receives fractions in 0..1.

    Returns:
        ``(path, status)``: the path of the video copied to the Gradio temp
        directory (or None on failure) and a status message.
    """
    try:
        # Without this guard a missing upload surfaces as an opaque TypeError.
        if ppt_file is None:
            return None, "❌ Please provide a PowerPoint file"

        task_id = str(uuid.uuid4())
        logger.info(f"PPT2Video task created: {task_id}")
        task_dir = pjoin(RUNS_DIR, "ppt_video", task_id)
        os.makedirs(task_dir, exist_ok=True)

        # Copy PPT file into the task directory
        ppt_path = pjoin(task_dir, "source.pptx")
        shutil.copyfile(ppt_file, ppt_path)

        # Initialize progress
        ppt_video_progress_store[task_id] = {
            "status": "processing",
            "current_step": 1,
            "current_slide": 0,
            "total_slides": 0,
            "progress_percentage": 0,
            "task_dir": task_dir,
            "ppt_path": ppt_path,
        }

        # Progress callback
        def update_progress(prog, status):
            progress(prog, desc=status)

        # Run the async conversion on this thread's event loop
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        video_path = loop.run_until_complete(process_ppt_to_video_async(task_id, update_progress))

        if video_path and os.path.exists(video_path):
            # Copy to Gradio-safe location
            safe_path = copy_to_gradio_safe_path(video_path, "generated_video.mp4")
            return safe_path, f"✅ Video generated successfully! Task ID: {task_id}"
        return None, "❌ Video generation failed"
    except Exception as e:
        logger.error(f"PPT to video error: {str(e)}")
        return None, f"❌ Error: {str(e)}"
async def process_ppt_to_video_async(task_id: str, progress_callback):
    """Render a stored PPT task into a video.

    The deck is converted to PDF with LibreOffice and then to one image per
    page. For each slide an audio track is generated from its notes and
    combined with the image into a video segment; the segments are then
    concatenated.

    Args:
        task_id: Key into ``ppt_video_progress_store``.
        progress_callback: ``callback(fraction, status)``.

    Returns:
        Path of the merged ``output.mp4``, or None on failure (the error is
        logged and the task, if registered, is marked "failed").
    """
    try:
        task_state = ppt_video_progress_store[task_id]
        task_dir = task_state["task_dir"]
        ppt_path = task_state["ppt_path"]
        progress_callback(0.1, "Converting PPT to PDF...")

        # Convert PPT to PDF
        pdf_path = pjoin(task_dir, "source.pdf")
        await run_cmd([
            "libreoffice", "--headless", "--convert-to", "pdf",
            ppt_path, "--outdir", task_dir
        ])

        # Convert PDF to images
        images_from_path = await run_blocking(convert_from_path, pdf_path)
        prs = await run_blocking(PptxPresentation, ppt_path)
        if len(images_from_path) != len(prs.slides):
            raise Exception("PPT页数与生成的图片数量不匹配")
        total_slides = len(prs.slides)
        if total_slides == 0:
            # Fail with a clear message instead of an ffmpeg concat error.
            raise ValueError("Presentation contains no slides")
        task_state["total_slides"] = total_slides
        progress_callback(0.2, "Extracting slides...")

        # Generate video segments
        video_segments = []
        with tempfile.TemporaryDirectory() as temp_path:
            for i, (slide, image) in enumerate(zip(prs.slides, images_from_path)):
                slide_progress = 0.3 + (i / total_slides) * 0.4
                progress_callback(slide_progress, f"Processing slide {i + 1}/{total_slides}")
                task_state["current_slide"] = i + 1

                # Get notes, falling back to a placeholder narration
                notes = ""
                if slide.has_notes_slide:
                    notes = slide.notes_slide.notes_text_frame.text
                if not notes.strip():
                    notes = f"This is slide {i + 1}"

                # Save image
                image_path = pjoin(temp_path, f"frame_{i}.jpg")
                image.save(image_path)

                # Generate audio
                audio_path = pjoin(temp_path, f"frame_{i}.wav")
                await generate_tts_audio(notes, audio_path)

                # Create video segment
                video_segment_path = await create_video_segment(
                    image_path, audio_path, temp_path, i
                )
                video_segments.append(video_segment_path)

            progress_callback(0.8, "Merging video segments...")
            # Merge while the temporary directory still exists: the segments
            # live inside it and are deleted when the block exits.
            output_video_path = pjoin(task_dir, "output.mp4")
            await merge_video_segments(video_segments, output_video_path)

        progress_callback(1.0, "Video generation completed!")
        task_state["status"] = "completed"
        return output_video_path
    except Exception as e:
        logger.error(f"PPT2Video processing failed {task_id}: {e}")
        # The task may be missing from the store (that can be the very error
        # being handled), so do not index into it unconditionally.
        failed_state = ppt_video_progress_store.get(task_id)
        if failed_state is not None:
            failed_state["status"] = "failed"
        return None
async def generate_tts_audio(text: str, output_path: str):
    """Write speech audio for ``text`` to ``output_path``.

    Tries the MegaTTS3 package located next to this file. If anything in that
    path fails, the error is logged and a 3-second silent mono 16-bit WAV
    (22050 Hz) is written instead.
    """
    try:
        # Try to use MegaTTS3 if available
        base_dir = os.path.dirname(__file__)
        tts_root = pjoin(base_dir, "MegaTTS3")
        # Add the package directory once; appending on every call would grow
        # sys.path by one entry per slide.
        if tts_root not in sys.path:
            sys.path.append(tts_root)
        from tts.infer_cli import MegaTTS3DiTInfer
        from tts.utils.audio_utils.io import save_wav

        infer = MegaTTS3DiTInfer(ckpt_root=pjoin(base_dir, "MegaTTS3", "checkpoints"))
        prompt_audio_path = pjoin(base_dir, "MegaTTS3", "assets", "English_prompt.wav")
        with open(prompt_audio_path, 'rb') as f:
            audio_bytes = f.read()

        # Use a .npy file next to the prompt audio when one exists
        latent_file = None
        potential_npy = os.path.splitext(prompt_audio_path)[0] + '.npy'
        if os.path.isfile(potential_npy):
            latent_file = potential_npy

        resource_context = infer.preprocess(audio_bytes, latent_file)
        wav_bytes = infer.forward(
            resource_context,
            text,
            time_step=32,
            p_w=1.6,
            t_w=2.5
        )
        save_wav(wav_bytes, output_path)
    except Exception as e:
        logger.error(f"TTS failed: {str(e)}")
        # Fallback: create silent audio using only the standard library, so
        # the fallback does not depend on numpy being installed.
        import wave
        sample_rate = 22050
        duration = 3.0
        frame_count = int(sample_rate * duration)
        with wave.open(output_path, 'w') as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)
            wav_file.setframerate(sample_rate)
            # Two zero bytes per 16-bit sample
            wav_file.writeframes(bytes(frame_count * 2))
async def create_video_segment(image_path: str, audio_path: str, temp_path: str, index: int):
    """Encode one still image plus one audio file into an MP4 segment.

    The segment is written to ``segment_<index>.mp4`` inside ``temp_path``
    by invoking ffmpeg.

    Returns:
        Path of the created segment.
    """
    output_path = pjoin(temp_path, f"segment_{index}.mp4")
    input_args = ["-loop", "1", "-i", image_path, "-i", audio_path]
    video_args = ["-vf", "scale=1920:1080", "-c:v", "libx264", "-tune", "stillimage"]
    audio_args = ["-c:a", "aac", "-b:a", "192k"]
    output_args = ["-pix_fmt", "yuv420p", "-shortest", output_path]
    ffmpeg_cmd = ["ffmpeg", "-y", *input_args, *video_args, *audio_args, *output_args]
    await run_cmd(ffmpeg_cmd)
    return output_path
async def merge_video_segments(video_segments: list[str], output_path: str):
    """Concatenate video segments into ``output_path`` with ffmpeg.

    A temporary list file is written next to the output and removed again,
    whether or not ffmpeg succeeds.

    Args:
        video_segments: Paths of the segments, in playback order.
        output_path: Path of the merged video.

    Raises:
        ValueError: If ``video_segments`` is empty.
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    if not video_segments:
        raise ValueError("No video segments to merge")

    # Derive the list file from the extension only. str.replace('.mp4', ...)
    # would also rewrite directory names containing '.mp4', and would yield
    # the output path itself when the output has a different extension.
    list_file_path = os.path.splitext(output_path)[0] + "_list.txt"
    try:
        with open(list_file_path, "w", encoding="utf-8") as f:
            for seg in video_segments:
                # In the list file a single quote inside a quoted path is
                # written as '\''.
                escaped = seg.replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        await run_cmd([
            "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file_path,
            "-c", "copy", output_path
        ])
    finally:
        if os.path.exists(list_file_path):
            os.remove(list_file_path)
def cleanup_temp_files():
    """Delete files in ``GRADIO_TEMP_DIR`` that are older than one hour.

    This is best-effort: failures are logged as warnings and never raised.
    Directories are skipped.
    """
    try:
        import glob

        # Remove files older than 1 hour
        cutoff_time = time.time() - 3600
        for file_path in glob.glob(os.path.join(GRADIO_TEMP_DIR, "*")):
            # The age check sits inside the per-file try so that a file
            # vanishing mid-scan (for example removed by another process)
            # only skips that file instead of aborting the whole sweep.
            try:
                if not os.path.isfile(file_path):
                    continue
                if os.path.getctime(file_path) >= cutoff_time:
                    continue
                os.remove(file_path)
                logger.info(f"Cleaned up old temp file: {file_path}")
            except Exception as e:
                logger.warning(f"Failed to remove temp file {file_path}: {e}")
    except Exception as e:
        logger.warning(f"Cleanup failed: {e}")
# Gradio interface
def create_gradio_interface():
    """Create Gradio interface

    Builds a Blocks app with three tabs: model configuration, PPT generation
    and PPT-to-video conversion, and wires each button to its handler.

    Returns:
        The constructed ``gr.Blocks`` instance.
    """
    # NOTE(review): the nesting of the layout contexts was reconstructed from
    # a flattened listing (indentation was lost); confirm the placement of
    # components within rows/columns against the original file.
    with gr.Blocks(title="PresentAgent", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# PresentAgent - PowerPoint Generation and Presentation Creation")
        with gr.Tabs():
            # Model Configuration Tab
            with gr.TabItem("🔧 Configuration"):
                gr.Markdown("## Model Configuration")
                gr.Markdown(
                    "Configure your API settings and model parameters before using the PPT generation features.")
                with gr.Row():
                    with gr.Column():
                        api_key_input = gr.Textbox(
                            label="API Key",
                            type="password",
                            placeholder="Enter your OpenAI API key",
                            value=""
                        )
                        api_base_input = gr.Textbox(
                            label="API Base URL",
                            placeholder="https://api.openai.com/v1",
                            value=""
                        )
                        with gr.Row():
                            language_model_input = gr.Textbox(
                                label="Language Model",
                                placeholder="Model for text generation",
                                value="gpt-4o"
                            )
                            vision_model_input = gr.Textbox(
                                label="Vision Model",
                                placeholder="Model for image processing",
                                value="gpt-4o"
                            )
                        text_model_input = gr.Textbox(
                            label="Text Embedding Model",
                            placeholder="Model for text embeddings",
                            value="text-embedding-3-small"
                        )
                        init_btn = gr.Button("Initialize Models", variant="primary", size="lg")
                    with gr.Column():
                        init_status = gr.Textbox(
                            label="Initialization Status",
                            interactive=False,
                            lines=3
                        )
                        gr.Markdown("""
                        ### Instructions:
                        1. Enter your API key and base URL
                        2. Configure model names (defaults are recommended)
                        3. Click "Initialize Models" to test the connection
                        4. Once initialized, you can use the PPT generation features
                        """)
                # The returned status string is shown in init_status
                init_btn.click(
                    fn=init_models,
                    inputs=[api_key_input, api_base_input, language_model_input, vision_model_input, text_model_input],
                    outputs=[init_status]
                )
            with gr.TabItem("📊 PPT Generation"):
                gr.Markdown("## Generate PowerPoint from Template and PDF")
                with gr.Row():
                    with gr.Column():
                        # Template selection area
                        template_radio, template_preview, custom_upload, selected_template_display = create_template_selection()
                        # PDF input
                        pdf_input = gr.File(
                            label="PDF Document",
                            file_types=[".pdf"],
                            type="filepath"
                        )
                        # Slide count selection
                        num_pages_input = gr.Slider(
                            minimum=1,
                            maximum=50,
                            value=10,
                            step=1,
                            label="Number of Slides"
                        )
                        generate_btn = gr.Button("Generate PPT", variant="primary", size="lg")
                    with gr.Column():
                        ppt_output = gr.File(label="Generated PPT")
                        ppt_status = gr.Textbox(label="Status", interactive=False, lines=3)
                # Wire up the template selection event; the third return value
                # of select_template goes to a State created inline here.
                template_radio.change(
                    fn=select_template,
                    inputs=[template_radio],
                    outputs=[custom_upload, selected_template_display, gr.State()]
                )
                # Wire up the generate button
                generate_btn.click(
                    fn=generate_ppt_with_template_selection,
                    inputs=[template_radio, custom_upload, pdf_input, num_pages_input],
                    outputs=[ppt_output, ppt_status]
                )
            # PPT to Video Tab
            with gr.TabItem("🎬 PPT to Presentation"):
                gr.Markdown("## Convert PowerPoint to Video Presentation")
                with gr.Row():
                    with gr.Column():
                        ppt_video_input = gr.File(
                            label="PowerPoint File",
                            file_types=[".pptx"],
                            type="filepath"
                        )
                        video_btn = gr.Button("Convert to Video", variant="primary", size="lg")
                    with gr.Column():
                        video_output = gr.File(label="Generated Video")
                        video_status = gr.Textbox(label="Status", interactive=False, lines=3)
                video_btn.click(
                    fn=ppt_to_video,
                    inputs=[ppt_video_input],
                    outputs=[video_output, video_status]
                )
    return demo
def setup_template_directories():
    """Create the ``templates`` and ``templates/previews`` directories.

    Both are created next to this file if they do not exist yet, followed by
    log messages telling the operator where to put template files.
    """
    template_dir = os.path.join(os.path.dirname(__file__), "templates")
    for directory in (template_dir, os.path.join(template_dir, "previews")):
        os.makedirs(directory, exist_ok=True)
    for message in (
        f"Template directories created at: {template_dir}",
        "Please place your default template files in the templates directory",
        "Please place corresponding preview images in the templates/previews directory",
    ):
        logger.info(message)
# Main function
if __name__ == "__main__":
    # Make sure the run directories and the template directories exist
    for run_dir in (RUNS_DIR, pjoin(RUNS_DIR, "feedback")):
        os.makedirs(run_dir, exist_ok=True)
    setup_template_directories()

    # Remove stale files left in the Gradio temp directory
    cleanup_temp_files()

    # Build the interface, then launch it with RUNS_DIR as an allowed path
    demo = create_gradio_interface()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_error": True,
        "allowed_paths": [RUNS_DIR],
    }
    demo.queue().launch(**launch_options)