Spaces:
Runtime error
Runtime error
# img_bot.py | |
import os, io, re, random, asyncio, logging, subprocess, base64 | |
from urllib.parse import urljoin, quote_plus | |
import discord | |
import requests | |
import replicate | |
from transformers import pipeline as transformers_pipeline | |
from gradio_client import Client | |
from PIL import Image # WEBP β PNG λ³νμ© | |
# ββββββββββββββββββ νκ²½ λ³μ ββββββββββββββββββ | |
TOKEN = os.getenv("DISCORD_TOKEN") | |
CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID")) | |
REPL_TOKEN = (os.getenv("OPENAI_API_KEY") or "").strip() | |
HF_TOKEN = (os.getenv("HF_TOKEN") or "").strip() | |
if not TOKEN or not CHANNEL_ID: | |
raise RuntimeError("DISCORD_TOKEN κ³Ό DISCORD_CHANNEL_ID νκ²½ λ³μλ₯Ό λͺ¨λ μ§μ νμΈμ.") | |
if not REPL_TOKEN: | |
raise RuntimeError("OPENAI_API_KEY μ Replicate Personal Access Token κ°μ λ£μ΄μ£ΌμΈμ.") | |
os.environ["REPLICATE_API_TOKEN"] = REPL_TOKEN # Replicate λ―Έμ¬μ©μ΄μ§λ§ ꡬ쑰 μ μ§ | |
# ββββββββββββββββββ Gradio μλ² βββββββββββββββββ | |
GRADIO_URL = "http://211.233.58.201:7971" | |
GRADIO_API = "/process_and_save_image" | |
# ββββββββββββββββββ λ²μ νμ΄νλΌμΈ βββββββββββββ | |
translator = transformers_pipeline( | |
"translation", | |
model="Helsinki-NLP/opus-mt-ko-en", | |
device=-1, | |
**({"token": HF_TOKEN} if HF_TOKEN else {}) | |
) | |
async def ko2en_async(text: str) -> str: | |
"""νκΈ ν¬ν¨ μ μμ΄ λ²μ(λΉλκΈ°).""" | |
if not re.search(r"[κ°-ν£]", text): | |
return text | |
loop = asyncio.get_running_loop() | |
try: | |
return await loop.run_in_executor( | |
None, | |
lambda: translator(text, max_length=256, num_beams=1)[0]["translation_text"].strip() | |
) | |
except Exception as e: | |
logging.warning(f"λ²μ μ€ν¨, μλ¬Έ μ¬μ©: {e}") | |
return text | |
# ββββββββββββββββββ λ‘κΉ ββββββββββββββββββββββββ | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s [%(levelname)s] %(message)s", | |
handlers=[logging.StreamHandler()] | |
) | |
# ββββββββββββββββββ Discord ν΄λΌμ΄μΈνΈ ββββββββββ | |
intents = discord.Intents.default() | |
intents.message_content = True | |
class ImageBot(discord.Client): | |
async def on_ready(self): | |
logging.info(f"Logged in as {self.user} (id={self.user.id})") | |
try: | |
subprocess.Popen(["python", "web.py"]) | |
logging.info("web.py server has been started.") | |
except Exception as e: | |
logging.warning(f"web.py μ€ν μ€ν¨: {e}") | |
async def on_message(self, message: discord.Message): | |
if message.author.id == self.user.id or message.channel.id != CHANNEL_ID: | |
return | |
prompt_raw = message.content.strip() | |
if not prompt_raw: | |
return | |
prompt_en = await ko2en_async(prompt_raw) | |
await message.channel.typing() | |
# βββββββββββββ Gradio νΈμΆ ββββββββββββββ | |
def generate_image(): | |
client = Client(GRADIO_URL) | |
return client.predict( | |
height=768, | |
width=768, | |
steps=30, | |
scales=3.5, | |
prompt=prompt_en, | |
seed=random.randint(0, 2**32 - 1), | |
api_name=GRADIO_API | |
) | |
try: | |
result = await asyncio.get_running_loop().run_in_executor(None, generate_image) | |
except Exception as e: | |
logging.error(f"Gradio API error: {e}") | |
await message.reply("β οΈ μ΄λ―Έμ§ μμ± μ€ν¨!") | |
return | |
# κ²°κ³Ό μ κ·ν(list β 첫 μμ) | |
if isinstance(result, list): | |
result = result[0] | |
# βββββββββββββ μ΄λ―Έμ§ λ°μ΄ν° ν보 βββββββββββββ | |
data = None | |
candidate_urls = [] | |
# dict κ²°κ³Ό μ²λ¦¬ | |
if isinstance(result, dict): | |
# url νλ | |
url = result.get("url") | |
if url: | |
if url.startswith("/"): | |
url = urljoin(GRADIO_URL, url) | |
candidate_urls.append(url) | |
# path νλ | |
path = result.get("path") | |
if path: | |
# β μ격 Gradio νμΌ κ²½λ‘ | |
candidate_urls.append(urljoin(GRADIO_URL, f"/gradio_api/file={quote_plus(path)}")) | |
# β‘ λ‘컬 νμΌ κ²½λ‘ | |
if os.path.isfile(path): | |
try: | |
with open(path, "rb") as f: | |
data = f.read() | |
except Exception as e: | |
logging.warning(f"λ‘컬 νμΌ μ΄κΈ° μ€ν¨: {e}") | |
# str κ²°κ³Ό μ²λ¦¬ | |
elif isinstance(result, str): | |
s = result.strip() | |
if s.startswith("data:"): | |
# data URI | |
try: | |
data = base64.b64decode(re.sub(r"^data:image/[^;]+;base64,", "", s)) | |
except Exception as e: | |
logging.warning(f"base64 λμ½λ© μ€ν¨: {e}") | |
elif s.startswith("http"): | |
candidate_urls.append(s) | |
elif s.startswith("/"): | |
candidate_urls.append(urljoin(GRADIO_URL, s)) | |
else: # λ‘컬/μ격 tmp κ²½λ‘ | |
candidate_urls.append(urljoin(GRADIO_URL, f"/gradio_api/file={quote_plus(s)}")) | |
if os.path.isfile(s): | |
try: | |
with open(s, "rb") as f: | |
data = f.read() | |
except Exception as e: | |
logging.warning(f"λ‘컬 νμΌ μ΄κΈ° μ€ν¨: {e}") | |
# URL λͺ©λ‘μμ λ€μ΄λ‘λ | |
for u in candidate_urls: | |
if data: | |
break | |
try: | |
r = requests.get(u, timeout=10) | |
if r.ok and r.headers.get("content-type", "").startswith("image/"): | |
data = r.content | |
break | |
except Exception as e: | |
logging.warning(f"URL λ€μ΄λ‘λ μ€ν¨({u}): {e}") | |
if not data: | |
await message.reply("β οΈ μ΄λ―Έμ§λ₯Ό μ μ‘ν μ μμ΅λλ€.") | |
return | |
# βββββββββββββ WEBP β PNG λ³ν βββββββββββββ | |
buf = io.BytesIO() | |
try: | |
Image.open(io.BytesIO(data)).convert("RGB").save(buf, format="PNG") | |
buf.seek(0) | |
await message.reply(files=[discord.File(buf, filename="image.png")]) | |
except Exception as e: | |
logging.warning(f"PNG λ³ν μ€ν¨: {e}") | |
# λ³ν μ€ν¨ β μλ³Έ νμ₯μ μ μ§ | |
await message.reply(files=[discord.File(io.BytesIO(data), filename="image.webp")]) | |
# ββββββββββββββββββ μ€ν ββββββββββββββββββββββββ | |
if __name__ == "__main__": | |
replicate.Client(api_token=REPL_TOKEN) # ꡬ쑰 μ μ§ | |
ImageBot(intents=intents).run(TOKEN) | |