Spaces:
Runtime error
Runtime error
# img_bot.py | |
# βμ΄λ―Έμ§ μ²¨λΆ β μ€ν¨ μ URL μΆλ ₯β μκ²°ν | |
import os, io, re, random, asyncio, logging, subprocess, base64 | |
from urllib.parse import urljoin, quote_plus | |
import discord | |
import requests | |
import replicate | |
from transformers import pipeline as transformers_pipeline | |
from gradio_client import Client | |
try: | |
from PIL import Image # WEBP β PNG λ³ν | |
PIL_OK = True | |
except Exception: | |
PIL_OK = False # Pillow λ―Έμ€μΉ μ λ³ν μλ΅ | |
# ββββββββββββββββββ νκ²½ λ³μ ββββββββββββββββββ | |
TOKEN = os.getenv("DISCORD_TOKEN") | |
CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID")) | |
REPL_TOKEN = (os.getenv("OPENAI_API_KEY") or "").strip() | |
HF_TOKEN = (os.getenv("HF_TOKEN") or "").strip() | |
if not TOKEN or not CHANNEL_ID: | |
raise RuntimeError("DISCORD_TOKEN κ³Ό DISCORD_CHANNEL_ID νκ²½ λ³μλ₯Ό λͺ¨λ μ§μ νμΈμ.") | |
if not REPL_TOKEN: | |
raise RuntimeError("OPENAI_API_KEY μ Replicate Personal Access Token κ°μ λ£μ΄μ£ΌμΈμ.") | |
os.environ["REPLICATE_API_TOKEN"] = REPL_TOKEN # (Replicate λ―Έμ¬μ©) | |
# ββββββββββββββββββ Gradio μλ² βββββββββββββββββ | |
GRADIO_URL = "http://211.233.58.201:7971" | |
GRADIO_API = "/process_and_save_image" | |
# ββββββββββββββββββ λ²μ νμ΄νλΌμΈ βββββββββββββ | |
translator = transformers_pipeline( | |
"translation", | |
model="Helsinki-NLP/opus-mt-ko-en", | |
device=-1, | |
**({"token": HF_TOKEN} if HF_TOKEN else {}) | |
) | |
async def ko2en_async(text: str) -> str: | |
"""νκΈ ν¬ν¨ μ μμ΄ λ²μ.""" | |
if not re.search(r"[κ°-ν£]", text): | |
return text | |
loop = asyncio.get_running_loop() | |
try: | |
return await loop.run_in_executor( | |
None, | |
lambda: translator(text, max_length=256, num_beams=1)[0]["translation_text"].strip() | |
) | |
except Exception as e: | |
logging.warning(f"λ²μ μ€ν¨, μλ¬Έ μ¬μ©: {e}") | |
return text | |
# ββββββββββββββββββ λ‘κΉ ββββββββββββββββββββββββ | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s [%(levelname)s] %(message)s", | |
handlers=[logging.StreamHandler()] | |
) | |
# ββββββββββββββββββ Discord ν΄λΌμ΄μΈνΈ ββββββββββ | |
intents = discord.Intents.default() | |
intents.message_content = True | |
class ImageBot(discord.Client): | |
async def on_ready(self): | |
logging.info(f"Logged in as {self.user} (id={self.user.id})") | |
try: | |
subprocess.Popen(["python", "web.py"]) | |
logging.info("web.py server has been started.") | |
except Exception as e: | |
logging.warning(f"web.py μ€ν μ€ν¨: {e}") | |
async def on_message(self, message: discord.Message): | |
if message.author.id == self.user.id or message.channel.id != CHANNEL_ID: | |
return | |
prompt_raw = message.content.strip() | |
if not prompt_raw: | |
return | |
prompt_en = await ko2en_async(prompt_raw) | |
await message.channel.typing() | |
# βββββββββββββ Gradio νΈμΆ ββββββββββββββ | |
def generate_image(): | |
client = Client(GRADIO_URL) | |
return client.predict( | |
height=768, | |
width=768, | |
steps=30, | |
scales=3.5, | |
prompt=prompt_en, | |
seed=random.randint(0, 2**32 - 1), | |
api_name=GRADIO_API | |
) | |
try: | |
result = await asyncio.get_running_loop().run_in_executor(None, generate_image) | |
except Exception as e: | |
logging.error(f"Gradio API error: {e}") | |
await message.reply("β οΈ μ΄λ―Έμ§ μμ± μ€ν¨!") | |
return | |
# 리μ€νΈ κ²°κ³Ό β 첫 μμ | |
if isinstance(result, list): | |
result = result[0] | |
# βββββββββββββ μ΄λ―Έμ§ λ°μ΄ν° ν보 βββββββββββββ | |
data = None | |
remote_path = None # μ€ν¨ μ μ¬μ©μμκ² λ³΄μ¬μ€ URL | |
candidate_urls: list[str] = [] | |
def add_remote(p: str): | |
url = urljoin(GRADIO_URL, f"/gradio_api/file={quote_plus(p)}") | |
candidate_urls.append(url) | |
return url | |
# dict κ²°κ³Ό | |
if isinstance(result, dict): | |
u = result.get("url") | |
if u: | |
if u.startswith("/"): | |
u = urljoin(GRADIO_URL, u) | |
candidate_urls.append(u) | |
remote_path = u | |
p = result.get("path") | |
if p: | |
remote_path = add_remote(p) | |
if os.path.isfile(p): | |
try: | |
with open(p, "rb") as f: | |
data = f.read() | |
except Exception: | |
pass | |
# str κ²°κ³Ό | |
elif isinstance(result, str): | |
s = result.strip() | |
if s.startswith("data:"): | |
try: | |
data = base64.b64decode(re.sub(r"^data:image/[^;]+;base64,", "", s)) | |
except Exception as e: | |
logging.warning(f"base64 λμ½λ© μ€ν¨: {e}") | |
elif s.startswith("http"): | |
candidate_urls.append(s) | |
remote_path = s | |
else: | |
remote_path = add_remote(s) | |
if s.startswith("/"): | |
candidate_urls.append(urljoin(GRADIO_URL, s)) | |
if os.path.isfile(s): | |
try: | |
with open(s, "rb") as f: | |
data = f.read() | |
except Exception: | |
pass | |
# URL λ€μ΄λ‘λ (ν€λ 무μ, 200 OK λ§ νμΈ) | |
for u in candidate_urls: | |
if data: | |
break | |
try: | |
r = requests.get(u, timeout=10) | |
if r.ok and r.content: | |
data = r.content | |
break | |
except Exception as e: | |
logging.warning(f"URL λ€μ΄λ‘λ μ€ν¨({u}): {e}") | |
# βββββββββββββ Discord μ μ‘ βββββββββββββ | |
if data: | |
if PIL_OK: # WEBP β PNG λ³ν | |
try: | |
buf = io.BytesIO() | |
Image.open(io.BytesIO(data)).convert("RGB").save(buf, format="PNG") | |
buf.seek(0) | |
await message.reply(files=[discord.File(buf, filename="image.png")]) | |
return | |
except Exception as e: | |
logging.warning(f"PNG λ³ν μ€ν¨: {e}") | |
# λ³ν μ€ν¨ λλ Pillow μμ β μλ³Έ μ μ‘ | |
await message.reply(files=[discord.File(io.BytesIO(data), filename="image.webp")]) | |
return | |
# βββββββββββββ μ€ν¨ μ URL μΆλ ₯ βββββββββββββ | |
if remote_path: | |
await message.reply(content=remote_path) | |
else: | |
await message.reply("β οΈ μ΄λ―Έμ§λ₯Ό μ μ‘ν μ μμ΅λλ€.") | |
# ββββββββββββββββββ μ€ν ββββββββββββββββββββββββ | |
if __name__ == "__main__": | |
replicate.Client(api_token=REPL_TOKEN) # ꡬ쑰 μ μ§ | |
ImageBot(intents=intents).run(TOKEN) | |