Spaces:
Runtime error
Runtime error
| import os | |
| import base64 | |
| import requests | |
| from typing import List, Optional, Generator, Tuple | |
| from openai import OpenAI | |
| from docling.document_converter import DocumentConverter | |
| import glob | |
| from pdf2image import convert_from_path | |
| from PIL import Image | |
| import tempfile | |
| import shutil | |
| import fitz # PyMuPDF ์ถ๊ฐ | |
# Shared docling DocumentConverter instance, reused by every PDF-parsing call.
converter = DocumentConverter()
# Global configuration for the OpenAI-compatible local vLLM endpoint.
openai_api_key = "EMPTY"
openai_api_base = "http://118.38.20.101:8080/v1"
model = "Qwen/Qwen2.5-VL-7B-Instruct-AWQ"
# Global logging state shared across the request handlers below.
current_log_messages = []
current_request_info = ""  # accumulated dump of every API request sent so far
# OpenAI client pointed at the local inference server.
client = OpenAI(
    api_key=openai_api_key,
    base_url=openai_api_base
)
def load_system_prompt() -> str:
    """Load the system prompt from prompt_system.txt, falling back to a built-in default."""
    try:
        with open("prompt_system.txt", "r", encoding="utf-8") as fp:
            text = fp.read()
    except Exception as e:
        # Best-effort: any read failure falls back to the default prompt.
        print(f"์์คํ ํ๋กฌํํธ ํ์ผ ๋ก๋ ์ค๋ฅ: {e}")
        return "๋น์ ์ ์ด๋ ฅ์ ๋ถ์์ ๋์์ฃผ๋ AI ์ด์์คํดํธ์ ๋๋ค."
    return text.strip()
def load_user_prompt() -> str:
    """Load the user prompt from prompt_user.txt, falling back to a built-in default."""
    try:
        with open("prompt_user.txt", "r", encoding="utf-8") as fp:
            text = fp.read()
    except Exception as e:
        # Best-effort: any read failure falls back to the default prompt.
        print(f"์ฌ์ฉ์ ํ๋กฌํํธ ํ์ผ ๋ก๋ ์ค๋ฅ: {e}")
        return "[ํ ์คํธ ์ถ์ถ ์์ ]\n\n์ฒจ๋ถ๋ ์ด๋ฏธ์ง๋ค์ ์ด๋ ฅ์๋ฅผ ์ด๋ฏธ์งํ ํ ๊ฒฐ๊ณผ๋ฌผ์ด์ผ. ์ด๋ฏธ์ง์ ๋ด์ฉ ๋ฐ ๋ ์ด์์์ ์ฐธ๊ณ ํด์ ์ด๋ ฅ์์ ๋ด์ฉ์ ์ ๋ฆฌ ํ ๋งํฌ๋ค์ด ํ์์ผ๋ก ์ ๋ฆฌํด์ค."
    return text.strip()
def load_postprocess_prompt() -> str:
    """Load the postprocess prompt from prompt_postprocess.txt, falling back to a built-in default."""
    try:
        with open("prompt_postprocess.txt", "r", encoding="utf-8") as fp:
            text = fp.read()
    except Exception as e:
        # Best-effort: any read failure falls back to the default prompt.
        print(f"ํ์ฒ๋ฆฌ ํ๋กฌํํธ ํ์ผ ๋ก๋ ์ค๋ฅ: {e}")
        return "[ํ ์คํธ ๋ณํฉ ์์ ]\n๋ฐฐ์น ์์ ์ผ๋ก ์์ง๋ ํ ์คํธ์ ๋๋ค. ์ด์ ์์ ํ ํํ์ ์ด๋ ฅ์๋ฅผ ๋ง๋ค์ด ์ฃผ์ธ์. ์ถ๋ ฅ ํฌ๋งท์ ๋งํฌ๋ค์ด์ ๋๋ค."
    return text.strip()
def encode_image_base64_from_url(image_path: str) -> str:
    """Encode an image from a local file path or an HTTP(S) URL to base64.

    Args:
        image_path: An http(s) URL or a path to an existing local file.

    Returns:
        The base64-encoded image bytes as an ASCII string.

    Raises:
        ValueError: If the argument is neither a URL nor an existing file.
        Exception: Download/read errors are logged and re-raised.
    """
    try:
        # Bug fix: the original tested `'http' in image_path`, which also
        # matched local paths containing the substring "http" (e.g.
        # /srv/httpdocs/a.png). Only treat the value as a URL when it
        # actually starts with an http(s) scheme.
        if isinstance(image_path, str) and image_path.startswith(("http://", "https://")):
            with requests.get(image_path) as response:
                response.raise_for_status()
                return base64.b64encode(response.content).decode('utf-8')
        elif isinstance(image_path, str) and os.path.isfile(image_path):
            with open(image_path, 'rb') as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        else:
            raise ValueError(f"Invalid image URL or file path: {image_path}")
    except Exception as e:
        print(f"Error encoding image: {e}")
        raise
def convert_pdf_to_images(pdf_path: str, dpi: int = 200) -> List[str]:
    """Render every PDF page as a PNG via pdf2image; return the saved file paths.

    Output files go into a fresh temporary directory that the caller must clean up.
    """
    try:
        pages = convert_from_path(pdf_path, dpi=dpi)
        out_dir = tempfile.mkdtemp()
        saved = []
        for page_no, page in enumerate(pages, start=1):
            target = os.path.join(out_dir, f"page_{page_no:03d}.png")
            page.save(target, "PNG")
            saved.append(target)
        return saved
    except Exception as e:
        print(f"Error converting PDF to images: {e}")
        raise
def combine_images_horizontally(image_paths: List[str]) -> List[str]:
    """Combine images in pairs horizontally. Returns list of combined image paths.

    Pages are paired as (1,2), (3,4), ...; an odd trailing page is copied
    through unchanged. Output files are written to a fresh temporary
    directory that the caller is responsible for cleaning up.
    """
    if not image_paths:
        return []
    combined_paths = []
    temp_dir = tempfile.mkdtemp()
    for i in range(0, len(image_paths), 2):
        if i + 1 < len(image_paths):
            # `with` guarantees the source file handles are closed even when a
            # resize/paste fails (the original leaked the pre-resize images
            # after reassigning img1/img2 to their resized copies).
            with Image.open(image_paths[i]) as src1, Image.open(image_paths[i + 1]) as src2:
                img1, img2 = src1, src2
                # Match heights (scale the shorter image up, keeping aspect ratio).
                max_height = max(img1.height, img2.height)
                if img1.height != max_height:
                    ratio = max_height / img1.height
                    img1 = img1.resize((int(img1.width * ratio), max_height), Image.Resampling.LANCZOS)
                if img2.height != max_height:
                    ratio = max_height / img2.height
                    img2 = img2.resize((int(img2.width * ratio), max_height), Image.Resampling.LANCZOS)
                # Paste the two pages side by side on a white canvas.
                combined_width = img1.width + img2.width
                combined_image = Image.new('RGB', (combined_width, max_height), 'white')
                combined_image.paste(img1, (0, 0))
                combined_image.paste(img2, (img1.width, 0))
                combined_path = os.path.join(temp_dir, f"combined_{i//2 + 1:03d}.png")
                combined_image.save(combined_path, "PNG")
                combined_paths.append(combined_path)
                combined_image.close()
        else:
            # Odd page count: copy the last page through unchanged.
            with Image.open(image_paths[i]) as img:
                single_path = os.path.join(temp_dir, f"single_{i//2 + 1:03d}.png")
                img.save(single_path, "PNG")
                combined_paths.append(single_path)
    return combined_paths
def combine_images_vertically(image_paths: List[str]) -> List[str]:
    """Combine images in pairs vertically. Returns list of combined image paths.

    Pages are paired as (1,2), (3,4), ...; an odd trailing page is copied
    through unchanged. Output files are written to a fresh temporary
    directory that the caller is responsible for cleaning up.
    """
    if not image_paths:
        return []
    combined_paths = []
    temp_dir = tempfile.mkdtemp()
    for i in range(0, len(image_paths), 2):
        if i + 1 < len(image_paths):
            # `with` guarantees the source file handles are closed even when a
            # resize/paste fails (the original leaked the pre-resize images
            # after reassigning img1/img2 to their resized copies).
            with Image.open(image_paths[i]) as src1, Image.open(image_paths[i + 1]) as src2:
                img1, img2 = src1, src2
                # Match widths (scale the narrower image up, keeping aspect ratio).
                max_width = max(img1.width, img2.width)
                if img1.width != max_width:
                    ratio = max_width / img1.width
                    img1 = img1.resize((max_width, int(img1.height * ratio)), Image.Resampling.LANCZOS)
                if img2.width != max_width:
                    ratio = max_width / img2.width
                    img2 = img2.resize((max_width, int(img2.height * ratio)), Image.Resampling.LANCZOS)
                # Stack the two pages on a white canvas.
                combined_height = img1.height + img2.height
                combined_image = Image.new('RGB', (max_width, combined_height), 'white')
                combined_image.paste(img1, (0, 0))
                combined_image.paste(img2, (0, img1.height))
                combined_path = os.path.join(temp_dir, f"vertical_combined_{i//2 + 1:03d}.png")
                combined_image.save(combined_path, "PNG")
                combined_paths.append(combined_path)
                combined_image.close()
        else:
            # Odd page count: copy the last page through unchanged.
            with Image.open(image_paths[i]) as img:
                single_path = os.path.join(temp_dir, f"vertical_single_{i//2 + 1:03d}.png")
                img.save(single_path, "PNG")
                combined_paths.append(single_path)
    return combined_paths
def combine_images_with_overlap(image_paths: List[str], direction: str = "horizontal") -> List[str]:
    """Combine adjacent pages with a sliding window: (1,2), (2,3), (3,4), ...

    Every interior page appears in two outputs, giving the downstream model
    overlapping context across page boundaries. Returns the input unchanged
    when there are fewer than two images. Output files are written to a fresh
    temporary directory that the caller is responsible for cleaning up.
    """
    if not image_paths or len(image_paths) < 2:
        return image_paths
    combined_paths = []
    temp_dir = tempfile.mkdtemp()
    for i in range(len(image_paths) - 1):
        # `with` guarantees the source file handles are closed even when a
        # resize/paste fails (the original leaked the pre-resize images).
        with Image.open(image_paths[i]) as src1, Image.open(image_paths[i + 1]) as src2:
            img1, img2 = src1, src2
            if direction == "horizontal":
                # Horizontal merge: normalise heights first (aspect preserved).
                max_height = max(img1.height, img2.height)
                if img1.height != max_height:
                    ratio = max_height / img1.height
                    img1 = img1.resize((int(img1.width * ratio), max_height), Image.Resampling.LANCZOS)
                if img2.height != max_height:
                    ratio = max_height / img2.height
                    img2 = img2.resize((int(img2.width * ratio), max_height), Image.Resampling.LANCZOS)
                combined_image = Image.new('RGB', (img1.width + img2.width, max_height), 'white')
                combined_image.paste(img1, (0, 0))
                combined_image.paste(img2, (img1.width, 0))
                combined_path = os.path.join(temp_dir, f"overlap_h_{i+1}_{i+2}.png")
            else:  # vertical
                # Vertical merge: normalise widths first (aspect preserved).
                max_width = max(img1.width, img2.width)
                if img1.width != max_width:
                    ratio = max_width / img1.width
                    img1 = img1.resize((max_width, int(img1.height * ratio)), Image.Resampling.LANCZOS)
                if img2.width != max_width:
                    ratio = max_width / img2.width
                    img2 = img2.resize((max_width, int(img2.height * ratio)), Image.Resampling.LANCZOS)
                combined_image = Image.new('RGB', (max_width, img1.height + img2.height), 'white')
                combined_image.paste(img1, (0, 0))
                combined_image.paste(img2, (0, img1.height))
                combined_path = os.path.join(temp_dir, f"overlap_v_{i+1}_{i+2}.png")
            combined_image.save(combined_path, "PNG")
            combined_paths.append(combined_path)
            combined_image.close()
    return combined_paths
def create_prompt_content_with_image(image_paths: List[str], prompt: str) -> list:
    """Build a multimodal message content list: the prompt text followed by
    one base64 data-URL part per image. Images that fail to encode are
    skipped with a console message."""
    content = [{"type": "text", "text": prompt}]
    for path in image_paths:
        try:
            encoded = encode_image_base64_from_url(path)
        except Exception as e:
            print(f"Error encoding image {path}: {e}")
            continue
        content.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{encoded}"
            },
        })
    return content
def log_api_request(messages: List[dict], model_name: str) -> str:
    """Append a redacted copy of an API request to the global request log.

    Inline base64 image payloads are replaced with a short placeholder so the
    log stays readable. Returns the full accumulated log text.
    """
    import json
    import datetime
    global current_request_info
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def _redact(item):
        # Collapse base64 data URLs; pass text and plain URLs through verbatim.
        # Returns None for item types the log does not record.
        kind = item.get("type")
        if kind == "text":
            return {"type": "text", "text": item.get("text", "")}
        if kind == "image_url":
            url = item.get("image_url", {}).get("url", "")
            if url.startswith("data:image"):
                return {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/*;base64,[BASE64_DATA_{len(url)}_CHARS]"
                    }
                }
            return {"type": "image_url", "image_url": {"url": url}}
        return None

    # Mirror the real request structure, minus the bulky image bytes.
    api_request = {"model": model_name, "messages": []}
    for message in messages:
        entry = {"role": message.get("role", "unknown")}
        content = message.get("content", "")
        if isinstance(content, str):
            entry["content"] = content
        elif isinstance(content, list):
            entry["content"] = [r for r in map(_redact, content) if r is not None]
        api_request["messages"].append(entry)

    request_json = json.dumps(api_request, ensure_ascii=False, indent=2)
    # Subsequent requests get a separator; the very first one does not.
    if current_request_info:
        separator = f"\n{'='*80}\n๐ API ์์ฒญ [{timestamp}]\n{'='*80}\n"
        current_request_info += f"{separator}{request_json}\n"
    else:
        current_request_info = f"๐ API ์์ฒญ [{timestamp}]\n{'='*80}\n{request_json}\n"
    return current_request_info
def send_chat_completion_request(image_paths: List[str], prompt: str, system_prompt: str = ""):
    """Send one chat-completion request carrying the prompt plus attached images.

    An empty/blank system prompt falls back to the prompt file. Request
    logging is handled separately by process_request.
    """
    effective_system = system_prompt if system_prompt.strip() else load_system_prompt()
    return client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": effective_system},
            {
                "role": "user",
                "content": create_prompt_content_with_image(image_paths, prompt),
            },
        ],
    )
def process_images_in_batches(image_paths: List[str], prompt: str, system_prompt: str, batch_size: int = 3) -> List[str]:
    """Run the LLM over the images in chunks of `batch_size`.

    Returns one result string per chunk; a failed chunk yields an error
    message in its slot instead of aborting the whole run.
    """
    results = []
    for start in range(0, len(image_paths), batch_size):
        chunk = image_paths[start:start + batch_size]
        try:
            batch_prompt = f"{prompt}"
            completion = send_chat_completion_request(chunk, batch_prompt, system_prompt)
            answer = completion.choices[0].message.content
            print(answer)
            results.append(answer)
        except Exception as e:
            results.append(f"๋ฐฐ์น {start//batch_size + 1} ์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}")
    return results
def merge_batch_results(results: List[str]) -> str:
    """Concatenate per-batch results into one document string.

    Empty input yields an empty string; a single result is returned as-is.
    """
    if not results:
        return ""
    if len(results) == 1:
        return results[0]
    return "".join(results)
def get_pdf_files():
    """Return sorted PDF paths found recursively under ./resume_samples.

    When no PDFs exist, the default drop directory is created and an empty
    list is returned.
    """
    found = glob.glob("./resume_samples/**/*.pdf", recursive=True)
    if found:
        return sorted(found)
    os.makedirs("./resume_samples/pdf/text", exist_ok=True)
    return []
def save_result_to_file(content: str, filename: str) -> str:
    """Save the analysis result to a markdown file.

    Appends a .md extension when missing and returns a human-readable status
    message (success or error) instead of raising.
    """
    if not content:
        return "์ ์ฅํ ๋ด์ฉ์ด ์์ต๋๋ค."
    if not filename:
        return "ํ์ผ ์ด๋ฆ์ด ์ง์ ๋์ง ์์์ต๋๋ค. ํ์ผ ์ด๋ฆ์ ์ ๋ ฅํด์ฃผ์ธ์."
    # Normalise the extension.
    if not filename.endswith('.md'):
        filename += '.md'
    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        # Bug fix: the success message was an f-string with no placeholder
        # (it always printed a literal "(unknown)"); report the actual
        # saved filename instead.
        return f"๊ฒฐ๊ณผ๊ฐ {filename}์ ์ ์ฅ๋์์ต๋๋ค."
    except Exception as e:
        return f"ํ์ผ ์ ์ฅ ์ค๋ฅ: {str(e)}"
def extract_text_with_fitz(pdf_path: str) -> str:
    """Extract plain text from a PDF with PyMuPDF, one markdown section per page.

    Pages are separated by a horizontal rule. Returns an error description
    string instead of raising on failure.
    """
    try:
        # Context manager guarantees the document is closed even when a page
        # fails mid-extraction (the original leaked the handle in that case).
        with fitz.open(pdf_path) as doc:
            parts = []
            page_count = len(doc)
            for page_num in range(page_count):
                page = doc.load_page(page_num)
                parts.append(f"## Page {page_num + 1}\n\n")
                parts.append(page.get_text("text"))
                if page_num < page_count - 1:
                    parts.append("\n\n---\n\n")
            return "".join(parts)
    except Exception as e:
        return f"Fitz ํ ์คํธ ์ถ์ถ ์ค๋ฅ: {str(e)}"
def extract_text_with_docling(pdf_path: str) -> str:
    """Extract text from a PDF via Docling (OCR included), as markdown.

    Returns an error description string instead of raising on failure.
    """
    try:
        converted = converter.convert(pdf_path)
        return converted.document.export_to_markdown()
    except Exception as e:
        return f"Docling ํ ์คํธ ์ถ์ถ ์ค๋ฅ: {str(e)}"
def preview_image_processing(pdf_path: str, processing_mode: str = "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)", overlap_option: str = "์ผ๋ฐ ๋ณํฉ") -> List[str]:
    """Render a PDF to images and apply the selected merge mode, for UI preview.

    Returns an empty list when the path is missing or any step fails.
    """
    try:
        if not pdf_path or not os.path.exists(pdf_path):
            return []
        pages = convert_pdf_to_images(pdf_path)
        sliding = overlap_option == "์ค๋ณต ๋ณํฉ (์ฌ๋ผ์ด๋ฉ ์๋์ฐ)"
        if processing_mode == "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)":
            if sliding:
                return combine_images_with_overlap(pages, "horizontal")
            return combine_images_horizontally(pages)
        if processing_mode == "์ธ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)":
            if sliding:
                return combine_images_with_overlap(pages, "vertical")
            return combine_images_vertically(pages)
        # "๋ฑ๊ฐ ํ์ด์ง": use each page as its own image.
        return pages
    except Exception as e:
        print(f"์ด๋ฏธ์ง ์ฒ๋ฆฌ ๋ฏธ๋ฆฌ๋ณด๊ธฐ ์ค๋ฅ: {e}")
        return []
def process_request(
    prompt: str,
    system_prompt: str,
    use_images: bool,
    use_docling: bool,
    pdf_file_path: str,
    uploaded_file: str,
    output_filename: str,
    image_processing_mode: str = "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)",
    overlap_option: str = "์ผ๋ฐ ๋ณํฉ",
    batch_size: int = 3,
    use_postprocess: bool = True,
    postprocess_prompt: str = "",
    progress = None
) -> Generator[Tuple[str, str, str, str, str], None, None]:
    """Process the request with all the options and yield intermediate results.

    Pipeline: resolve the PDF -> optionally render pages to images and merge
    them per the selected mode -> optionally parse text with Docling -> call
    the LLM (batched when there are more images than `batch_size`) ->
    optionally run a postprocessing LLM pass that merges the batch outputs.

    Yields:
        (batch_result, final_result, docling_markdown, log_text,
        api_request_dump) tuples after every progress step so the UI updates
        live.
    """
    import time
    # Overall wall-clock start for total-duration reporting.
    total_start_time = time.time()
    global current_log_messages, current_request_info
    current_log_messages = []  # reset the UI log
    current_request_info = ""  # reset the accumulated API request dump
    # The uploaded file, when present, takes priority over the dropdown path.
    final_pdf_path = uploaded_file if uploaded_file else pdf_file_path
    # Working state.
    full_prompt = prompt
    docling_output = ""
    images_to_use = []
    temp_dirs_to_cleanup = []
    response_content = ""  # final result
    batch_content = ""  # batch-stage result
    def add_log(message):
        # NOTE(review): this is a *generator* function — the append happens
        # only when the caller iterates it. Each call appends one numbered
        # log line and yields a single UI snapshot tuple.
        current_log_messages.append(f"[{len(current_log_messages)+1:02d}] {message}")
        log_text = "\n".join(current_log_messages)
        # Intermediate state: (batch result, final result, parsed text, log, API request info).
        yield batch_content, response_content, docling_output, log_text, current_request_info
        return log_text
    # First log entry; its yielded snapshot is consumed here and discarded.
    log_generator = add_log("์์...")
    next(log_generator)  # emit the first log line
    try:
        # Abort early when no PDF was chosen or the path no longer exists.
        if not final_pdf_path or not os.path.exists(final_pdf_path):
            msg = "PDF ํ์ผ์ ์ ํํ๊ฑฐ๋ ์ ๋ก๋ํด ์ฃผ์ธ์."
            for result in add_log("โ PDF ํ์ผ์ด ์ ํ๋์ง ์์์ต๋๋ค."):
                yield result
            yield "", msg, "", "\n".join(current_log_messages), current_request_info
            return
        for result in add_log(f"โ ์ฒ๋ฆฌํ PDF ํ์ผ: {os.path.basename(final_pdf_path)}"):
            yield result
        # Render PDF pages to images when image input is enabled.
        if use_images:
            for result in add_log("๐ผ๏ธ PDF๋ฅผ ์ด๋ฏธ์ง๋ก ๋ณํ ์ค..."):
                yield result
            print(f"PDF๋ฅผ ์ด๋ฏธ์ง๋ก ๋ณํ ์ค: {final_pdf_path}")
            pdf_images = convert_pdf_to_images(final_pdf_path)
            # Remember the temp dirs so the finally-block can delete them.
            temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in pdf_images])
            for result in add_log(f"๐ PDF์์ {len(pdf_images)}๊ฐ ํ์ด์ง ์ถ์ถ ์๋ฃ"):
                yield result
            # Branch on the selected image-processing mode.
            if image_processing_mode == "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)":
                if overlap_option == "์ค๋ณต ๋ณํฉ (์ฌ๋ผ์ด๋ฉ ์๋์ฐ)":
                    # Sliding-window horizontal merge.
                    for result in add_log("๐ ํ์ด์ง๋ค์ ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ๋ฐฉ์์ผ๋ก ๊ฐ๋ก ๋ณํฉ ์ค..."):
                        yield result
                    combined_images = combine_images_with_overlap(pdf_images, "horizontal")
                    for result in add_log(f"โ {len(combined_images)}๊ฐ์ ์ค๋ณต ๊ฐ๋ก ๋ณํฉ ์ด๋ฏธ์ง ์์ฑ ์๋ฃ"):
                        yield result
                else:
                    # Plain pairwise horizontal merge.
                    for result in add_log("๐ ํ์ด์ง๋ค์ 2์ฅ์ฉ ๊ฐ๋ก๋ก ๋ณํฉ ์ค..."):
                        yield result
                    combined_images = combine_images_horizontally(pdf_images)
                    for result in add_log(f"โ {len(combined_images)}๊ฐ์ ๊ฐ๋ก ๋ณํฉ ์ด๋ฏธ์ง ์์ฑ ์๋ฃ"):
                        yield result
                temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in combined_images])
                images_to_use = combined_images
                print(f"PDF์์ {len(pdf_images)}๊ฐ ํ์ด์ง๋ฅผ {len(combined_images)}๊ฐ ๊ฐ๋ก ๋ณํฉ ์ด๋ฏธ์ง๋ก ๋ณํ ์๋ฃ")
            elif image_processing_mode == "์ธ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)":
                if overlap_option == "์ค๋ณต ๋ณํฉ (์ฌ๋ผ์ด๋ฉ ์๋์ฐ)":
                    # Sliding-window vertical merge.
                    for result in add_log("๐ ํ์ด์ง๋ค์ ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ๋ฐฉ์์ผ๋ก ์ธ๋ก ๋ณํฉ ์ค..."):
                        yield result
                    combined_images = combine_images_with_overlap(pdf_images, "vertical")
                    for result in add_log(f"โ {len(combined_images)}๊ฐ์ ์ค๋ณต ์ธ๋ก ๋ณํฉ ์ด๋ฏธ์ง ์์ฑ ์๋ฃ"):
                        yield result
                else:
                    # Plain pairwise vertical merge.
                    for result in add_log("๐ ํ์ด์ง๋ค์ 2์ฅ์ฉ ์ธ๋ก๋ก ๋ณํฉ ์ค..."):
                        yield result
                    combined_images = combine_images_vertically(pdf_images)
                    for result in add_log(f"โ {len(combined_images)}๊ฐ์ ์ธ๋ก ๋ณํฉ ์ด๋ฏธ์ง ์์ฑ ์๋ฃ"):
                        yield result
                temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in combined_images])
                images_to_use = combined_images
                print(f"PDF์์ {len(pdf_images)}๊ฐ ํ์ด์ง๋ฅผ {len(combined_images)}๊ฐ ์ธ๋ก ๋ณํฉ ์ด๋ฏธ์ง๋ก ๋ณํ ์๋ฃ")
            else:  # "๋ฑ๊ฐ ํ์ด์ง" (one image per page)
                images_to_use = pdf_images
                for result in add_log(f"โ {len(pdf_images)}๊ฐ์ ๊ฐ๋ณ ํ์ด์ง ์ด๋ฏธ์ง ์ค๋น ์๋ฃ"):
                    yield result
                print(f"PDF์์ {len(pdf_images)}๊ฐ ํ์ด์ง๋ฅผ ๊ฐ๋ณ ์ด๋ฏธ์ง๋ก ์ฌ์ฉ")
        # Docling text parsing (appended to the prompt when it succeeds).
        if use_docling:
            for result in add_log("๐ Docling์ผ๋ก PDF ํ ์คํธ ํ์ฑ ์ค..."):
                yield result
            try:
                result = converter.convert(final_pdf_path)
                docling_output = result.document.export_to_markdown()
                full_prompt += f"\n\nํ์ฑ๋ ์ด๋ ฅ์ ๋ด์ฉ: {docling_output}"
                for result in add_log(f"โ ํ ์คํธ ํ์ฑ ์๋ฃ (๊ธธ์ด: {len(docling_output)} ๋ฌธ์)"):
                    yield result
            except Exception as e:
                # Docling failure aborts the whole run.
                error_msg = f"Docling ๋ณํ ์ค๋ฅ: {str(e)}"
                for result in add_log(f"โ Docling ๋ณํ ์ค๋ฅ: {str(e)}"):
                    yield result
                for result in add_log(f"โ ์ฒ๋ฆฌ ์ค๋จ๋จ"):
                    yield result
                yield "", error_msg, docling_output, "\n".join(current_log_messages), current_request_info
                return
        # LLM stage with images (possibly batched).
        if images_to_use:
            # Log only the (truncated) system prompt, not the full payload.
            for result in add_log(f"๐ค ์์คํ ํ๋กฌํํธ: {system_prompt[:50]}{'...' if len(system_prompt) > 50 else ''}"):
                yield result
            if len(images_to_use) <= batch_size:
                # Small enough to send in a single request.
                for result in add_log(f"๐ค API ์์ฒญ ์ค๋น ์ค... (์ด๋ฏธ์ง {len(images_to_use)}์ฅ)"):
                    yield result
                # Log the request first so the UI can show it immediately.
                system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
                messages = [
                    {"role": "system", "content": system_prompt_clean},
                    {
                        "role": "user",
                        "content": create_prompt_content_with_image(images_to_use, full_prompt)
                    }
                ]
                log_api_request(messages, model)
                # Push the request dump to the UI right away.
                yield "", "", docling_output, "\n".join(current_log_messages), current_request_info
                for result in add_log(f"๐ค LLM API ์์ฒญ ์ค... (์ด๋ฏธ์ง {len(images_to_use)}์ฅ)"):
                    yield result
                # Time this single request.
                import time
                batch_start_time = time.time()
                completion = send_chat_completion_request(images_to_use, full_prompt, system_prompt)
                response_content = completion.choices[0].message.content
                batch_content = response_content  # single batch: batch result == final result
                print(response_content)
                batch_duration = time.time() - batch_start_time
                for result in add_log(f"โ LLM ๋ถ์ ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {batch_duration:.1f}์ด)"):
                    yield result
            else:
                # Too many images: split into batches of `batch_size`.
                num_batches = (len(images_to_use) + batch_size - 1) // batch_size
                for result in add_log(f"๐ฆ ์ด๋ฏธ์ง๊ฐ {len(images_to_use)}์ฅ์ด๋ฏ๋ก {num_batches}๊ฐ ๋ฐฐ์น๋ก ๋๋์ด ์ฒ๋ฆฌ (๋ฐฐ์น๋น {batch_size}์ฅ)"):
                    yield result
                print(f"์ด๋ฏธ์ง๊ฐ {len(images_to_use)}์ฅ์ด๋ฏ๋ก ๋ฐฐ์น ์ฒ๋ฆฌ๋ฅผ ์์ํฉ๋๋ค. (๋ฐฐ์น๋น {batch_size}์ฅ)")
                batch_results = []
                for i in range(0, len(images_to_use), batch_size):
                    batch_num = i // batch_size + 1
                    batch_images = images_to_use[i:i + batch_size]
                    for result in add_log(f"๐ค ๋ฐฐ์น {batch_num}/{num_batches} API ์์ฒญ ์ค๋น ์ค... (์ด๋ฏธ์ง {len(batch_images)}์ฅ)"):
                        yield result
                    # Log the request before sending so it shows in the UI.
                    batch_prompt = f"{full_prompt}"
                    system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
                    messages = [
                        {"role": "system", "content": system_prompt_clean},
                        {
                            "role": "user",
                            "content": create_prompt_content_with_image(batch_images, batch_prompt)
                        }
                    ]
                    log_api_request(messages, model)
                    # Push the request dump to the UI right away.
                    yield "", "", docling_output, "\n".join(current_log_messages), current_request_info
                    for result in add_log(f"๐ค ๋ฐฐ์น {batch_num}/{num_batches} ์ฒ๋ฆฌ ์ค... (์ด๋ฏธ์ง {len(batch_images)}์ฅ)"):
                        yield result
                    try:
                        # Time this batch.
                        import time
                        batch_start_time = time.time()
                        completion = send_chat_completion_request(batch_images, batch_prompt, system_prompt)
                        batch_response = completion.choices[0].message.content
                        batch_results.append(batch_response)
                        print(batch_response)
                        batch_duration = time.time() - batch_start_time
                        for result in add_log(f"โ ๋ฐฐ์น {batch_num} ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {batch_duration:.1f}์ด)"):
                            yield result
                    except Exception as e:
                        # A failed batch records its error instead of aborting the run.
                        batch_results.append(f"๋ฐฐ์น {batch_num} ์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}")
                        for result in add_log(f"โ ๋ฐฐ์น {batch_num} ์ค๋ฅ: {str(e)}"):
                            yield result
                batch_content = merge_batch_results(batch_results)  # merged batch output
                response_content = batch_content  # provisional final result (before postprocess)
                for result in add_log("๐ ๋ชจ๋ ๋ฐฐ์น ๊ฒฐ๊ณผ ๋ณํฉ ์๋ฃ"):
                    yield result
        else:
            # No images: text-only request.
            for result in add_log(f"๐ค ์์คํ ํ๋กฌํํธ: {system_prompt[:50]}{'...' if len(system_prompt) > 50 else ''}"):
                yield result
            for result in add_log("๐ค ํ ์คํธ ์ ์ฉ API ์์ฒญ ์ค๋น ์ค..."):
                yield result
            # Log the request first so the UI can show it immediately.
            system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
            messages = [
                {"role": "system", "content": system_prompt_clean},
                {
                    "role": "user",
                    "content": create_prompt_content_with_image([], full_prompt)
                }
            ]
            log_api_request(messages, model)
            # Push the request dump to the UI right away.
            yield "", "", docling_output, "\n".join(current_log_messages), current_request_info
            for result in add_log("๐ค ํ ์คํธ ์ ์ฉ LLM API ์์ฒญ ์ค..."):
                yield result
            # Time the text-only request.
            import time
            text_start_time = time.time()
            completion = send_chat_completion_request([], full_prompt, system_prompt)
            response_content = completion.choices[0].message.content
            batch_content = response_content  # text-only: batch result == final result
            print(response_content)
            text_duration = time.time() - text_start_time
            for result in add_log(f"โ ํ ์คํธ ๋ถ์ ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {text_duration:.1f}์ด)"):
                yield result
        # Postprocessing pass (only when the run was split into multiple batches).
        if use_postprocess and len(images_to_use) > batch_size:
            for result in add_log("๐ ํ์ฒ๋ฆฌ ์์ ์ ์์ํฉ๋๋ค..."):
                yield result
            # Fall back to the default postprocess prompt when none supplied.
            if not postprocess_prompt.strip():
                postprocess_prompt = load_postprocess_prompt()
            # Concatenate prompt and merged batch output into one text request.
            combined_results = f"{postprocess_prompt}\n\n=== ๋ฐฐ์น ์ฒ๋ฆฌ ๊ฒฐ๊ณผ ===\n\n{response_content}"
            for result in add_log("๐ค ํ์ฒ๋ฆฌ API ์์ฒญ ์ค๋น ์ค..."):
                yield result
            # Log the postprocess request first so the UI can show it immediately.
            system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
            messages = [
                {"role": "system", "content": system_prompt_clean},
                {
                    "role": "user",
                    "content": combined_results
                }
            ]
            log_api_request(messages, model)
            # Push the request dump to the UI right away.
            yield "", "", docling_output, "\n".join(current_log_messages), current_request_info
            for result in add_log("๐ค ํ์ฒ๋ฆฌ LLM API ์์ฒญ ์ค..."):
                yield result
            # Time the postprocess request.
            import time
            postprocess_start_time = time.time()
            # Text-only postprocess call (no images attached).
            completion = send_chat_completion_request([], combined_results, system_prompt)
            response_content = completion.choices[0].message.content
            print(response_content)
            postprocess_duration = time.time() - postprocess_start_time
            for result in add_log(f"โ ํ์ฒ๋ฆฌ ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {postprocess_duration:.1f}์ด)"):
                yield result
        # Final timing and success log, then the final state tuple.
        total_duration = time.time() - total_start_time
        for result in add_log(f"โฑ๏ธ ์ ์ฒด ์ฒ๋ฆฌ ์๊ฐ: {total_duration:.1f}์ด"):
            yield result
        for result in add_log("๐ ๋ชจ๋ ์ฒ๋ฆฌ๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋์์ต๋๋ค!"):
            yield result
        yield batch_content, response_content, docling_output, "\n".join(current_log_messages), current_request_info
    except Exception as e:
        # Record the total time even when aborting on error.
        total_duration = time.time() - total_start_time
        for result in add_log(f"โฑ๏ธ ์ ์ฒด ์ฒ๋ฆฌ ์๊ฐ: {total_duration:.1f}์ด (์ค๋ฅ๋ก ์ธํ ์ค๋จ)"):
            yield result
        error_msg = f"์ค๋ฅ ๋ฐ์: {str(e)}"
        for result in add_log(f"โ {error_msg}"):
            yield result
        for result in add_log("์ฒ๋ฆฌ๊ฐ ์ค๋จ๋์์ต๋๋ค."):
            yield result
        yield "", error_msg, docling_output, "\n".join(current_log_messages), current_request_info
    finally:
        # Remove every temporary directory created for page/merged images.
        if temp_dirs_to_cleanup:
            for result in add_log("๐งน ์์ ํ์ผ ์ ๋ฆฌ ์ค..."):
                yield result
        for temp_dir in set(temp_dirs_to_cleanup):  # de-duplicate
            try:
                if os.path.exists(temp_dir):
                    shutil.rmtree(temp_dir)
                    print(f"์์ ๋๋ ํ ๋ฆฌ ์ ๋ฆฌ: {temp_dir}")
            except Exception as e:
                print(f"์์ ๋๋ ํ ๋ฆฌ ์ ๋ฆฌ ์คํจ: {temp_dir}, ์ค๋ฅ: {e}")
def process_request_preprocessing_only(
    prompt: str,
    system_prompt: str,
    use_images: bool,
    use_docling: bool,
    pdf_file_path: str,
    uploaded_file: str,
    output_filename: str,
    image_processing_mode: str = "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)",
    overlap_option: str = "์ผ๋ฐ ๋ณํฉ",
    batch_size: int = 3,
    progress = None
) -> Generator[Tuple[str, str, str, str, str], None, None]:
    """Run only the preprocessing (batch) stage.

    Thin wrapper around process_request with postprocessing disabled.
    """
    yield from process_request(
        prompt=prompt,
        system_prompt=system_prompt,
        use_images=use_images,
        use_docling=use_docling,
        pdf_file_path=pdf_file_path,
        uploaded_file=uploaded_file,
        output_filename=output_filename,
        image_processing_mode=image_processing_mode,
        overlap_option=overlap_option,
        batch_size=batch_size,
        use_postprocess=False,  # postprocessing stage skipped
        postprocess_prompt="",
        progress=progress,
    )
def process_request_postprocessing_only(
    batch_result: str,
    system_prompt: str,
    postprocess_prompt: str = "",
    progress = None
) -> Generator[Tuple[str, str, str, str, str], None, None]:
    """Run only the postprocessing stage: merge existing batch results via the LLM.

    Yields (batch_result, final_result, "", log_text, api_request_dump)
    tuples so the UI updates live; the incoming batch result is echoed back
    unchanged in every tuple.
    """
    import time
    global current_log_messages, current_request_info
    current_log_messages = []  # reset the UI log
    current_request_info = ""  # reset the accumulated API request dump
    # Overall wall-clock start for total-duration reporting.
    total_start_time = time.time()
    def add_log(message):
        # NOTE(review): generator — the append happens only when iterated.
        # Each call appends one numbered log line and yields one UI snapshot.
        current_log_messages.append(f"[{len(current_log_messages)+1:02d}] {message}")
        log_text = "\n".join(current_log_messages)
        # Postprocessing keeps the batch result as-is; only the final result changes.
        yield batch_result, "", "", log_text, current_request_info
        return log_text
    # First log entry; its yielded snapshot is consumed here and discarded.
    log_generator = add_log("ํ์ฒ๋ฆฌ ์์...")
    next(log_generator)  # emit the first log line
    try:
        # Nothing to postprocess: tell the user to run preprocessing first.
        if not batch_result or not batch_result.strip():
            msg = "ํ์ฒ๋ฆฌํ ๋ฐฐ์น ๊ฒฐ๊ณผ๊ฐ ์์ต๋๋ค. ๋จผ์ ์ ์ฒ๋ฆฌ๋ฅผ ์ํํด์ฃผ์ธ์."
            for result in add_log("โ ๋ฐฐ์น ๊ฒฐ๊ณผ๊ฐ ์์ต๋๋ค."):
                yield result
            yield batch_result, msg, "", "\n".join(current_log_messages), current_request_info
            return
        for result in add_log("๐ ํ์ฒ๋ฆฌ ์์ ์ ์์ํฉ๋๋ค..."):
            yield result
        # Fall back to the default postprocess prompt when none supplied.
        if not postprocess_prompt.strip():
            postprocess_prompt = load_postprocess_prompt()
        # Concatenate prompt and batch output into one text request.
        combined_results = f"{postprocess_prompt}\n\n=== ๋ฐฐ์น ์ฒ๋ฆฌ ๊ฒฐ๊ณผ ===\n\n{batch_result}"
        for result in add_log("๐ค ํ์ฒ๋ฆฌ API ์์ฒญ ์ค๋น ์ค..."):
            yield result
        # Log the request first so the UI can show it immediately.
        system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
        messages = [
            {"role": "system", "content": system_prompt_clean},
            {
                "role": "user",
                "content": combined_results
            }
        ]
        log_api_request(messages, model)
        # Push the request dump to the UI right away.
        yield batch_result, "", "", "\n".join(current_log_messages), current_request_info
        for result in add_log("๐ค ํ์ฒ๋ฆฌ LLM API ์์ฒญ ์ค..."):
            yield result
        # Time the postprocess request.
        postprocess_start_time = time.time()
        # Text-only postprocess call (no images attached).
        completion = send_chat_completion_request([], combined_results, system_prompt)
        final_result = completion.choices[0].message.content
        postprocess_duration = time.time() - postprocess_start_time
        for result in add_log(f"โ ํ์ฒ๋ฆฌ ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {postprocess_duration:.1f}์ด)"):
            yield result
        # Final timing and success log, then the final state tuple.
        total_duration = time.time() - total_start_time
        for result in add_log(f"โฑ๏ธ ์ ์ฒด ์ฒ๋ฆฌ ์๊ฐ: {total_duration:.1f}์ด"):
            yield result
        for result in add_log("๐ ํ์ฒ๋ฆฌ๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋์์ต๋๋ค!"):
            yield result
        # Batch result unchanged; only the final result is updated.
        yield batch_result, final_result, "", "\n".join(current_log_messages), current_request_info
    except Exception as e:
        # Record the total time even when aborting on error.
        total_duration = time.time() - total_start_time
        for result in add_log(f"โฑ๏ธ ์ ์ฒด ์ฒ๋ฆฌ ์๊ฐ: {total_duration:.1f}์ด (์ค๋ฅ๋ก ์ธํ ์ค๋จ)"):
            yield result
        error_msg = f"ํ์ฒ๋ฆฌ ์ค๋ฅ ๋ฐ์: {str(e)}"
        for result in add_log(f"โ {error_msg}"):
            yield result
        for result in add_log("ํ์ฒ๋ฆฌ๊ฐ ์ค๋จ๋์์ต๋๋ค."):
            yield result
        yield batch_result, error_msg, "", "\n".join(current_log_messages), current_request_info