|
|
|
|
|
|
|
import functools
import gc
import logging
from typing import Any, Dict, Iterator, List

import torch

from src.utils import (
    ProcessingContext,
    cleanup_memory,
    LOG_LEVEL_MAP,
    EMOJI_MAP,
)
|
|
|
|
|
|
|
|
|
def clear_gpu_memory() -> None:
    """Release cached CUDA memory and run the Python garbage collector.

    Safe on CPU-only machines: every CUDA call — including the
    memory-stat reads in the log message — is guarded by
    ``torch.cuda.is_available()``, so only ``gc.collect()`` runs there.
    """
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.synchronize()

    # Collect Python-level garbage regardless of device, so dangling
    # tensor references are dropped before (or without) the CUDA cache.
    gc.collect()

    if torch.cuda.is_available():
        # Guarded: memory_allocated()/memory_reserved() are GPU stats and
        # must not be queried on CPU-only builds.
        logging.info(
            f"{EMOJI_MAP['INFO']} GPU memory cleared - "
            f"Allocated: {torch.cuda.memory_allocated() / 1024**2:.2f}MB, "
            f"Reserved: {torch.cuda.memory_reserved() / 1024**2:.2f}MB"
        )
|
|
|
def get_gpu_memory_info() -> Dict[str, float]:
    """Report current CUDA memory usage in megabytes.

    Returns:
        ``{"available": False}`` when no CUDA device is present; otherwise
        a dict with ``available``, ``allocated_mb``, ``reserved_mb`` and
        ``max_allocated_mb`` (peak allocation) entries.
    """
    if not torch.cuda.is_available():
        return {"available": False}

    bytes_per_mb = 1024 ** 2
    allocated = torch.cuda.memory_allocated() / bytes_per_mb
    reserved = torch.cuda.memory_reserved() / bytes_per_mb
    peak = torch.cuda.max_memory_allocated() / bytes_per_mb

    return {
        "available": True,
        "allocated_mb": allocated,
        "reserved_mb": reserved,
        "max_allocated_mb": peak,
    }
|
|
|
|
|
|
|
|
|
def process_batch_with_gpu(
    contexts: List[ProcessingContext],
    batch_size: int = 4
) -> Iterator[ProcessingContext]:
    """Yield non-skipped contexts in batches, clearing CUDA memory between batches.

    This is a generator (it contains ``yield``), so the return annotation
    is ``Iterator[...]`` — the original ``List[...]`` annotation was wrong;
    callers receive a lazy iterator, not a materialized list.

    Args:
        contexts: Contexts to process; any with ``skip_processing`` set are
            skipped without being yielded.
        batch_size: Number of contexts per batch. Defaults to 4.

    Yields:
        Each context whose ``skip_processing`` flag is not set.
    """
    total_contexts = len(contexts)
    processed = 0

    for i in range(0, total_contexts, batch_size):
        batch = contexts[i:i + batch_size]
        batch_nums = f"{i+1}-{min(i+batch_size, total_contexts)}/{total_contexts}"

        logging.info(
            f"{EMOJI_MAP['PROCESSING']} Processing batch {batch_nums}"
        )

        for ctx in batch:
            if ctx.skip_processing:
                continue

            try:
                # NOTE(review): exceptions reach this handler only when the
                # consumer injects them via ``gen.throw()``; errors raised in
                # a plain ``for ctx in ...`` loop body do NOT propagate here.
                yield ctx
                processed += 1
            except Exception as e:
                logging.error(
                    f"{EMOJI_MAP['ERROR']} Error processing {ctx.url}: {str(e)}"
                )
                ctx.error = str(e)
                ctx.skip_processing = True

        # Free cached CUDA memory between batches; skip after the final
        # batch since the caller may still need the last results on-device.
        if torch.cuda.is_available() and i + batch_size < total_contexts:
            clear_gpu_memory()

    logging.info(
        f"{EMOJI_MAP['SUCCESS']} Processed {processed}/{total_contexts} images"
    )
|
|
|
|
|
|
|
|
|
def with_gpu_memory_management(func):
    """Decorator that clears GPU memory before and after calling ``func``.

    The ``finally`` guarantees post-call cleanup even when ``func`` raises;
    the exception itself still propagates to the caller.

    Uses ``functools.wraps`` instead of manually copying ``__name__`` so the
    wrapper also preserves ``__doc__``, ``__qualname__``, ``__module__`` and
    sets ``__wrapped__`` (the original lost all of these).
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            clear_gpu_memory()
            return func(*args, **kwargs)
        finally:
            clear_gpu_memory()

    return wrapper
|
|