import os import base64 import requests from typing import List, Optional, Generator, Tuple from openai import OpenAI from docling.document_converter import DocumentConverter import glob from pdf2image import convert_from_path from PIL import Image import tempfile import shutil import fitz # PyMuPDF 추가 # Initialize the docling DocumentConverter converter = DocumentConverter() # 전역 설정 openai_api_key = "EMPTY" openai_api_base = "http://118.38.20.101:8080/v1" model = "Qwen/Qwen2.5-VL-7B-Instruct-AWQ" # 전역 로그 시스템 current_log_messages = [] current_request_info = "" # 누적된 API 요청 정보 저장 # OpenAI 클라이언트 초기화 client = OpenAI( api_key=openai_api_key, base_url=openai_api_base ) def load_system_prompt() -> str: """시스템 프롬프트 파일을 로드합니다.""" try: with open("prompt_system.txt", "r", encoding="utf-8") as f: return f.read().strip() except Exception as e: print(f"시스템 프롬프트 파일 로드 오류: {e}") return "당신은 이력서 분석을 도와주는 AI 어시스턴트입니다." def load_user_prompt() -> str: """사용자 프롬프트 파일을 로드합니다.""" try: with open("prompt_user.txt", "r", encoding="utf-8") as f: return f.read().strip() except Exception as e: print(f"사용자 프롬프트 파일 로드 오류: {e}") return "[텍스트 추출 작업]\n\n첨부된 이미지들은 이력서를 이미지화 한 결과물이야. 이미지의 내용 및 레이아웃을 참고해서 이력서의 내용을 정리 후 마크다운 형식으로 정리해줘." def load_postprocess_prompt() -> str: """후처리 프롬프트 파일을 로드합니다.""" try: with open("prompt_postprocess.txt", "r", encoding="utf-8") as f: return f.read().strip() except Exception as e: print(f"후처리 프롬프트 파일 로드 오류: {e}") return "[텍스트 병합 작업]\n배치 작업으로 수집된 텍스트입니다. 이제 완전한 형태의 이력서를 만들어 주세요. 출력 포맷은 마크다운입니다." def encode_image_base64_from_url(image_path: str) -> str: """Encode an image retrieved from a file path or url to base64 format.""" try: if isinstance(image_path, str) and 'http' in image_path: with requests.get(image_path) as response: response.raise_for_status() result = base64.b64encode(response.content).decode('utf-8') return result elif isinstance(image_path, str) and os.path.isfile(image_path): with open(image_path, 'rb') as image_file: result = base64.b64encode(image_file.read()).decode('utf-8') return result else: raise ValueError(f"Invalid image URL or file path: {image_path}") except Exception as e: print(f"Error encoding image: {e}") raise def convert_pdf_to_images(pdf_path: str, dpi: int = 200) -> List[str]: """Convert PDF to images using pdf2image and return list of image file paths.""" try: # PDF를 이미지로 변환 images = convert_from_path(pdf_path, dpi=dpi) # 임시 디렉토리 생성 temp_dir = tempfile.mkdtemp() image_paths = [] for i, image in enumerate(images): # 이미지를 임시 파일로 저장 image_path = os.path.join(temp_dir, f"page_{i+1:03d}.png") image.save(image_path, "PNG") image_paths.append(image_path) return image_paths except Exception as e: print(f"Error converting PDF to images: {e}") raise def combine_images_horizontally(image_paths: List[str]) -> List[str]: """Combine images in pairs horizontally. Returns list of combined image paths.""" if not image_paths: return [] combined_paths = [] temp_dir = tempfile.mkdtemp() # 2장씩 가로로 붙이기 for i in range(0, len(image_paths), 2): if i + 1 < len(image_paths): # 2장을 가로로 붙이기 img1 = Image.open(image_paths[i]) img2 = Image.open(image_paths[i + 1]) # 높이를 맞춤 (더 높은 쪽 기준) max_height = max(img1.height, img2.height) # 비율을 유지하면서 높이 조정 if img1.height != max_height: ratio = max_height / img1.height img1 = img1.resize((int(img1.width * ratio), max_height), Image.Resampling.LANCZOS) if img2.height != max_height: ratio = max_height / img2.height img2 = img2.resize((int(img2.width * ratio), max_height), Image.Resampling.LANCZOS) # 가로로 붙이기 combined_width = img1.width + img2.width combined_image = Image.new('RGB', (combined_width, max_height), 'white') combined_image.paste(img1, (0, 0)) combined_image.paste(img2, (img1.width, 0)) # 저장 combined_path = os.path.join(temp_dir, f"combined_{i//2 + 1:03d}.png") combined_image.save(combined_path, "PNG") combined_paths.append(combined_path) img1.close() img2.close() combined_image.close() else: # 홀수 개의 경우 마지막 이미지는 그대로 복사 img = Image.open(image_paths[i]) single_path = os.path.join(temp_dir, f"single_{i//2 + 1:03d}.png") img.save(single_path, "PNG") combined_paths.append(single_path) img.close() return combined_paths def combine_images_vertically(image_paths: List[str]) -> List[str]: """Combine images in pairs vertically. Returns list of combined image paths.""" if not image_paths: return [] combined_paths = [] temp_dir = tempfile.mkdtemp() # 2장씩 세로로 붙이기 for i in range(0, len(image_paths), 2): if i + 1 < len(image_paths): # 2장을 세로로 붙이기 img1 = Image.open(image_paths[i]) img2 = Image.open(image_paths[i + 1]) # 너비를 맞춤 (더 넓은 쪽 기준) max_width = max(img1.width, img2.width) # 비율을 유지하면서 너비 조정 if img1.width != max_width: ratio = max_width / img1.width img1 = img1.resize((max_width, int(img1.height * ratio)), Image.Resampling.LANCZOS) if img2.width != max_width: ratio = max_width / img2.width img2 = img2.resize((max_width, int(img2.height * ratio)), Image.Resampling.LANCZOS) # 세로로 붙이기 combined_height = img1.height + img2.height combined_image = Image.new('RGB', (max_width, combined_height), 'white') combined_image.paste(img1, (0, 0)) combined_image.paste(img2, (0, img1.height)) # 저장 combined_path = os.path.join(temp_dir, f"vertical_combined_{i//2 + 1:03d}.png") combined_image.save(combined_path, "PNG") combined_paths.append(combined_path) img1.close() img2.close() combined_image.close() else: # 홀수 개의 경우 마지막 이미지는 그대로 복사 img = Image.open(image_paths[i]) single_path = os.path.join(temp_dir, f"vertical_single_{i//2 + 1:03d}.png") img.save(single_path, "PNG") combined_paths.append(single_path) img.close() return combined_paths def combine_images_with_overlap(image_paths: List[str], direction: str = "horizontal") -> List[str]: """Combine images with sliding window (overlap). Returns list of combined image paths.""" if not image_paths or len(image_paths) < 2: return image_paths combined_paths = [] temp_dir = tempfile.mkdtemp() # 슬라이딩 윈도우로 2장씩 병합 (1,2), (2,3), (3,4), (4,5)... for i in range(len(image_paths) - 1): img1 = Image.open(image_paths[i]) img2 = Image.open(image_paths[i + 1]) if direction == "horizontal": # 가로 병합 - 높이를 맞춤 max_height = max(img1.height, img2.height) if img1.height != max_height: ratio = max_height / img1.height img1 = img1.resize((int(img1.width * ratio), max_height), Image.Resampling.LANCZOS) if img2.height != max_height: ratio = max_height / img2.height img2 = img2.resize((int(img2.width * ratio), max_height), Image.Resampling.LANCZOS) # 가로로 붙이기 combined_width = img1.width + img2.width combined_image = Image.new('RGB', (combined_width, max_height), 'white') combined_image.paste(img1, (0, 0)) combined_image.paste(img2, (img1.width, 0)) combined_path = os.path.join(temp_dir, f"overlap_h_{i+1}_{i+2}.png") else: # vertical # 세로 병합 - 너비를 맞춤 max_width = max(img1.width, img2.width) if img1.width != max_width: ratio = max_width / img1.width img1 = img1.resize((max_width, int(img1.height * ratio)), Image.Resampling.LANCZOS) if img2.width != max_width: ratio = max_width / img2.width img2 = img2.resize((max_width, int(img2.height * ratio)), Image.Resampling.LANCZOS) # 세로로 붙이기 combined_height = img1.height + img2.height combined_image = Image.new('RGB', (max_width, combined_height), 'white') combined_image.paste(img1, (0, 0)) combined_image.paste(img2, (0, img1.height)) combined_path = os.path.join(temp_dir, f"overlap_v_{i+1}_{i+2}.png") combined_image.save(combined_path, "PNG") combined_paths.append(combined_path) img1.close() img2.close() combined_image.close() return combined_paths def create_prompt_content_with_image(image_paths: List[str], prompt: str) -> list: """Create a prompt content with image URLs.""" if not image_paths: return [{"type": "text", "text": prompt}] else: content = [{"type": "text", "text": prompt}] for path in image_paths: try: content.append({ "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{encode_image_base64_from_url(path)}" }, }) except Exception as e: print(f"Error encoding image {path}: {e}") return content def log_api_request(messages: List[dict], model_name: str) -> str: """Log the actual API request content for debugging - appends to accumulated requests.""" import json import datetime global current_request_info # 현재 시간 가져오기 timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 실제 API 요청과 같은 구조를 생성 (이미지 데이터는 축약) api_request = { "model": model_name, "messages": [] } for message in messages: message_copy = {"role": message.get("role", "unknown")} content = message.get("content", "") if isinstance(content, str): # 텍스트 내용인 경우 그대로 포함 message_copy["content"] = content elif isinstance(content, list): # 멀티모달 콘텐츠인 경우 content_copy = [] for item in content: if item.get("type") == "text": content_copy.append({ "type": "text", "text": item.get("text", "") }) elif item.get("type") == "image_url": image_url = item.get("image_url", {}).get("url", "") if image_url.startswith("data:image"): # Base64 이미지 데이터는 축약해서 표시 content_copy.append({ "type": "image_url", "image_url": { "url": f"data:image/*;base64,[BASE64_DATA_{len(image_url)}_CHARS]" } }) else: content_copy.append({ "type": "image_url", "image_url": {"url": image_url} }) message_copy["content"] = content_copy api_request["messages"].append(message_copy) # JSON으로 포맷팅 request_json = json.dumps(api_request, ensure_ascii=False, indent=2) # 새 요청을 구분선과 함께 기존 내용에 append separator = f"\n{'='*80}\n🚀 API 요청 [{timestamp}]\n{'='*80}\n" new_request = f"{separator}{request_json}\n" if current_request_info: current_request_info += new_request else: # 첫 번째 요청인 경우 구분선 없이 시작 current_request_info = f"🚀 API 요청 [{timestamp}]\n{'='*80}\n{request_json}\n" return current_request_info def send_chat_completion_request(image_paths: List[str], prompt: str, system_prompt: str = ""): """Send a chat completion request with images.""" # 시스템 프롬프트가 비어있으면 파일에서 로드 if not system_prompt.strip(): system_prompt = load_system_prompt() messages = [ {"role": "system", "content": system_prompt}, { "role": "user", "content": create_prompt_content_with_image(image_paths, prompt), }, ] # 실제 API 호출 (로깅은 process_request에서 별도로 처리) return client.chat.completions.create(model=model, messages=messages) def process_images_in_batches(image_paths: List[str], prompt: str, system_prompt: str, batch_size: int = 3) -> List[str]: """Process images in batches of specified size and return list of results.""" if not image_paths: return [] results = [] # 이미지를 배치 크기로 나누어 처리 for i in range(0, len(image_paths), batch_size): batch_images = image_paths[i:i + batch_size] try: # 배치 정보를 프롬프트에 추가 # batch_prompt = f"{prompt}\n\n[배치 {i//batch_size + 1}/{(len(image_paths) + batch_size - 1)//batch_size}]" batch_prompt = f"{prompt}" # API 요청 completion = send_chat_completion_request(batch_images, batch_prompt, system_prompt) response_content = completion.choices[0].message.content print(response_content) results.append(response_content) except Exception as e: results.append(f"배치 {i//batch_size + 1} 처리 중 오류 발생: {str(e)}") return results def merge_batch_results(results: List[str]) -> str: """Merge results from multiple batches into a single document.""" if not results: return "" if len(results) == 1: return results[0] merged_result = "" # merged_result = "# 모든 이력서 분석 결과\n\n" # merged_result += f"총 {len(results)}개 이미지를 인식했습니다.\n\n" for i, result in enumerate(results, 1): # merged_result += f"## {i} 번째 이미지 인식 결과\n\n" merged_result += result # merged_result += "\n\n---\n\n" return merged_result def get_pdf_files(): """Get list of PDF files in the resume_samples directory.""" pdf_files = glob.glob("./resume_samples/**/*.pdf", recursive=True) if not pdf_files: # PDF 파일이 없으면 기본 디렉토리가 있는지 확인 os.makedirs("./resume_samples/pdf/text", exist_ok=True) return [] return sorted(pdf_files) def save_result_to_file(content: str, filename: str) -> str: """Save the analysis result to a markdown file.""" if not content: return "저장할 내용이 없습니다." if not filename: return "파일 이름이 지정되지 않았습니다. 파일 이름을 입력해주세요." # 확장자 추가 if not filename.endswith('.md'): filename += '.md' try: with open(filename, "w", encoding="utf-8") as f: f.write(content) return f"결과가 {filename}에 저장되었습니다." except Exception as e: return f"파일 저장 오류: {str(e)}" def extract_text_with_fitz(pdf_path: str) -> str: """PDF에서 Fitz(PyMuPDF)로 텍스트 추출""" try: doc = fitz.open(pdf_path) text_content = "" for page_num in range(len(doc)): page = doc.load_page(page_num) text_content += f"## Page {page_num + 1}\n\n" text_content += page.get_text("text") if page_num < len(doc) - 1: text_content += "\n\n---\n\n" doc.close() return text_content except Exception as e: return f"Fitz 텍스트 추출 오류: {str(e)}" def extract_text_with_docling(pdf_path: str) -> str: """PDF에서 Docling으로 텍스트 추출 (OCR 포함)""" try: result = converter.convert(pdf_path) return result.document.export_to_markdown() except Exception as e: return f"Docling 텍스트 추출 오류: {str(e)}" def preview_image_processing(pdf_path: str, processing_mode: str = "가로 병합 (2페이지씩)", overlap_option: str = "일반 병합") -> List[str]: """PDF를 이미지로 변환하고 선택한 방식으로 처리한 결과를 미리보기용으로 반환""" try: if not pdf_path or not os.path.exists(pdf_path): return [] # PDF를 이미지로 변환 pdf_images = convert_pdf_to_images(pdf_path) # 처리 방식에 따라 분기 if processing_mode == "가로 병합 (2페이지씩)": if overlap_option == "중복 병합 (슬라이딩 윈도우)": processed_images = combine_images_with_overlap(pdf_images, "horizontal") else: processed_images = combine_images_horizontally(pdf_images) elif processing_mode == "세로 병합 (2페이지씩)": if overlap_option == "중복 병합 (슬라이딩 윈도우)": processed_images = combine_images_with_overlap(pdf_images, "vertical") else: processed_images = combine_images_vertically(pdf_images) else: # "낱개 페이지" processed_images = pdf_images return processed_images except Exception as e: print(f"이미지 처리 미리보기 오류: {e}") return [] def process_request( prompt: str, system_prompt: str, use_images: bool, use_docling: bool, pdf_file_path: str, uploaded_file: str, output_filename: str, image_processing_mode: str = "가로 병합 (2페이지씩)", overlap_option: str = "일반 병합", batch_size: int = 3, use_postprocess: bool = True, postprocess_prompt: str = "", progress = None ) -> Generator[Tuple[str, str, str, str, str], None, None]: """Process the request with all the options and yield intermediate results.""" import time # 전체 처리 시작 시간 기록 total_start_time = time.time() global current_log_messages, current_request_info current_log_messages = [] # 로그 초기화 current_request_info = "" # API 요청 정보 초기화 # 파일 경로 결정 (업로드된 파일이 있으면 우선 사용) final_pdf_path = uploaded_file if uploaded_file else pdf_file_path # 초기화 full_prompt = prompt docling_output = "" images_to_use = [] temp_dirs_to_cleanup = [] response_content = "" # 최종 결과 batch_content = "" # 배치 처리 결과 def add_log(message): current_log_messages.append(f"[{len(current_log_messages)+1:02d}] {message}") log_text = "\n".join(current_log_messages) # 중간 결과를 yield합니다 (배치 결과, 최종 결과, 파싱 결과, 로그, API 요청 정보) yield batch_content, response_content, docling_output, log_text, current_request_info return log_text # 로그 생성기 객체 생성 log_generator = add_log("시작...") next(log_generator) # 첫 번째 로그 생성 try: # PDF 파일이 선택되지 않은 경우 if not final_pdf_path or not os.path.exists(final_pdf_path): msg = "PDF 파일을 선택하거나 업로드해 주세요." for result in add_log("❌ PDF 파일이 선택되지 않았습니다."): yield result yield "", msg, "", "\n".join(current_log_messages), current_request_info return for result in add_log(f"✅ 처리할 PDF 파일: {os.path.basename(final_pdf_path)}"): yield result # PDF를 이미지로 자동 변환 (이미지 사용이 활성화된 경우) if use_images: for result in add_log("🖼️ PDF를 이미지로 변환 중..."): yield result print(f"PDF를 이미지로 변환 중: {final_pdf_path}") # PDF를 이미지로 변환 pdf_images = convert_pdf_to_images(final_pdf_path) temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in pdf_images]) for result in add_log(f"📄 PDF에서 {len(pdf_images)}개 페이지 추출 완료"): yield result # 이미지 처리 방식에 따라 분기 if image_processing_mode == "가로 병합 (2페이지씩)": if overlap_option == "중복 병합 (슬라이딩 윈도우)": # 중복 가로 병합 for result in add_log("🔗 페이지들을 슬라이딩 윈도우 방식으로 가로 병합 중..."): yield result combined_images = combine_images_with_overlap(pdf_images, "horizontal") for result in add_log(f"✅ {len(combined_images)}개의 중복 가로 병합 이미지 생성 완료"): yield result else: # 일반 가로 병합 for result in add_log("🔗 페이지들을 2장씩 가로로 병합 중..."): yield result combined_images = combine_images_horizontally(pdf_images) for result in add_log(f"✅ {len(combined_images)}개의 가로 병합 이미지 생성 완료"): yield result temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in combined_images]) images_to_use = combined_images print(f"PDF에서 {len(pdf_images)}개 페이지를 {len(combined_images)}개 가로 병합 이미지로 변환 완료") elif image_processing_mode == "세로 병합 (2페이지씩)": if overlap_option == "중복 병합 (슬라이딩 윈도우)": # 중복 세로 병합 for result in add_log("🔗 페이지들을 슬라이딩 윈도우 방식으로 세로 병합 중..."): yield result combined_images = combine_images_with_overlap(pdf_images, "vertical") for result in add_log(f"✅ {len(combined_images)}개의 중복 세로 병합 이미지 생성 완료"): yield result else: # 일반 세로 병합 for result in add_log("🔗 페이지들을 2장씩 세로로 병합 중..."): yield result combined_images = combine_images_vertically(pdf_images) for result in add_log(f"✅ {len(combined_images)}개의 세로 병합 이미지 생성 완료"): yield result temp_dirs_to_cleanup.extend([os.path.dirname(path) for path in combined_images]) images_to_use = combined_images print(f"PDF에서 {len(pdf_images)}개 페이지를 {len(combined_images)}개 세로 병합 이미지로 변환 완료") else: # "낱개 페이지" # 페이지를 그대로 사용 images_to_use = pdf_images for result in add_log(f"✅ {len(pdf_images)}개의 개별 페이지 이미지 준비 완료"): yield result print(f"PDF에서 {len(pdf_images)}개 페이지를 개별 이미지로 사용") # docling 처리 if use_docling: for result in add_log("📝 Docling으로 PDF 텍스트 파싱 중..."): yield result try: result = converter.convert(final_pdf_path) docling_output = result.document.export_to_markdown() full_prompt += f"\n\n파싱된 이력서 내용: {docling_output}" for result in add_log(f"✅ 텍스트 파싱 완료 (길이: {len(docling_output)} 문자)"): yield result except Exception as e: error_msg = f"Docling 변환 오류: {str(e)}" for result in add_log(f"❌ Docling 변환 오류: {str(e)}"): yield result for result in add_log(f"❌ 처리 중단됨"): yield result yield "", error_msg, docling_output, "\n".join(current_log_messages), current_request_info return # 이미지 처리 (배치 처리) if images_to_use: # 시스템 프롬프트 정보만 로그에 추가 (길이 제거) for result in add_log(f"🤖 시스템 프롬프트: {system_prompt[:50]}{'...' if len(system_prompt) > 50 else ''}"): yield result if len(images_to_use) <= batch_size: # 설정된 배치 크기 이하면 한 번에 처리 for result in add_log(f"📤 API 요청 준비 중... (이미지 {len(images_to_use)}장)"): yield result # API 요청 정보를 먼저 로깅하고 즉시 UI에 표시 system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt() messages = [ {"role": "system", "content": system_prompt_clean}, { "role": "user", "content": create_prompt_content_with_image(images_to_use, full_prompt) } ] log_api_request(messages, model) # 즉시 API 요청 정보를 UI에 표시 yield "", "", docling_output, "\n".join(current_log_messages), current_request_info for result in add_log(f"🤖 LLM API 요청 중... (이미지 {len(images_to_use)}장)"): yield result # 배치 시작 시간 기록 import time batch_start_time = time.time() completion = send_chat_completion_request(images_to_use, full_prompt, system_prompt) response_content = completion.choices[0].message.content batch_content = response_content # 단일 배치는 배치 결과와 동일 print(response_content) # 배치 처리 시간 계산 batch_duration = time.time() - batch_start_time for result in add_log(f"✅ LLM 분석 완료 (처리 시간: {batch_duration:.1f}초)"): yield result else: # 설정된 배치 크기 초과면 배치로 나누어 처리 num_batches = (len(images_to_use) + batch_size - 1) // batch_size for result in add_log(f"📦 이미지가 {len(images_to_use)}장이므로 {num_batches}개 배치로 나누어 처리 (배치당 {batch_size}장)"): yield result print(f"이미지가 {len(images_to_use)}장이므로 배치 처리를 시작합니다. (배치당 {batch_size}장)") batch_results = [] for i in range(0, len(images_to_use), batch_size): batch_num = i // batch_size + 1 batch_images = images_to_use[i:i + batch_size] for result in add_log(f"📤 배치 {batch_num}/{num_batches} API 요청 준비 중... (이미지 {len(batch_images)}장)"): yield result # API 요청 정보를 먼저 로깅하고 즉시 UI에 표시 batch_prompt = f"{full_prompt}" system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt() messages = [ {"role": "system", "content": system_prompt_clean}, { "role": "user", "content": create_prompt_content_with_image(batch_images, batch_prompt) } ] log_api_request(messages, model) # 즉시 API 요청 정보를 UI에 표시 yield "", "", docling_output, "\n".join(current_log_messages), current_request_info for result in add_log(f"🤖 배치 {batch_num}/{num_batches} 처리 중... (이미지 {len(batch_images)}장)"): yield result try: # 배치 시작 시간 기록 import time batch_start_time = time.time() completion = send_chat_completion_request(batch_images, batch_prompt, system_prompt) batch_response = completion.choices[0].message.content batch_results.append(batch_response) print(batch_response) # 배치 처리 시간 계산 batch_duration = time.time() - batch_start_time for result in add_log(f"✅ 배치 {batch_num} 완료 (처리 시간: {batch_duration:.1f}초)"): yield result except Exception as e: batch_results.append(f"배치 {batch_num} 처리 중 오류 발생: {str(e)}") for result in add_log(f"❌ 배치 {batch_num} 오류: {str(e)}"): yield result batch_content = merge_batch_results(batch_results) # 배치 결과 저장 response_content = batch_content # 초기 결과는 배치 결과와 동일 for result in add_log("🔗 모든 배치 결과 병합 완료"): yield result else: # 이미지가 없으면 텍스트만 처리 - 시스템 프롬프트 정보만 표시 for result in add_log(f"🤖 시스템 프롬프트: {system_prompt[:50]}{'...' if len(system_prompt) > 50 else ''}"): yield result for result in add_log("📤 텍스트 전용 API 요청 준비 중..."): yield result # API 요청 정보를 먼저 로깅하고 즉시 UI에 표시 system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt() messages = [ {"role": "system", "content": system_prompt_clean}, { "role": "user", "content": create_prompt_content_with_image([], full_prompt) } ] log_api_request(messages, model) # 즉시 API 요청 정보를 UI에 표시 yield "", "", docling_output, "\n".join(current_log_messages), current_request_info for result in add_log("🤖 텍스트 전용 LLM API 요청 중..."): yield result # 텍스트 전용 처리 시작 시간 기록 import time text_start_time = time.time() completion = send_chat_completion_request([], full_prompt, system_prompt) response_content = completion.choices[0].message.content batch_content = response_content # 텍스트 전용은 배치 결과와 동일 print(response_content) # 텍스트 전용 처리 시간 계산 text_duration = time.time() - text_start_time for result in add_log(f"✅ 텍스트 분석 완료 (처리 시간: {text_duration:.1f}초)"): yield result # 후처리 수행 (다중 배치인 경우에만) if use_postprocess and len(images_to_use) > batch_size: for result in add_log("🔄 후처리 작업을 시작합니다..."): yield result # 후처리 프롬프트가 비어있으면 기본값 사용 if not postprocess_prompt.strip(): postprocess_prompt = load_postprocess_prompt() # 배치 결과들을 하나의 텍스트로 합침 combined_results = f"{postprocess_prompt}\n\n=== 배치 처리 결과 ===\n\n{response_content}" for result in add_log("📤 후처리 API 요청 준비 중..."): yield result # 후처리 API 요청 정보를 먼저 로깅하고 즉시 UI에 표시 system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt() messages = [ {"role": "system", "content": system_prompt_clean}, { "role": "user", "content": combined_results } ] log_api_request(messages, model) # 즉시 API 요청 정보를 UI에 표시 yield "", "", docling_output, "\n".join(current_log_messages), current_request_info for result in add_log("🤖 후처리 LLM API 요청 중..."): yield result # 후처리 시작 시간 기록 import time postprocess_start_time = time.time() # 후처리 API 호출 (이미지 없이 텍스트만) completion = send_chat_completion_request([], combined_results, system_prompt) response_content = completion.choices[0].message.content print(response_content) # 후처리 시간 계산 postprocess_duration = time.time() - postprocess_start_time for result in add_log(f"✅ 후처리 완료 (처리 시간: {postprocess_duration:.1f}초)"): yield result # 전체 처리 시간 계산 및 로깅 total_duration = time.time() - total_start_time for result in add_log(f"⏱️ 전체 처리 시간: {total_duration:.1f}초"): yield result for result in add_log("🎉 모든 처리가 성공적으로 완료되었습니다!"): yield result yield batch_content, response_content, docling_output, "\n".join(current_log_messages), current_request_info except Exception as e: # 오류 발생 시에도 전체 처리 시간 기록 total_duration = time.time() - total_start_time for result in add_log(f"⏱️ 전체 처리 시간: {total_duration:.1f}초 (오류로 인한 중단)"): yield result error_msg = f"오류 발생: {str(e)}" for result in add_log(f"❌ {error_msg}"): yield result for result in add_log("처리가 중단되었습니다."): yield result yield "", error_msg, docling_output, "\n".join(current_log_messages), current_request_info finally: # 임시 디렉토리 정리 if temp_dirs_to_cleanup: for result in add_log("🧹 임시 파일 정리 중..."): yield result for temp_dir in set(temp_dirs_to_cleanup): # 중복 제거 try: if os.path.exists(temp_dir): shutil.rmtree(temp_dir) print(f"임시 디렉토리 정리: {temp_dir}") except Exception as e: print(f"임시 디렉토리 정리 실패: {temp_dir}, 오류: {e}") def process_request_preprocessing_only( prompt: str, system_prompt: str, use_images: bool, use_docling: bool, pdf_file_path: str, uploaded_file: str, output_filename: str, image_processing_mode: str = "가로 병합 (2페이지씩)", overlap_option: str = "일반 병합", batch_size: int = 3, progress = None ) -> Generator[Tuple[str, str, str, str, str], None, None]: """전처리(배치 처리)만 수행하는 함수""" # 원본 함수를 호출하되, use_postprocess=False로 설정 yield from process_request( prompt=prompt, system_prompt=system_prompt, use_images=use_images, use_docling=use_docling, pdf_file_path=pdf_file_path, uploaded_file=uploaded_file, output_filename=output_filename, image_processing_mode=image_processing_mode, overlap_option=overlap_option, batch_size=batch_size, use_postprocess=False, # 후처리 비활성화 postprocess_prompt="", progress=progress ) def process_request_postprocessing_only( batch_result: str, system_prompt: str, postprocess_prompt: str = "", progress = None ) -> Generator[Tuple[str, str, str, str, str], None, None]: """후처리(배치 결과 병합)만 수행하는 함수""" import time global current_log_messages, current_request_info current_log_messages = [] # 로그 초기화 current_request_info = "" # API 요청 정보 초기화 # 전체 처리 시작 시간 기록 total_start_time = time.time() def add_log(message): current_log_messages.append(f"[{len(current_log_messages)+1:02d}] {message}") log_text = "\n".join(current_log_messages) # 후처리에서는 배치 결과는 그대로 유지하고 최종 결과만 업데이트 yield batch_result, "", "", log_text, current_request_info return log_text # 로그 생성기 객체 생성 log_generator = add_log("후처리 시작...") next(log_generator) # 첫 번째 로그 생성 try: if not batch_result or not batch_result.strip(): msg = "후처리할 배치 결과가 없습니다. 먼저 전처리를 수행해주세요." for result in add_log("❌ 배치 결과가 없습니다."): yield result yield batch_result, msg, "", "\n".join(current_log_messages), current_request_info return for result in add_log("🔄 후처리 작업을 시작합니다..."): yield result # 후처리 프롬프트가 비어있으면 기본값 사용 if not postprocess_prompt.strip(): postprocess_prompt = load_postprocess_prompt() # 배치 결과들을 하나의 텍스트로 합침 combined_results = f"{postprocess_prompt}\n\n=== 배치 처리 결과 ===\n\n{batch_result}" for result in add_log("📤 후처리 API 요청 준비 중..."): yield result # 후처리 API 요청 정보를 먼저 로깅하고 즉시 UI에 표시 system_prompt_clean = system_prompt if system_prompt.strip() else load_system_prompt() messages = [ {"role": "system", "content": system_prompt_clean}, { "role": "user", "content": combined_results } ] log_api_request(messages, model) # 즉시 API 요청 정보를 UI에 표시 yield batch_result, "", "", "\n".join(current_log_messages), current_request_info for result in add_log("🤖 후처리 LLM API 요청 중..."): yield result # 후처리 시작 시간 기록 postprocess_start_time = time.time() # 후처리 API 호출 (이미지 없이 텍스트만) completion = send_chat_completion_request([], combined_results, system_prompt) final_result = completion.choices[0].message.content # 후처리 시간 계산 postprocess_duration = time.time() - postprocess_start_time for result in add_log(f"✅ 후처리 완료 (처리 시간: {postprocess_duration:.1f}초)"): yield result # 전체 처리 시간 계산 및 로깅 total_duration = time.time() - total_start_time for result in add_log(f"⏱️ 전체 처리 시간: {total_duration:.1f}초"): yield result for result in add_log("🎉 후처리가 성공적으로 완료되었습니다!"): yield result # 최종 결과 반환 (배치 결과는 그대로, 최종 결과만 업데이트) yield batch_result, final_result, "", "\n".join(current_log_messages), current_request_info except Exception as e: # 오류 발생 시에도 전체 처리 시간 기록 total_duration = time.time() - total_start_time for result in add_log(f"⏱️ 전체 처리 시간: {total_duration:.1f}초 (오류로 인한 중단)"): yield result error_msg = f"후처리 오류 발생: {str(e)}" for result in add_log(f"❌ {error_msg}"): yield result for result in add_log("후처리가 중단되었습니다."): yield result yield batch_result, error_msg, "", "\n".join(current_log_messages), current_request_info