# Resume PDF analysis pipeline: converts resume PDFs to page images, sends them
# to an OpenAI-compatible vision LLM (optionally in batches), and merges results.
import base64
import glob
import mimetypes
import os
import shutil
import tempfile
from typing import List, Optional, Generator, Tuple

import fitz  # PyMuPDF
import requests
from docling.document_converter import DocumentConverter
from openai import OpenAI
from pdf2image import convert_from_path
from PIL import Image
# Initialize the docling DocumentConverter (shared module-level instance).
converter = DocumentConverter()
# Global configuration for the OpenAI-compatible vision endpoint.
# NOTE(review): the base URL is a hard-coded internal IP — confirm it is
# intentional before deploying outside that network.
openai_api_key = "EMPTY"
openai_api_base = "http://118.38.20.101:8080/v1"
model = "Qwen/Qwen2.5-VL-7B-Instruct-AWQ"
# Global logging state shared by the request-processing generators below.
current_log_messages = []
current_request_info = ""  # accumulated API request info (pretty-printed JSON)
# OpenAI client initialization (points at the server configured above).
client = OpenAI(
    api_key=openai_api_key,
    base_url=openai_api_base
)
def load_system_prompt() -> str:
    """Load the system prompt from ``prompt_system.txt``.

    Returns:
        The stripped file contents, or a built-in default prompt when the
        file is missing, unreadable, or not valid UTF-8.
    """
    try:
        with open("prompt_system.txt", "r", encoding="utf-8") as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError) as e:
        # Narrowed from a bare ``except Exception`` so genuine programming
        # errors are not silently converted into the fallback prompt.
        print(f"์์คํ ํ๋กฌํํธ ํ์ผ ๋ก๋ ์ค๋ฅ: {e}")
        return "๋น์ ์ ์ด๋ ฅ์ ๋ถ์์ ๋์์ฃผ๋ AI ์ด์์คํดํธ์ ๋๋ค."
def load_user_prompt() -> str:
    """Load the user prompt from ``prompt_user.txt``.

    Returns:
        The stripped file contents, or a built-in default prompt when the
        file is missing, unreadable, or not valid UTF-8.
    """
    try:
        with open("prompt_user.txt", "r", encoding="utf-8") as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError) as e:
        # Narrowed from ``except Exception`` — only expected I/O/decoding
        # failures fall back to the default prompt.
        print(f"์ฌ์ฉ์ ํ๋กฌํํธ ํ์ผ ๋ก๋ ์ค๋ฅ: {e}")
        return "[ํ ์คํธ ์ถ์ถ ์์ ]\n\n์ฒจ๋ถ๋ ์ด๋ฏธ์ง๋ค์ ์ด๋ ฅ์๋ฅผ ์ด๋ฏธ์งํ ํ ๊ฒฐ๊ณผ๋ฌผ์ด์ผ. ์ด๋ฏธ์ง์ ๋ด์ฉ ๋ฐ ๋ ์ด์์์ ์ฐธ๊ณ ํด์ ์ด๋ ฅ์์ ๋ด์ฉ์ ์ ๋ฆฌ ํ ๋งํฌ๋ค์ด ํ์์ผ๋ก ์ ๋ฆฌํด์ค."
    except Exception as e:
        # Unexpected errors still fall back, but are visibly logged.
        print(f"์ฌ์ฉ์ ํ๋กฌํํธ ํ์ผ ๋ก๋ ์ค๋ฅ: {e}")
        return "[ํ ์คํธ ์ถ์ถ ์์ ]\n\n์ฒจ๋ถ๋ ์ด๋ฏธ์ง๋ค์ ์ด๋ ฅ์๋ฅผ ์ด๋ฏธ์งํ ํ ๊ฒฐ๊ณผ๋ฌผ์ด์ผ. ์ด๋ฏธ์ง์ ๋ด์ฉ ๋ฐ ๋ ์ด์์์ ์ฐธ๊ณ ํด์ ์ด๋ ฅ์์ ๋ด์ฉ์ ์ ๋ฆฌ ํ ๋งํฌ๋ค์ด ํ์์ผ๋ก ์ ๋ฆฌํด์ค."
def load_postprocess_prompt() -> str:
    """Load the post-processing prompt from ``prompt_postprocess.txt``.

    Returns:
        The stripped file contents, or a built-in default prompt when the
        file is missing, unreadable, or not valid UTF-8.
    """
    try:
        with open("prompt_postprocess.txt", "r", encoding="utf-8") as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError) as e:
        # Narrowed from a bare ``except Exception`` so genuine programming
        # errors are not silently converted into the fallback prompt.
        print(f"ํ์ฒ๋ฆฌ ํ๋กฌํํธ ํ์ผ ๋ก๋ ์ค๋ฅ: {e}")
        return "[ํ ์คํธ ๋ณํฉ ์์ ]\n๋ฐฐ์น ์์ ์ผ๋ก ์์ง๋ ํ ์คํธ์ ๋๋ค. ์ด์ ์์ ํ ํํ์ ์ด๋ ฅ์๋ฅผ ๋ง๋ค์ด ์ฃผ์ธ์. ์ถ๋ ฅ ํฌ๋งท์ ๋งํฌ๋ค์ด์ ๋๋ค."
def encode_image_base64_from_url(image_path: str) -> str:
    """Encode an image (local file path or http(s) URL) to base64.

    Args:
        image_path: Either an ``http://``/``https://`` URL or a path to an
            existing local file.

    Returns:
        The base64-encoded image bytes as an ASCII string.

    Raises:
        ValueError: If ``image_path`` is neither a URL nor an existing file.
        requests.HTTPError: If downloading the URL fails.
    """
    try:
        # Fixed: the original tested ``'http' in image_path``, which misroutes
        # any local path merely containing the substring "http".
        if isinstance(image_path, str) and image_path.startswith(("http://", "https://")):
            with requests.get(image_path) as response:
                response.raise_for_status()
                return base64.b64encode(response.content).decode('utf-8')
        elif isinstance(image_path, str) and os.path.isfile(image_path):
            with open(image_path, 'rb') as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        else:
            raise ValueError(f"Invalid image URL or file path: {image_path}")
    except Exception as e:
        print(f"Error encoding image: {e}")
        raise
def convert_pdf_to_images(pdf_path: str, dpi: int = 200) -> List[str]:
    """Render each page of *pdf_path* as a PNG and return the file paths.

    Pages are written into a fresh temporary directory as
    ``page_001.png``, ``page_002.png``, ...; the caller is responsible
    for cleaning that directory up afterwards.
    """
    try:
        rendered_pages = convert_from_path(pdf_path, dpi=dpi)
        output_dir = tempfile.mkdtemp()
        saved_paths = []
        for page_number, page_image in enumerate(rendered_pages, start=1):
            target = os.path.join(output_dir, f"page_{page_number:03d}.png")
            page_image.save(target, "PNG")
            saved_paths.append(target)
        return saved_paths
    except Exception as e:
        print(f"Error converting PDF to images: {e}")
        raise
def combine_images_horizontally(image_paths: List[str]) -> List[str]:
    """Combine images in pairs horizontally; return the combined file paths.

    Pages are paired as (1,2), (3,4), ...; an odd trailing page is copied
    through unchanged. Heights within a pair are equalized (aspect ratio
    preserved) before pasting side by side. Output goes to a fresh temp
    directory that the caller must clean up.
    """
    if not image_paths:
        return []
    combined_paths = []
    temp_dir = tempfile.mkdtemp()
    for i in range(0, len(image_paths), 2):
        if i + 1 < len(image_paths):
            # ``with`` blocks fix a file-handle leak in the original, which
            # rebound img1/img2 to their resize() results and never closed
            # the originally opened images.
            with Image.open(image_paths[i]) as src1, Image.open(image_paths[i + 1]) as src2:
                img1, img2 = src1, src2
                # Match heights (taller page wins), keeping aspect ratio.
                max_height = max(img1.height, img2.height)
                if img1.height != max_height:
                    ratio = max_height / img1.height
                    img1 = img1.resize((int(img1.width * ratio), max_height), Image.Resampling.LANCZOS)
                if img2.height != max_height:
                    ratio = max_height / img2.height
                    img2 = img2.resize((int(img2.width * ratio), max_height), Image.Resampling.LANCZOS)
                # Paste side by side onto a white canvas.
                combined_image = Image.new('RGB', (img1.width + img2.width, max_height), 'white')
                combined_image.paste(img1, (0, 0))
                combined_image.paste(img2, (img1.width, 0))
                combined_path = os.path.join(temp_dir, f"combined_{i//2 + 1:03d}.png")
                combined_image.save(combined_path, "PNG")
                combined_paths.append(combined_path)
                combined_image.close()
        else:
            # Odd page count: copy the last page through as-is.
            with Image.open(image_paths[i]) as img:
                single_path = os.path.join(temp_dir, f"single_{i//2 + 1:03d}.png")
                img.save(single_path, "PNG")
                combined_paths.append(single_path)
    return combined_paths
def combine_images_vertically(image_paths: List[str]) -> List[str]:
    """Combine images in pairs vertically; return the combined file paths.

    Pages are paired as (1,2), (3,4), ...; an odd trailing page is copied
    through unchanged. Widths within a pair are equalized (aspect ratio
    preserved) before stacking top to bottom. Output goes to a fresh temp
    directory that the caller must clean up.
    """
    if not image_paths:
        return []
    combined_paths = []
    temp_dir = tempfile.mkdtemp()
    for i in range(0, len(image_paths), 2):
        if i + 1 < len(image_paths):
            # ``with`` blocks fix a file-handle leak in the original, which
            # rebound img1/img2 to their resize() results and never closed
            # the originally opened images.
            with Image.open(image_paths[i]) as src1, Image.open(image_paths[i + 1]) as src2:
                img1, img2 = src1, src2
                # Match widths (wider page wins), keeping aspect ratio.
                max_width = max(img1.width, img2.width)
                if img1.width != max_width:
                    ratio = max_width / img1.width
                    img1 = img1.resize((max_width, int(img1.height * ratio)), Image.Resampling.LANCZOS)
                if img2.width != max_width:
                    ratio = max_width / img2.width
                    img2 = img2.resize((max_width, int(img2.height * ratio)), Image.Resampling.LANCZOS)
                # Stack vertically onto a white canvas.
                combined_image = Image.new('RGB', (max_width, img1.height + img2.height), 'white')
                combined_image.paste(img1, (0, 0))
                combined_image.paste(img2, (0, img1.height))
                combined_path = os.path.join(temp_dir, f"vertical_combined_{i//2 + 1:03d}.png")
                combined_image.save(combined_path, "PNG")
                combined_paths.append(combined_path)
                combined_image.close()
        else:
            # Odd page count: copy the last page through as-is.
            with Image.open(image_paths[i]) as img:
                single_path = os.path.join(temp_dir, f"vertical_single_{i//2 + 1:03d}.png")
                img.save(single_path, "PNG")
                combined_paths.append(single_path)
    return combined_paths
def combine_images_with_overlap(image_paths: List[str], direction: str = "horizontal") -> List[str]:
    """Combine consecutive images with a sliding window: (1,2), (2,3), ...

    Each adjacent pair is merged horizontally when ``direction`` is
    ``"horizontal"``; any other value is treated as vertical (matching the
    original behaviour). Inputs with fewer than two images are returned
    unchanged. Output goes to a fresh temp directory that the caller must
    clean up.
    """
    if not image_paths or len(image_paths) < 2:
        return image_paths
    combined_paths = []
    temp_dir = tempfile.mkdtemp()
    for i in range(len(image_paths) - 1):
        # ``with`` blocks fix a file-handle leak in the original, which
        # rebound img1/img2 to their resize() results and never closed the
        # originally opened images.
        with Image.open(image_paths[i]) as src1, Image.open(image_paths[i + 1]) as src2:
            img1, img2 = src1, src2
            if direction == "horizontal":
                # Match heights, keeping aspect ratio, then paste side by side.
                max_height = max(img1.height, img2.height)
                if img1.height != max_height:
                    ratio = max_height / img1.height
                    img1 = img1.resize((int(img1.width * ratio), max_height), Image.Resampling.LANCZOS)
                if img2.height != max_height:
                    ratio = max_height / img2.height
                    img2 = img2.resize((int(img2.width * ratio), max_height), Image.Resampling.LANCZOS)
                combined_image = Image.new('RGB', (img1.width + img2.width, max_height), 'white')
                combined_image.paste(img1, (0, 0))
                combined_image.paste(img2, (img1.width, 0))
                combined_path = os.path.join(temp_dir, f"overlap_h_{i+1}_{i+2}.png")
            else:  # vertical
                # Match widths, keeping aspect ratio, then stack vertically.
                max_width = max(img1.width, img2.width)
                if img1.width != max_width:
                    ratio = max_width / img1.width
                    img1 = img1.resize((max_width, int(img1.height * ratio)), Image.Resampling.LANCZOS)
                if img2.width != max_width:
                    ratio = max_width / img2.width
                    img2 = img2.resize((max_width, int(img2.height * ratio)), Image.Resampling.LANCZOS)
                combined_image = Image.new('RGB', (max_width, img1.height + img2.height), 'white')
                combined_image.paste(img1, (0, 0))
                combined_image.paste(img2, (0, img1.height))
                combined_path = os.path.join(temp_dir, f"overlap_v_{i+1}_{i+2}.png")
            combined_image.save(combined_path, "PNG")
            combined_paths.append(combined_path)
            combined_image.close()
    return combined_paths
def create_prompt_content_with_image(image_paths: List[str], prompt: str) -> list:
    """Build a multimodal message content list: one text part plus one
    base64 data-URL part per image.

    Images that fail to encode are skipped (logged to stdout), matching the
    original best-effort behaviour.
    """
    content = [{"type": "text", "text": prompt}]
    for path in image_paths:
        try:
            # The original hard-coded ``image/jpeg`` even though every image
            # in this pipeline is saved as PNG; guess the real MIME type from
            # the path and default to PNG.
            mime = mimetypes.guess_type(path)[0] or "image/png"
            content.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:{mime};base64,{encode_image_base64_from_url(path)}"
                },
            })
        except Exception as e:
            print(f"Error encoding image {path}: {e}")
    return content
def log_api_request(messages: List[dict], model_name: str) -> str:
    """Append a sanitized copy of an API request to the global accumulator.

    Base64 image payloads are abbreviated to keep the log readable. The
    accumulated string (``current_request_info``) is returned.
    """
    import json
    import datetime
    global current_request_info

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def _sanitize_item(item):
        # Keep text parts as-is; abbreviate inline base64 image data.
        if item.get("type") == "text":
            return {"type": "text", "text": item.get("text", "")}
        if item.get("type") == "image_url":
            url = item.get("image_url", {}).get("url", "")
            if url.startswith("data:image"):
                url = f"data:image/*;base64,[BASE64_DATA_{len(url)}_CHARS]"
            return {"type": "image_url", "image_url": {"url": url}}
        return None

    sanitized = []
    for message in messages:
        entry = {"role": message.get("role", "unknown")}
        content = message.get("content", "")
        if isinstance(content, str):
            entry["content"] = content
        elif isinstance(content, list):
            entry["content"] = [
                s for s in (_sanitize_item(item) for item in content) if s is not None
            ]
        sanitized.append(entry)

    request_json = json.dumps(
        {"model": model_name, "messages": sanitized}, ensure_ascii=False, indent=2
    )

    # Append with a separator, except for the very first request.
    if current_request_info:
        current_request_info += f"\n{'='*80}\n๐ API ์์ฒญ [{timestamp}]\n{'='*80}\n{request_json}\n"
    else:
        current_request_info = f"๐ API ์์ฒญ [{timestamp}]\n{'='*80}\n{request_json}\n"
    return current_request_info
def send_chat_completion_request(image_paths: List[str], prompt: str, system_prompt: str = ""):
    """Send a multimodal chat-completion request to the configured model.

    A blank ``system_prompt`` is replaced by the system-prompt file contents.
    Returns the raw completion object from the client. (Request logging is
    handled separately in ``process_request``.)
    """
    effective_system = load_system_prompt() if not system_prompt.strip() else system_prompt
    user_content = create_prompt_content_with_image(image_paths, prompt)
    request_messages = [
        {"role": "system", "content": effective_system},
        {"role": "user", "content": user_content},
    ]
    return client.chat.completions.create(model=model, messages=request_messages)
def process_images_in_batches(image_paths: List[str], prompt: str, system_prompt: str, batch_size: int = 3) -> List[str]:
    """Send the images to the LLM in chunks of ``batch_size``.

    Returns one response string per batch; a failed batch contributes an
    error message instead of aborting the whole run.
    """
    if not image_paths:
        return []
    responses = []
    for start in range(0, len(image_paths), batch_size):
        chunk = image_paths[start:start + batch_size]
        batch_number = start // batch_size + 1
        try:
            completion = send_chat_completion_request(chunk, f"{prompt}", system_prompt)
            answer = completion.choices[0].message.content
            print(answer)
            responses.append(answer)
        except Exception as e:
            responses.append(f"๋ฐฐ์น {batch_number} ์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}")
    return responses
def merge_batch_results(results: List[str]) -> str:
    """Merge results from multiple batches into a single document.

    Empty input yields ``""``; a single result is returned unchanged; multiple
    results are concatenated without separators, matching the original
    behaviour but using ``str.join`` instead of a quadratic ``+=`` loop.
    (The original's commented-out header/separator decoration was removed.)
    """
    if not results:
        return ""
    return "".join(results)
def get_pdf_files():
    """Return the sorted list of PDF files under ``./resume_samples``.

    When none are found, ensure the default sample directory exists and
    return an empty list.
    """
    found = glob.glob("./resume_samples/**/*.pdf", recursive=True)
    if found:
        return sorted(found)
    # No PDFs yet: make sure the default directory tree is present.
    os.makedirs("./resume_samples/pdf/text", exist_ok=True)
    return []
def save_result_to_file(content: str, filename: str) -> str:
    """Save the analysis result to a markdown file.

    A ``.md`` extension is appended when missing. Returns a status message;
    write errors are reported in the message rather than raised.
    """
    if not content:
        return "์ ์ฅํ ๋ด์ฉ์ด ์์ต๋๋ค."
    if not filename:
        return "ํ์ผ ์ด๋ฆ์ด ์ง์ ๋์ง ์์์ต๋๋ค. ํ์ผ ์ด๋ฆ์ ์ ๋ ฅํด์ฃผ์ธ์."
    # Ensure the markdown extension.
    if not filename.endswith('.md'):
        filename += '.md'
    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        # Fixed: the success message had lost its placeholder and printed the
        # literal "(unknown)"; report the actual saved path instead.
        return f"๊ฒฐ๊ณผ๊ฐ {filename}์ ์ ์ฅ๋์์ต๋๋ค."
    except Exception as e:
        return f"ํ์ผ ์ ์ฅ ์ค๋ฅ: {str(e)}"
def extract_text_with_fitz(pdf_path: str) -> str:
    """Extract plain text from a PDF using PyMuPDF (fitz).

    Each page becomes a "## Page N" section; sections are separated by a
    horizontal rule. Failures are returned as an error string instead of
    being raised.
    """
    try:
        doc = fitz.open(pdf_path)
        sections = []
        for index in range(len(doc)):
            page_text = doc.load_page(index).get_text("text")
            sections.append(f"## Page {index + 1}\n\n{page_text}")
        doc.close()
        return "\n\n---\n\n".join(sections)
    except Exception as e:
        return f"Fitz ํ ์คํธ ์ถ์ถ ์ค๋ฅ: {str(e)}"
def extract_text_with_docling(pdf_path: str) -> str:
    """Extract markdown text from a PDF via Docling (includes OCR).

    Returns an error string instead of raising on failure.
    """
    try:
        conversion = converter.convert(pdf_path)
        markdown = conversion.document.export_to_markdown()
        return markdown
    except Exception as e:
        return f"Docling ํ ์คํธ ์ถ์ถ ์ค๋ฅ: {str(e)}"
def preview_image_processing(pdf_path: str, processing_mode: str = "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)", overlap_option: str = "์ผ๋ฐ ๋ณํฉ") -> List[str]:
    """Convert a PDF to page images and apply the selected merge mode.

    Returns the resulting image paths for UI preview, or an empty list when
    the path is missing/invalid or any error occurs.
    """
    try:
        if not pdf_path or not os.path.exists(pdf_path):
            return []
        page_images = convert_pdf_to_images(pdf_path)
        sliding = overlap_option == "์ค๋ณต ๋ณํฉ (์ฌ๋ผ์ด๋ฉ ์๋์ฐ)"
        if processing_mode == "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)":
            return combine_images_with_overlap(page_images, "horizontal") if sliding else combine_images_horizontally(page_images)
        if processing_mode == "์ธ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)":
            return combine_images_with_overlap(page_images, "vertical") if sliding else combine_images_vertically(page_images)
        # Any other mode ("๋ฑ๊ฐ ํ์ด์ง"): individual pages as-is.
        return page_images
    except Exception as e:
        print(f"์ด๋ฏธ์ง ์ฒ๋ฆฌ ๋ฏธ๋ฆฌ๋ณด๊ธฐ ์ค๋ฅ: {e}")
        return []
def process_request(
    prompt: str,
    system_prompt: str,
    use_images: bool,
    use_docling: bool,
    pdf_file_path: str,
    uploaded_file: str,
    output_filename: str,
    image_processing_mode: str = "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)",
    overlap_option: str = "์ผ๋ฐ ๋ณํฉ",
    batch_size: int = 3,
    use_postprocess: bool = True,
    postprocess_prompt: str = "",
    progress = None
) -> Generator[Tuple[str, str, str, str, str], None, None]:
    """Process the request with all the options and yield intermediate results.

    End-to-end pipeline driver: optionally converts the PDF to (merged) page
    images, optionally parses the PDF text with Docling, sends one or more
    LLM requests (batched when there are more images than ``batch_size``),
    and optionally runs a post-processing merge request.

    Yields:
        5-tuples of (batch result, final result, docling text, log text,
        accumulated API request info) so a streaming UI can refresh as the
        pipeline progresses.

    NOTE(review): ``output_filename`` and ``progress`` are never read in this
    body — presumably consumed by the UI layer; confirm before removing.
    """
    import time
    # Record the overall start time so total duration can be logged.
    total_start_time = time.time()
    global current_log_messages, current_request_info
    current_log_messages = []  # reset the shared log
    current_request_info = ""  # reset the accumulated API request info
    # Resolve the file to process (an uploaded file wins over the path field).
    final_pdf_path = uploaded_file if uploaded_file else pdf_file_path
    # Pipeline state.
    full_prompt = prompt
    docling_output = ""
    images_to_use = []
    temp_dirs_to_cleanup = []
    response_content = ""  # final result
    batch_content = ""  # batch-stage result
    def add_log(message):
        # Generator helper: append one numbered log line, then yield a single
        # intermediate UI tuple reflecting the current closure state. Call
        # sites must drive it (``for result in add_log(...): yield result``)
        # so the tuple is forwarded to the outer generator.
        current_log_messages.append(f"[{len(current_log_messages)+1:02d}] {message}")
        log_text = "\n".join(current_log_messages)
        # Yield the intermediate result tuple (batch, final, docling, log, request info).
        yield batch_content, response_content, docling_output, log_text, current_request_info
        return log_text
    # First log entry. NOTE(review): next() appends the message, but the tuple
    # it yields is discarded here instead of being re-yielded to the caller.
    log_generator = add_log("์์...")
    next(log_generator)  # run the first log append
    try:
        # Bail out when no PDF was selected or the path does not exist.
        if not final_pdf_path or not os.path.exists(final_pdf_path):
            msg = "PDF ํ์ผ์ ์ ํํ๊ฑฐ๋ ์ ๋ก๋ํด ์ฃผ์ธ์."
            for result in add_log("โ PDF ํ์ผ์ด ์ ํ๋์ง ์์์ต๋๋ค."):
                yield result
            yield "", msg, "", "\n".join(current_log_messages), current_request_info
            return
        for result in add_log(f"โ ์ฒ๋ฆฌํ PDF ํ์ผ: {os.path.basename(final_pdf_path)}"):
            yield result
        # Automatically convert the PDF to images when image mode is enabled.
        if use_images:
            for result in add_log("๐ผ๏ธ PDF๋ฅผ ์ด๋ฏธ์ง๋ก ๋ณํ ์ค..."):
                yield result
            print(f"PDF๋ฅผ ์ด๋ฏธ์ง๋ก ๋ณํ ์ค: {final_pdf_path}")
            # Render each page as a PNG in a temp dir (cleaned up in finally).
            pdf_images = convert_pdf_to_images(final_pdf_path)
            temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in pdf_images])
            for result in add_log(f"๐ PDF์์ {len(pdf_images)}๊ฐ ํ์ด์ง ์ถ์ถ ์๋ฃ"):
                yield result
            # Branch on the selected image-processing mode.
            if image_processing_mode == "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)":
                if overlap_option == "์ค๋ณต ๋ณํฉ (์ฌ๋ผ์ด๋ฉ ์๋์ฐ)":
                    # Sliding-window horizontal merge: pairs (1,2), (2,3), ...
                    for result in add_log("๐ ํ์ด์ง๋ค์ ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ๋ฐฉ์์ผ๋ก ๊ฐ๋ก ๋ณํฉ ์ค..."):
                        yield result
                    combined_images = combine_images_with_overlap(pdf_images, "horizontal")
                    for result in add_log(f"โ {len(combined_images)}๊ฐ์ ์ค๋ณต ๊ฐ๋ก ๋ณํฉ ์ด๋ฏธ์ง ์์ฑ ์๋ฃ"):
                        yield result
                else:
                    # Plain horizontal merge: pairs (1,2), (3,4), ...
                    for result in add_log("๐ ํ์ด์ง๋ค์ 2์ฅ์ฉ ๊ฐ๋ก๋ก ๋ณํฉ ์ค..."):
                        yield result
                    combined_images = combine_images_horizontally(pdf_images)
                    for result in add_log(f"โ {len(combined_images)}๊ฐ์ ๊ฐ๋ก ๋ณํฉ ์ด๋ฏธ์ง ์์ฑ ์๋ฃ"):
                        yield result
                temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in combined_images])
                images_to_use = combined_images
                print(f"PDF์์ {len(pdf_images)}๊ฐ ํ์ด์ง๋ฅผ {len(combined_images)}๊ฐ ๊ฐ๋ก ๋ณํฉ ์ด๋ฏธ์ง๋ก ๋ณํ ์๋ฃ")
            elif image_processing_mode == "์ธ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)":
                if overlap_option == "์ค๋ณต ๋ณํฉ (์ฌ๋ผ์ด๋ฉ ์๋์ฐ)":
                    # Sliding-window vertical merge.
                    for result in add_log("๐ ํ์ด์ง๋ค์ ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ๋ฐฉ์์ผ๋ก ์ธ๋ก ๋ณํฉ ์ค..."):
                        yield result
                    combined_images = combine_images_with_overlap(pdf_images, "vertical")
                    for result in add_log(f"โ {len(combined_images)}๊ฐ์ ์ค๋ณต ์ธ๋ก ๋ณํฉ ์ด๋ฏธ์ง ์์ฑ ์๋ฃ"):
                        yield result
                else:
                    # Plain vertical merge.
                    for result in add_log("๐ ํ์ด์ง๋ค์ 2์ฅ์ฉ ์ธ๋ก๋ก ๋ณํฉ ์ค..."):
                        yield result
                    combined_images = combine_images_vertically(pdf_images)
                    for result in add_log(f"โ {len(combined_images)}๊ฐ์ ์ธ๋ก ๋ณํฉ ์ด๋ฏธ์ง ์์ฑ ์๋ฃ"):
                        yield result
                temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in combined_images])
                images_to_use = combined_images
                print(f"PDF์์ {len(pdf_images)}๊ฐ ํ์ด์ง๋ฅผ {len(combined_images)}๊ฐ ์ธ๋ก ๋ณํฉ ์ด๋ฏธ์ง๋ก ๋ณํ ์๋ฃ")
            else:  # individual pages mode
                # Use each page image as-is, no merging.
                images_to_use = pdf_images
                for result in add_log(f"โ {len(pdf_images)}๊ฐ์ ๊ฐ๋ณ ํ์ด์ง ์ด๋ฏธ์ง ์ค๋น ์๋ฃ"):
                    yield result
                print(f"PDF์์ {len(pdf_images)}๊ฐ ํ์ด์ง๋ฅผ ๊ฐ๋ณ ์ด๋ฏธ์ง๋ก ์ฌ์ฉ")
        # Optional Docling text parsing; parsed text is appended to the prompt.
        if use_docling:
            for result in add_log("๐ Docling์ผ๋ก PDF ํ ์คํธ ํ์ฑ ์ค..."):
                yield result
            try:
                # NOTE(review): ``result`` here shadows the add_log loop
                # variable used throughout this function.
                result = converter.convert(final_pdf_path)
                docling_output = result.document.export_to_markdown()
                full_prompt += f"\n\nํ์ฑ๋ ์ด๋ ฅ์ ๋ด์ฉ: {docling_output}"
                for result in add_log(f"โ ํ ์คํธ ํ์ฑ ์๋ฃ (๊ธธ์ด: {len(docling_output)} ๋ฌธ์)"):
                    yield result
            except Exception as e:
                # A Docling failure aborts the whole request.
                error_msg = f"Docling ๋ณํ ์ค๋ฅ: {str(e)}"
                for result in add_log(f"โ Docling ๋ณํ ์ค๋ฅ: {str(e)}"):
                    yield result
                for result in add_log(f"โ ์ฒ๋ฆฌ ์ค๋จ๋จ"):
                    yield result
                yield "", error_msg, docling_output, "\n".join(current_log_messages), current_request_info
                return
        # Image processing (single request or batched).
        if images_to_use:
            # Log only a truncated preview of the system prompt.
            for result in add_log(f"๐ค ์์คํ ํ๋กฌํํธ: {system_prompt[:50]}{'...' if len(system_prompt) > 50 else ''}"):
                yield result
            if len(images_to_use) <= batch_size:
                # At most one batch: send all images in a single request.
                for result in add_log(f"๐ค API ์์ฒญ ์ค๋น ์ค... (์ด๋ฏธ์ง {len(images_to_use)}์ฅ)"):
                    yield result
                # Log the request payload first so the UI can show it immediately.
                system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
                messages = [
                    {"role": "system", "content": system_prompt_clean},
                    {
                        "role": "user",
                        "content": create_prompt_content_with_image(images_to_use, full_prompt)
                    }
                ]
                log_api_request(messages, model)
                # Push the request info to the UI right away.
                yield "", "", docling_output, "\n".join(current_log_messages), current_request_info
                for result in add_log(f"๐ค LLM API ์์ฒญ ์ค... (์ด๋ฏธ์ง {len(images_to_use)}์ฅ)"):
                    yield result
                # Time the single-batch request. (Redundant local import of time.)
                import time
                batch_start_time = time.time()
                completion = send_chat_completion_request(images_to_use, full_prompt, system_prompt)
                response_content = completion.choices[0].message.content
                batch_content = response_content  # single batch == batch result
                print(response_content)
                # Report the batch duration.
                batch_duration = time.time() - batch_start_time
                for result in add_log(f"โ LLM ๋ถ์ ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {batch_duration:.1f}์ด)"):
                    yield result
            else:
                # More images than batch_size: process in batches.
                num_batches = (len(images_to_use) + batch_size - 1) // batch_size
                for result in add_log(f"๐ฆ ์ด๋ฏธ์ง๊ฐ {len(images_to_use)}์ฅ์ด๋ฏ๋ก {num_batches}๊ฐ ๋ฐฐ์น๋ก ๋๋์ด ์ฒ๋ฆฌ (๋ฐฐ์น๋น {batch_size}์ฅ)"):
                    yield result
                print(f"์ด๋ฏธ์ง๊ฐ {len(images_to_use)}์ฅ์ด๋ฏ๋ก ๋ฐฐ์น ์ฒ๋ฆฌ๋ฅผ ์์ํฉ๋๋ค. (๋ฐฐ์น๋น {batch_size}์ฅ)")
                batch_results = []
                for i in range(0, len(images_to_use), batch_size):
                    batch_num = i // batch_size + 1
                    batch_images = images_to_use[i:i + batch_size]
                    for result in add_log(f"๐ค ๋ฐฐ์น {batch_num}/{num_batches} API ์์ฒญ ์ค๋น ์ค... (์ด๋ฏธ์ง {len(batch_images)}์ฅ)"):
                        yield result
                    # Log the request payload first so the UI can show it.
                    batch_prompt = f"{full_prompt}"
                    system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
                    messages = [
                        {"role": "system", "content": system_prompt_clean},
                        {
                            "role": "user",
                            "content": create_prompt_content_with_image(batch_images, batch_prompt)
                        }
                    ]
                    log_api_request(messages, model)
                    # Push the request info to the UI right away.
                    yield "", "", docling_output, "\n".join(current_log_messages), current_request_info
                    for result in add_log(f"๐ค ๋ฐฐ์น {batch_num}/{num_batches} ์ฒ๋ฆฌ ์ค... (์ด๋ฏธ์ง {len(batch_images)}์ฅ)"):
                        yield result
                    try:
                        # Time this batch. (Redundant local import of time.)
                        import time
                        batch_start_time = time.time()
                        completion = send_chat_completion_request(batch_images, batch_prompt, system_prompt)
                        batch_response = completion.choices[0].message.content
                        batch_results.append(batch_response)
                        print(batch_response)
                        # Report the batch duration.
                        batch_duration = time.time() - batch_start_time
                        for result in add_log(f"โ ๋ฐฐ์น {batch_num} ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {batch_duration:.1f}์ด)"):
                            yield result
                    except Exception as e:
                        # A failed batch contributes an error string, not an abort.
                        batch_results.append(f"๋ฐฐ์น {batch_num} ์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}")
                        for result in add_log(f"โ ๋ฐฐ์น {batch_num} ์ค๋ฅ: {str(e)}"):
                            yield result
                batch_content = merge_batch_results(batch_results)  # store batch result
                response_content = batch_content  # initial final result == batch result
                for result in add_log("๐ ๋ชจ๋ ๋ฐฐ์น ๊ฒฐ๊ณผ ๋ณํฉ ์๋ฃ"):
                    yield result
        else:
            # No images: text-only request (log truncated system prompt only).
            for result in add_log(f"๐ค ์์คํ ํ๋กฌํํธ: {system_prompt[:50]}{'...' if len(system_prompt) > 50 else ''}"):
                yield result
            for result in add_log("๐ค ํ ์คํธ ์ ์ฉ API ์์ฒญ ์ค๋น ์ค..."):
                yield result
            # Log the request payload first so the UI can show it immediately.
            system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
            messages = [
                {"role": "system", "content": system_prompt_clean},
                {
                    "role": "user",
                    "content": create_prompt_content_with_image([], full_prompt)
                }
            ]
            log_api_request(messages, model)
            # Push the request info to the UI right away.
            yield "", "", docling_output, "\n".join(current_log_messages), current_request_info
            for result in add_log("๐ค ํ ์คํธ ์ ์ฉ LLM API ์์ฒญ ์ค..."):
                yield result
            # Time the text-only request. (Redundant local import of time.)
            import time
            text_start_time = time.time()
            completion = send_chat_completion_request([], full_prompt, system_prompt)
            response_content = completion.choices[0].message.content
            batch_content = response_content  # text-only == batch result
            print(response_content)
            # Report the text-only duration.
            text_duration = time.time() - text_start_time
            for result in add_log(f"โ ํ ์คํธ ๋ถ์ ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {text_duration:.1f}์ด)"):
                yield result
        # Post-processing pass (only when multiple batches were used).
        if use_postprocess and len(images_to_use) > batch_size:
            for result in add_log("๐ ํ์ฒ๋ฆฌ ์์ ์ ์์ํฉ๋๋ค..."):
                yield result
            # Fall back to the default post-process prompt when blank.
            if not postprocess_prompt.strip():
                postprocess_prompt = load_postprocess_prompt()
            # Combine prompt + merged batch results into one text payload.
            combined_results = f"{postprocess_prompt}\n\n=== ๋ฐฐ์น ์ฒ๋ฆฌ ๊ฒฐ๊ณผ ===\n\n{response_content}"
            for result in add_log("๐ค ํ์ฒ๋ฆฌ API ์์ฒญ ์ค๋น ์ค..."):
                yield result
            # Log the post-process request payload first.
            system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
            messages = [
                {"role": "system", "content": system_prompt_clean},
                {
                    "role": "user",
                    "content": combined_results
                }
            ]
            log_api_request(messages, model)
            # Push the request info to the UI right away.
            yield "", "", docling_output, "\n".join(current_log_messages), current_request_info
            for result in add_log("๐ค ํ์ฒ๋ฆฌ LLM API ์์ฒญ ์ค..."):
                yield result
            # Time the post-processing request. (Redundant local import of time.)
            import time
            postprocess_start_time = time.time()
            # Post-process call is text-only (no images).
            completion = send_chat_completion_request([], combined_results, system_prompt)
            response_content = completion.choices[0].message.content
            print(response_content)
            # Report the post-processing duration.
            postprocess_duration = time.time() - postprocess_start_time
            for result in add_log(f"โ ํ์ฒ๋ฆฌ ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {postprocess_duration:.1f}์ด)"):
                yield result
        # Log the total processing time and emit the final tuple.
        total_duration = time.time() - total_start_time
        for result in add_log(f"โฑ๏ธ ์ ์ฒด ์ฒ๋ฆฌ ์๊ฐ: {total_duration:.1f}์ด"):
            yield result
        for result in add_log("๐ ๋ชจ๋ ์ฒ๋ฆฌ๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋์์ต๋๋ค!"):
            yield result
        yield batch_content, response_content, docling_output, "\n".join(current_log_messages), current_request_info
    except Exception as e:
        # Log the total time even on failure.
        total_duration = time.time() - total_start_time
        for result in add_log(f"โฑ๏ธ ์ ์ฒด ์ฒ๋ฆฌ ์๊ฐ: {total_duration:.1f}์ด (์ค๋ฅ๋ก ์ธํ ์ค๋จ)"):
            yield result
        error_msg = f"์ค๋ฅ ๋ฐ์: {str(e)}"
        for result in add_log(f"โ {error_msg}"):
            yield result
        for result in add_log("์ฒ๋ฆฌ๊ฐ ์ค๋จ๋์์ต๋๋ค."):
            yield result
        yield "", error_msg, docling_output, "\n".join(current_log_messages), current_request_info
    finally:
        # Clean up all temp directories created during image conversion.
        if temp_dirs_to_cleanup:
            for result in add_log("๐งน ์์ ํ์ผ ์ ๋ฆฌ ์ค..."):
                yield result
            for temp_dir in set(temp_dirs_to_cleanup):  # de-duplicate
                try:
                    if os.path.exists(temp_dir):
                        shutil.rmtree(temp_dir)
                        print(f"์์ ๋๋ ํ ๋ฆฌ ์ ๋ฆฌ: {temp_dir}")
                except Exception as e:
                    print(f"์์ ๋๋ ํ ๋ฆฌ ์ ๋ฆฌ ์คํจ: {temp_dir}, ์ค๋ฅ: {e}")
def process_request_preprocessing_only(
    prompt: str,
    system_prompt: str,
    use_images: bool,
    use_docling: bool,
    pdf_file_path: str,
    uploaded_file: str,
    output_filename: str,
    image_processing_mode: str = "๊ฐ๋ก ๋ณํฉ (2ํ์ด์ง์ฉ)",
    overlap_option: str = "์ผ๋ฐ ๋ณํฉ",
    batch_size: int = 3,
    progress = None
) -> Generator[Tuple[str, str, str, str, str], None, None]:
    """Run only the preprocessing (batch) stage.

    Thin wrapper that delegates to ``process_request`` with post-processing
    disabled and an empty post-process prompt.
    """
    yield from process_request(
        prompt,
        system_prompt,
        use_images,
        use_docling,
        pdf_file_path,
        uploaded_file,
        output_filename,
        image_processing_mode,
        overlap_option,
        batch_size,
        use_postprocess=False,  # skip the post-processing stage
        postprocess_prompt="",
        progress=progress,
    )
def process_request_postprocessing_only(
    batch_result: str,
    system_prompt: str,
    postprocess_prompt: str = "",
    progress = None
) -> Generator[Tuple[str, str, str, str, str], None, None]:
    """Run only the post-processing stage on an existing batch result.

    Sends the merged batch text (with the post-process prompt prepended) to
    the LLM and yields UI tuples of (batch result, final result, docling
    text, log text, API request info). The batch result is passed through
    unchanged; only the final result is produced here.

    NOTE(review): ``progress`` is accepted but never read in this body.
    """
    import time
    global current_log_messages, current_request_info
    current_log_messages = []  # reset the shared log
    current_request_info = ""  # reset the accumulated API request info
    # Record the overall start time for total-duration logging.
    total_start_time = time.time()
    def add_log(message):
        # Generator helper: append one numbered log line and yield a single
        # UI tuple. The batch result is kept as-is; only the final result
        # slot is updated later.
        current_log_messages.append(f"[{len(current_log_messages)+1:02d}] {message}")
        log_text = "\n".join(current_log_messages)
        # Keep the batch result intact; final result is updated separately.
        yield batch_result, "", "", log_text, current_request_info
        return log_text
    # First log entry. NOTE(review): next() appends the message, but the tuple
    # it yields is discarded here instead of being re-yielded to the caller.
    log_generator = add_log("ํ์ฒ๋ฆฌ ์์...")
    next(log_generator)  # run the first log append
    try:
        # Nothing to post-process: report and stop.
        if not batch_result or not batch_result.strip():
            msg = "ํ์ฒ๋ฆฌํ ๋ฐฐ์น ๊ฒฐ๊ณผ๊ฐ ์์ต๋๋ค. ๋จผ์ ์ ์ฒ๋ฆฌ๋ฅผ ์ํํด์ฃผ์ธ์."
            for result in add_log("โ ๋ฐฐ์น ๊ฒฐ๊ณผ๊ฐ ์์ต๋๋ค."):
                yield result
            yield batch_result, msg, "", "\n".join(current_log_messages), current_request_info
            return
        for result in add_log("๐ ํ์ฒ๋ฆฌ ์์ ์ ์์ํฉ๋๋ค..."):
            yield result
        # Fall back to the default post-process prompt when blank.
        if not postprocess_prompt.strip():
            postprocess_prompt = load_postprocess_prompt()
        # Combine prompt + batch results into one text payload.
        combined_results = f"{postprocess_prompt}\n\n=== ๋ฐฐ์น ์ฒ๋ฆฌ ๊ฒฐ๊ณผ ===\n\n{batch_result}"
        for result in add_log("๐ค ํ์ฒ๋ฆฌ API ์์ฒญ ์ค๋น ์ค..."):
            yield result
        # Log the request payload first so the UI can show it immediately.
        system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt()
        messages = [
            {"role": "system", "content": system_prompt_clean},
            {
                "role": "user",
                "content": combined_results
            }
        ]
        log_api_request(messages, model)
        # Push the request info to the UI right away.
        yield batch_result, "", "", "\n".join(current_log_messages), current_request_info
        for result in add_log("๐ค ํ์ฒ๋ฆฌ LLM API ์์ฒญ ์ค..."):
            yield result
        # Time the post-processing request.
        postprocess_start_time = time.time()
        # Post-process call is text-only (no images).
        completion = send_chat_completion_request([], combined_results, system_prompt)
        final_result = completion.choices[0].message.content
        # Report the post-processing duration.
        postprocess_duration = time.time() - postprocess_start_time
        for result in add_log(f"โ ํ์ฒ๋ฆฌ ์๋ฃ (์ฒ๋ฆฌ ์๊ฐ: {postprocess_duration:.1f}์ด)"):
            yield result
        # Log the total processing time.
        total_duration = time.time() - total_start_time
        for result in add_log(f"โฑ๏ธ ์ ์ฒด ์ฒ๋ฆฌ ์๊ฐ: {total_duration:.1f}์ด"):
            yield result
        for result in add_log("๐ ํ์ฒ๋ฆฌ๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋์์ต๋๋ค!"):
            yield result
        # Final tuple: batch result unchanged, final result updated.
        yield batch_result, final_result, "", "\n".join(current_log_messages), current_request_info
    except Exception as e:
        # Log the total time even on failure.
        total_duration = time.time() - total_start_time
        for result in add_log(f"โฑ๏ธ ์ ์ฒด ์ฒ๋ฆฌ ์๊ฐ: {total_duration:.1f}์ด (์ค๋ฅ๋ก ์ธํ ์ค๋จ)"):
            yield result
        error_msg = f"ํ์ฒ๋ฆฌ ์ค๋ฅ ๋ฐ์: {str(e)}"
        for result in add_log(f"โ {error_msg}"):
            yield result
        for result in add_log("ํ์ฒ๋ฆฌ๊ฐ ์ค๋จ๋์์ต๋๋ค."):
            yield result
        yield batch_result, error_msg, "", "\n".join(current_log_messages), current_request_info