fantos's picture
Update app.py
2473562 verified
raw
history blame
7.36 kB
# img_bot.py
import os, io, re, random, asyncio, logging, subprocess, base64
from urllib.parse import urljoin, quote_plus
import discord
import requests
import replicate
from transformers import pipeline as transformers_pipeline
from gradio_client import Client
from PIL import Image # WEBP β†’ PNG λ³€ν™˜μš©
# ────────────────── ν™˜κ²½ λ³€μˆ˜ ──────────────────
TOKEN = os.getenv("DISCORD_TOKEN")
CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID"))
REPL_TOKEN = (os.getenv("OPENAI_API_KEY") or "").strip()
HF_TOKEN = (os.getenv("HF_TOKEN") or "").strip()
if not TOKEN or not CHANNEL_ID:
raise RuntimeError("DISCORD_TOKEN κ³Ό DISCORD_CHANNEL_ID ν™˜κ²½ λ³€μˆ˜λ₯Ό λͺ¨λ‘ μ§€μ •ν•˜μ„Έμš”.")
if not REPL_TOKEN:
raise RuntimeError("OPENAI_API_KEY 에 Replicate Personal Access Token 값을 λ„£μ–΄μ£Όμ„Έμš”.")
os.environ["REPLICATE_API_TOKEN"] = REPL_TOKEN # Replicate λ―Έμ‚¬μš©μ΄μ§€λ§Œ ꡬ쑰 μœ μ§€
# ────────────────── Gradio μ„œλ²„ ─────────────────
GRADIO_URL = "http://211.233.58.201:7971"
GRADIO_API = "/process_and_save_image"
# ────────────────── λ²ˆμ—­ νŒŒμ΄ν”„λΌμΈ ─────────────
translator = transformers_pipeline(
"translation",
model="Helsinki-NLP/opus-mt-ko-en",
device=-1,
**({"token": HF_TOKEN} if HF_TOKEN else {})
)
async def ko2en_async(text: str) -> str:
"""ν•œκΈ€ 포함 μ‹œ μ˜μ–΄ λ²ˆμ—­(비동기)."""
if not re.search(r"[κ°€-힣]", text):
return text
loop = asyncio.get_running_loop()
try:
return await loop.run_in_executor(
None,
lambda: translator(text, max_length=256, num_beams=1)[0]["translation_text"].strip()
)
except Exception as e:
logging.warning(f"λ²ˆμ—­ μ‹€νŒ¨, 원문 μ‚¬μš©: {e}")
return text
# ────────────────── λ‘œκΉ… ────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()]
)
# ────────────────── Discord ν΄λΌμ΄μ–ΈνŠΈ ──────────
intents = discord.Intents.default()
intents.message_content = True
class ImageBot(discord.Client):
async def on_ready(self):
logging.info(f"Logged in as {self.user} (id={self.user.id})")
try:
subprocess.Popen(["python", "web.py"])
logging.info("web.py server has been started.")
except Exception as e:
logging.warning(f"web.py μ‹€ν–‰ μ‹€νŒ¨: {e}")
async def on_message(self, message: discord.Message):
if message.author.id == self.user.id or message.channel.id != CHANNEL_ID:
return
prompt_raw = message.content.strip()
if not prompt_raw:
return
prompt_en = await ko2en_async(prompt_raw)
await message.channel.typing()
# ───────────── Gradio 호좜 ──────────────
def generate_image():
client = Client(GRADIO_URL)
return client.predict(
height=768,
width=768,
steps=30,
scales=3.5,
prompt=prompt_en,
seed=random.randint(0, 2**32 - 1),
api_name=GRADIO_API
)
try:
result = await asyncio.get_running_loop().run_in_executor(None, generate_image)
except Exception as e:
logging.error(f"Gradio API error: {e}")
await message.reply("⚠️ 이미지 생성 μ‹€νŒ¨!")
return
# κ²°κ³Ό μ •κ·œν™”(list β†’ 첫 μš”μ†Œ)
if isinstance(result, list):
result = result[0]
# ───────────── 이미지 데이터 확보 ─────────────
data = None
candidate_urls = []
# dict 결과 처리
if isinstance(result, dict):
# url ν•„λ“œ
url = result.get("url")
if url:
if url.startswith("/"):
url = urljoin(GRADIO_URL, url)
candidate_urls.append(url)
# path ν•„λ“œ
path = result.get("path")
if path:
# β‘  원격 Gradio 파일 경둜
candidate_urls.append(urljoin(GRADIO_URL, f"/gradio_api/file={quote_plus(path)}"))
# β‘‘ 둜컬 파일 경둜
if os.path.isfile(path):
try:
with open(path, "rb") as f:
data = f.read()
except Exception as e:
logging.warning(f"둜컬 파일 μ—΄κΈ° μ‹€νŒ¨: {e}")
# str 결과 처리
elif isinstance(result, str):
s = result.strip()
if s.startswith("data:"):
# data URI
try:
data = base64.b64decode(re.sub(r"^data:image/[^;]+;base64,", "", s))
except Exception as e:
logging.warning(f"base64 λ””μ½”λ”© μ‹€νŒ¨: {e}")
elif s.startswith("http"):
candidate_urls.append(s)
elif s.startswith("/"):
candidate_urls.append(urljoin(GRADIO_URL, s))
else: # 둜컬/원격 tmp 경둜
candidate_urls.append(urljoin(GRADIO_URL, f"/gradio_api/file={quote_plus(s)}"))
if os.path.isfile(s):
try:
with open(s, "rb") as f:
data = f.read()
except Exception as e:
logging.warning(f"둜컬 파일 μ—΄κΈ° μ‹€νŒ¨: {e}")
# URL λͺ©λ‘μ—μ„œ λ‹€μš΄λ‘œλ“œ
for u in candidate_urls:
if data:
break
try:
r = requests.get(u, timeout=10)
if r.ok and r.headers.get("content-type", "").startswith("image/"):
data = r.content
break
except Exception as e:
logging.warning(f"URL λ‹€μš΄λ‘œλ“œ μ‹€νŒ¨({u}): {e}")
if not data:
await message.reply("⚠️ 이미지λ₯Ό 전솑할 수 μ—†μŠ΅λ‹ˆλ‹€.")
return
# ───────────── WEBP β†’ PNG λ³€ν™˜ ─────────────
buf = io.BytesIO()
try:
Image.open(io.BytesIO(data)).convert("RGB").save(buf, format="PNG")
buf.seek(0)
await message.reply(files=[discord.File(buf, filename="image.png")])
except Exception as e:
logging.warning(f"PNG λ³€ν™˜ μ‹€νŒ¨: {e}")
# λ³€ν™˜ μ‹€νŒ¨ β†’ 원본 ν™•μž₯자 μœ μ§€
await message.reply(files=[discord.File(io.BytesIO(data), filename="image.webp")])
# ────────────────── μ‹€ν–‰ ────────────────────────
if __name__ == "__main__":
replicate.Client(api_token=REPL_TOKEN) # ꡬ쑰 μœ μ§€
ImageBot(intents=intents).run(TOKEN)