import json
import os
from copy import deepcopy
from typing import Optional

import numpy as np
import torch
import torchvision.transforms as T
from marker.config.parser import ConfigParser
from marker.converters.pdf import PdfConverter
from marker.models import create_model_dict
from marker.output import text_from_rendered
from PIL import Image
from transformers import AutoModel, AutoProcessor

from pptagent.llms import LLM, AsyncLLM
from pptagent.presentation import Presentation, SlidePage
from pptagent.utils import get_logger, is_image_path, pjoin

logger = get_logger(__name__)

class ModelManager:
    """
    Manages the remote LLM clients (language, vision, and text-embedding models)
    and the lazily loaded local models (image encoder and marker PDF models).
    """

    def __init__(
        self,
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
        language_model_name: Optional[str] = None,
        vision_model_name: Optional[str] = None,
        text_model_name: Optional[str] = None,
    ):
        """Initialize the model clients, falling back to environment variables for any argument not provided."""
        if api_base is None:
            api_base = os.environ.get("API_BASE", None)
        if api_key is None:
            api_key = os.environ.get("OPENAI_API_KEY", None)
        if language_model_name is None:
            language_model_name = os.environ.get("LANGUAGE_MODEL", "gpt-4.1")
        if vision_model_name is None:
            vision_model_name = os.environ.get("VISION_MODEL", "gpt-4.1")
        if text_model_name is None:
            text_model_name = os.environ.get("TEXT_MODEL", "text-embedding-3-small")

        self.api_base = api_base
        self.api_key = api_key
        self._image_model = None
        self._marker_model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.language_model = AsyncLLM(language_model_name, api_base, api_key=api_key)
        self.vision_model = AsyncLLM(vision_model_name, api_base, api_key=api_key)
        self.text_model = AsyncLLM(text_model_name, api_base, api_key=api_key)

    def image_model(self):
        """Lazily initialize and return the (processor, model) pair used for image embeddings."""
        if self._image_model is None:
            self._image_model = get_image_model(device=self.device)
        return self._image_model

    def marker_model(self):
        """Lazily initialize and return the marker artifact dictionary used for PDF parsing."""
        if self._marker_model is None:
            self._marker_model = create_model_dict(
                device=self.device, dtype=torch.float16
            )
        return self._marker_model

    async def test_connections(self) -> bool:
        """Test connections for all LLM models.

        Returns:
            bool: True if all connections are successful, False otherwise.
        """
        try:
            assert await self.language_model.test_connection()
            assert await self.vision_model.test_connection()
            assert await self.text_model.test_connection()
        except Exception:
            return False
        return True
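
# Illustrative sketch (not part of the original module): one way to wire up a
# ModelManager and verify the remote endpoints before doing any work. The API
# base URL below is an assumption; by default the constructor falls back to the
# API_BASE / OPENAI_API_KEY environment variables.
async def _example_check_models() -> bool:
    manager = ModelManager(api_base="https://api.openai.com/v1")  # assumed endpoint
    ok = await manager.test_connections()
    if not ok:
        logger.warning("LLM connection test failed; check API_BASE and OPENAI_API_KEY")
    return ok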

def prs_dedup(
    presentation: Presentation,
    model: LLM,
    threshold: float = 0.8,
) -> list[SlidePage]:
    """
    Deduplicate slides in a presentation based on text similarity.

    Args:
        presentation (Presentation): The presentation object containing slides.
        model (LLM): The model used for generating text embeddings.
        threshold (float): The cosine-similarity threshold above which two consecutive slides are considered duplicates.

    Returns:
        list[SlidePage]: The removed duplicate slides.
    """
    text_embeddings = model.get_embedding([i.to_text() for i in presentation.slides])
    pre_embedding = text_embeddings[0]
    slide_idx = 1
    duplicates = []
    while slide_idx < len(presentation):
        cur_embedding = text_embeddings[slide_idx]
        if torch.cosine_similarity(pre_embedding, cur_embedding, -1) > threshold:
            # The earlier slide of a near-identical consecutive pair is marked for removal.
            duplicates.append(slide_idx - 1)
        slide_idx += 1
        pre_embedding = cur_embedding
    return [presentation.slides.pop(i) for i in reversed(duplicates)]
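
# Illustrative sketch (assumption, not from the original file): dropping
# near-duplicate consecutive slides with a synchronous embedding-capable LLM,
# as prs_dedup expects.
def _example_dedup(presentation: Presentation, text_model: LLM) -> None:
    removed = prs_dedup(presentation, text_model, threshold=0.8)
    logger.info("Removed %d near-duplicate slides", len(removed))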

def get_image_model(device: Optional[str] = None):
    """
    Initialize and return an image feature extractor and its model.

    Args:
        device (str): The device to run the model on.

    Returns:
        tuple: A tuple containing the feature extractor and the image model.
    """
    model_base = "google/vit-base-patch16-224-in21k"
    return (
        AutoProcessor.from_pretrained(
            model_base,
            torch_dtype=torch.float16,
            device_map=device,
            use_fast=True,
        ),
        AutoModel.from_pretrained(
            model_base,
            torch_dtype=torch.float16,
            device_map=device,
        ).eval(),
    )

def parse_pdf(
    pdf_path: str,
    output_path: str,
    model_lst: dict,
) -> str:
    """
    Parse a PDF file and extract text and images.

    Args:
        pdf_path (str): The path to the PDF file.
        output_path (str): The directory to save the extracted content.
        model_lst (dict): The marker artifact dictionary used by PdfConverter.

    Returns:
        str: The full text extracted from the PDF.
    """
    os.makedirs(output_path, exist_ok=True)
    config_parser = ConfigParser(
        {
            "output_format": "markdown",
        }
    )
    converter = PdfConverter(
        config=config_parser.generate_config_dict(),
        artifact_dict=model_lst,
        processor_list=config_parser.get_processors(),
        renderer=config_parser.get_renderer(),
    )
    rendered = converter(pdf_path)
    full_text, _, images = text_from_rendered(rendered)
    with open(pjoin(output_path, "source.md"), "w+", encoding="utf-8") as f:
        f.write(full_text)
    for filename, image in images.items():
        image_filepath = os.path.join(output_path, filename)
        image.save(image_filepath, "JPEG")
    with open(pjoin(output_path, "meta.json"), "w+", encoding="utf-8") as f:
        f.write(json.dumps(rendered.metadata, indent=4))
    return full_text
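
# Illustrative sketch (assumption): parsing a PDF with the lazily created marker
# models from ModelManager. The file and directory names are placeholders.
def _example_parse_pdf(manager: ModelManager) -> str:
    return parse_pdf("paper.pdf", "parsed_output", manager.marker_model())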

def get_image_embedding(
    image_dir: str, extractor, model, batchsize: int = 16
) -> dict[str, torch.Tensor]:
    """
    Generate image embeddings for images in a directory.

    Args:
        image_dir (str): The directory containing images.
        extractor: The feature extractor for images.
        model: The model used for generating embeddings.
        batchsize (int): The batch size for processing images.

    Returns:
        dict: A dictionary mapping image filenames to their embeddings.
    """
    transform = T.Compose(
        [
            T.Resize(int((256 / 224) * extractor.size["height"])),
            T.CenterCrop(extractor.size["height"]),
            T.ToTensor(),
            T.Normalize(mean=extractor.image_mean, std=extractor.image_std),
        ]
    )
    inputs = []
    embeddings = []
    images = [i for i in sorted(os.listdir(image_dir)) if is_image_path(i)]
    for file in images:
        image = Image.open(pjoin(image_dir, file)).convert("RGB")
        inputs.append(transform(image))
        # Flush a full batch, or the final (possibly partial) batch on the last file.
        if len(inputs) % batchsize == 0 or file == images[-1]:
            # Match the model's device and dtype (the ViT weights are loaded in float16).
            batch = {"pixel_values": torch.stack(inputs).to(model.device, model.dtype)}
            embeddings.extend(model(**batch).last_hidden_state.detach())
            inputs.clear()
    return {image: embedding.flatten() for image, embedding in zip(images, embeddings)}
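
# Illustrative sketch (assumption): embedding every image in a directory (for
# example, the images written out by parse_pdf) with the ViT model above.
def _example_embed_images(image_dir: str) -> dict[str, torch.Tensor]:
    device = "cuda" if torch.cuda.is_available() else "cpu"
    extractor, vit_model = get_image_model(device=device)
    return get_image_embedding(image_dir, extractor, vit_model, batchsize=16)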

def images_cosine_similarity(embeddings: list[torch.Tensor]) -> torch.Tensor:
    """
    Calculate the cosine similarity matrix for a list of embeddings.

    Args:
        embeddings (list[torch.Tensor]): A list of image embeddings.

    Returns:
        torch.Tensor: An NxN similarity matrix.
    """
    sim_matrix = torch.zeros((len(embeddings), len(embeddings)))
    for i in range(len(embeddings)):
        for j in range(i + 1, len(embeddings)):
            sim_matrix[i, j] = sim_matrix[j, i] = torch.cosine_similarity(
                embeddings[i], embeddings[j], -1
            )
    return sim_matrix

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

def average_distance(
    similarity: torch.Tensor, idx: int, cluster_idx: list[int]
) -> float:
    """
    Calculate the average similarity between a point (idx) and a cluster (cluster_idx).

    Args:
        similarity (torch.Tensor): The similarity matrix.
        idx (int): The index of the point.
        cluster_idx (list): The indices of the cluster.

    Returns:
        float: The average similarity, or 0 if the point is already in the cluster.
    """
    if idx in cluster_idx:
        return 0
    total_similarity = 0
    for idx_in_cluster in cluster_idx:
        total_similarity += similarity[idx, idx_in_cluster]
    return total_similarity / len(cluster_idx)

def get_cluster(similarity: np.ndarray, sim_bound: float = 0.65):
    """
    Cluster points based on similarity.

    Args:
        similarity (np.ndarray): The similarity matrix.
        sim_bound (float): The similarity threshold for clustering.

    Returns:
        list: A list of clusters, each a list of point indices.
    """
    sim_copy = deepcopy(similarity)
    num_points = sim_copy.shape[0]
    clusters = []
    added = [False] * num_points
    while True:
        # Greedily attach the unassigned point with the highest average
        # similarity to an existing cluster, if it exceeds sim_bound.
        max_avg_dist = sim_bound
        best_cluster = None
        best_point = None
        for c in clusters:
            for point_idx in range(num_points):
                if added[point_idx]:
                    continue
                avg_dist = average_distance(sim_copy, point_idx, c)
                if avg_dist > max_avg_dist:
                    max_avg_dist = avg_dist
                    best_cluster = c
                    best_point = point_idx
        if best_point is not None:
            best_cluster.append(best_point)
            added[best_point] = True
            sim_copy[best_point, :] = 0
            sim_copy[:, best_point] = 0
        else:
            if sim_copy.max() < sim_bound:
                # Append each remaining point as its own singleton cluster.
                for i in range(num_points):
                    if not added[i]:
                        clusters.append([i])
                break
            # Otherwise, seed a new cluster from the most similar remaining pair.
            i, j = np.unravel_index(np.argmax(sim_copy), sim_copy.shape)
            clusters.append([int(i), int(j)])
            added[i] = True
            added[j] = True
            sim_copy[i, :] = 0
            sim_copy[:, i] = 0
            sim_copy[j, :] = 0
            sim_copy[:, j] = 0
    return clusters
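
# Illustrative sketch (assumption): grouping visually similar images by feeding
# the pairwise cosine-similarity matrix into the greedy clustering above.
def _example_cluster_images(embeddings: dict[str, torch.Tensor]) -> list[list[str]]:
    names = list(embeddings.keys())
    sim_matrix = images_cosine_similarity(list(embeddings.values()))
    clusters = get_cluster(sim_matrix.numpy(), sim_bound=0.65)
    return [[names[i] for i in cluster] for cluster in clusters]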