fantos's picture
Update app.py
c3a9e90 verified
raw
history blame
6.46 kB
# img_bot.py
import os, io, re, random, asyncio, logging, subprocess, base64
from urllib.parse import urljoin, quote_plus
import discord
import requests
import replicate
from transformers import pipeline as transformers_pipeline
from gradio_client import Client
from PIL import Image  # used for WEBP → PNG conversion
# ────────────────── Environment variables ──────────────────
# Required: DISCORD_TOKEN, DISCORD_CHANNEL_ID, OPENAI_API_KEY.  Optional: HF_TOKEN.
TOKEN = os.getenv("DISCORD_TOKEN")
# Convert the channel id only when the variable is present: int(None) would
# raise a bare TypeError before the descriptive RuntimeError below is reached.
# A non-numeric value still raises ValueError from int().
_channel_id_raw = (os.getenv("DISCORD_CHANNEL_ID") or "").strip()
CHANNEL_ID = int(_channel_id_raw) if _channel_id_raw else 0
REPL_TOKEN = (os.getenv("OPENAI_API_KEY") or "").strip()
HF_TOKEN = (os.getenv("HF_TOKEN") or "").strip()
if not TOKEN or not CHANNEL_ID:
    raise RuntimeError("DISCORD_TOKEN 과 DISCORD_CHANNEL_ID 환경 변수를 모두 지정하세요.")
if not REPL_TOKEN:
    raise RuntimeError("OPENAI_API_KEY 에 Replicate Personal Access Token 값을 넣어주세요.")
# The Replicate token is exported although Replicate itself is not used by the
# bot; the assignment is kept to preserve the original program structure.
os.environ["REPLICATE_API_TOKEN"] = REPL_TOKEN
# ────────────────── Gradio server ─────────────────
# Base URL of the Gradio app and the API route invoked for image generation.
GRADIO_URL = "http://211.233.58.201:7971"
GRADIO_API = "/process_and_save_image"
# ────────────────── Translation pipeline ─────────────
# Korean → English model; device=-1 is passed through to the pipeline
# unchanged.  The Hugging Face token is forwarded only when one is configured.
_translator_kwargs = {"model": "Helsinki-NLP/opus-mt-ko-en", "device": -1}
if HF_TOKEN:
    _translator_kwargs["token"] = HF_TOKEN
translator = transformers_pipeline("translation", **_translator_kwargs)
# Precomposed Hangul syllables occupy the code points U+AC00 through U+D7A3.
# Escapes are used so the pattern survives any re-encoding of this file.
_HANGUL_RE = re.compile(r"[\uac00-\ud7a3]")


async def ko2en_async(text: str) -> str:
    """Translate *text* to English when it contains Hangul.

    Text without any Hangul syllable is returned unchanged.  Otherwise the
    module-level ``translator`` is called in the event loop's default
    executor, so the loop is not blocked while the model runs.

    Returns:
        The stripped translation, or the original *text* when no Hangul is
        present or the translation call raises (a warning is logged).
    """
    if not _HANGUL_RE.search(text):
        return text

    def _translate() -> str:
        outputs = translator(text, max_length=256, num_beams=1)
        return outputs[0]["translation_text"].strip()

    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(None, _translate)
    except Exception as e:
        # Best effort: a failed translation must not stop image generation,
        # so the untranslated prompt is used instead.
        logging.warning("번역 실패, 원문 사용: %s", e)
        return text
# ────────────────── Logging ────────────────────────
# Root logger: INFO and above, timestamp + level + message, one stream handler.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format=_LOG_FORMAT,
    level=logging.INFO,
)
# ────────────────── Discord client ──────────
# Default intents plus message content, which the bot reads in on_message.
intents = discord.Intents.default()
intents.message_content = True
class ImageBot(discord.Client):
    """Discord client that answers channel messages with generated images.

    Every non-empty message posted in ``CHANNEL_ID`` by anyone but the bot is
    passed through ``ko2en_async``, sent to the Gradio endpoint as a prompt,
    and answered with either a link to the result or the image uploaded as an
    attachment.
    """

    # on_ready may fire again after a reconnect; remembering the launch keeps
    # the bot from spawning a new web.py process each time.
    _web_started = False

    async def on_ready(self):
        """Log the login and start the companion ``web.py`` process once."""
        logging.info("Logged in as %s (id=%s)", self.user, self.user.id)
        if self._web_started:
            return
        try:
            subprocess.Popen(["python", "web.py"])
        except Exception as e:
            # Best effort: the bot keeps running without the web server and
            # retries on the next on_ready.
            logging.warning("web.py 실행 실패: %s", e)
        else:
            self._web_started = True
            logging.info("web.py server has been started.")

    async def on_message(self, message: discord.Message):
        """Generate an image for *message* and reply with it.

        Messages from the bot itself, from other channels, or with empty
        content are ignored.
        """
        if message.author.id == self.user.id or message.channel.id != CHANNEL_ID:
            return
        prompt_raw = message.content.strip()
        if not prompt_raw:
            return

        loop = asyncio.get_running_loop()
        prompt_en = await ko2en_async(prompt_raw)

        # ───────────── Gradio call ──────────────
        try:
            # The context manager keeps the typing indicator alive for the
            # whole generation instead of triggering it only once.
            async with message.channel.typing():
                result = await loop.run_in_executor(
                    None, self._generate_image, prompt_en
                )
        except Exception as e:
            logging.error("Gradio API error: %s", e)
            await message.reply("⚠️ 이미지 생성 실패!")
            return

        # Normalise the result to its first element; predict() may hand back
        # a tuple as well as a list, and either may be empty.
        if isinstance(result, (list, tuple)):
            result = result[0] if result else None

        # ───────────── Send a link when the result carries a URL ─────────────
        # A string naming an existing local file is an image on disk, not a
        # server route, so it is uploaded instead of being turned into a URL.
        is_local_file = isinstance(result, str) and os.path.isfile(result)
        if not is_local_file:
            url = self._extract_url(result)
            if url:
                await message.reply(content=url)  # Discord renders an embed preview
                return

        # ───────────── Prepare the attachment ─────────────
        data = await loop.run_in_executor(None, self._load_image_bytes, result)
        if not data:
            await message.reply("⚠️ 이미지를 전송할 수 없습니다.")
            return

        # ───────────── WEBP → PNG conversion (for the preview) ─────────────
        png = await loop.run_in_executor(None, self._to_png, data)
        if png is not None:
            attachment = discord.File(png, filename="image.png")
        else:
            # Conversion failed: send the original bytes untouched.
            attachment = discord.File(io.BytesIO(data), filename="image.webp")
        await message.reply(files=[attachment])

    @staticmethod
    def _generate_image(prompt_en):
        """Blocking call to the Gradio endpoint; meant to run in an executor."""
        client = Client(GRADIO_URL)
        return client.predict(
            height=768,
            width=768,
            steps=30,
            scales=3.5,
            prompt=prompt_en,
            seed=random.randint(0, 2**32 - 1),
            api_name=GRADIO_API,
        )

    @staticmethod
    def _extract_url(result):
        """Return the URL carried by *result*, or None when there is none.

        A dict contributes its ``"url"`` entry; a string counts when it starts
        with ``http`` or ``/``.  Server-relative URLs are joined onto
        ``GRADIO_URL``.
        """
        if isinstance(result, dict):
            url = result.get("url")
        elif isinstance(result, str) and result.startswith(("http", "/")):
            url = result
        else:
            return None
        if not isinstance(url, str) or not url:
            return None
        if url.startswith("/"):
            url = urljoin(GRADIO_URL, url)
        return url

    @staticmethod
    def _load_image_bytes(result):
        """Blocking: return the raw image bytes behind *result*, or None.

        Handles a dict with a ``"path"`` entry (downloaded from the Gradio
        file route, falling back to the local file), a ``data:`` URI, and a
        string naming a local file.
        """
        if isinstance(result, dict):
            path = result.get("path")
            if not path:
                return None
            # Prefer downloading the file from the Gradio server.
            remote = urljoin(GRADIO_URL, f"/gradio_api/file={quote_plus(path)}")
            try:
                resp = requests.get(remote, timeout=10)
                # Without this an HTTP error page would be treated as image
                # bytes and the local fallback below would never run.
                resp.raise_for_status()
                return resp.content
            except requests.RequestException as e:
                logging.warning("Gradio file download failed, trying local path: %s", e)
            if os.path.isfile(path):
                with open(path, "rb") as f:
                    return f.read()
            return None

        if isinstance(result, str):
            if result.startswith("data:"):
                b64 = re.sub(r"^data:image/[^;]+;base64,", "", result)
                try:
                    return base64.b64decode(b64)
                except ValueError as e:  # binascii.Error is a ValueError
                    logging.warning("base64 디코딩 실패: %s", e)
                    return None
            if os.path.isfile(result):
                with open(result, "rb") as f:
                    return f.read()
        return None

    @staticmethod
    def _to_png(data):
        """Re-encode *data* as an RGB PNG.

        Returns a rewound ``io.BytesIO``, or None when Pillow cannot decode or
        re-encode the bytes.
        """
        buf = io.BytesIO()
        try:
            with Image.open(io.BytesIO(data)) as img:
                img.convert("RGB").save(buf, format="PNG")
        except Exception as e:
            # Pillow raises several unrelated exception types for bad input;
            # all of them mean "fall back to the original bytes".
            logging.warning("PNG conversion failed, sending original bytes: %s", e)
            return None
        buf.seek(0)
        return buf
# ────────────────── Entry point ────────────────────────
if __name__ == "__main__":
    # The Replicate client is created and discarded; it is kept only to
    # preserve the original program structure.
    replicate.Client(api_token=REPL_TOKEN)
    bot = ImageBot(intents=intents)
    bot.run(TOKEN)