|
import spaces |
|
import gradio as gr |
|
import numpy as np |
|
|
|
|
|
|
|
import random |
|
import torch |
|
from diffusers import StableDiffusion3Pipeline, AutoencoderKL, StableDiffusionXLImg2ImgPipeline, EulerAncestralDiscreteScheduler |
|
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer |
|
|
|
|
|
from transformers import T5Tokenizer, T5ForConditionalGeneration |
|
import re |
|
import paramiko |
|
import urllib |
|
import time |
|
import os |
|
|
|
FTP_HOST = "1ink.us" |
|
FTP_USER = "ford442" |
|
FTP_PASS = "GoogleBez12!" |
|
FTP_DIR = "1ink.us/stable_diff/" |
|
|
|
torch.backends.cuda.matmul.allow_tf32 = False |
|
torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = False |
|
torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = False |
|
torch.backends.cudnn.allow_tf32 = False |
|
torch.backends.cudnn.deterministic = False |
|
|
|
torch.backends.cuda.preferred_blas_library="cublas" |
|
torch.backends.cuda.preferred_linalg_library="cusolver" |
|
|
|
torch.set_float32_matmul_precision("highest") |
|
|
|
hftoken = os.getenv("HF_AUTH_TOKEN") |
|
|
|
def upload_to_ftp(filename): |
|
try: |
|
transport = paramiko.Transport((FTP_HOST, 22)) |
|
destination_path=FTP_DIR+filename |
|
transport.connect(username = FTP_USER, password = FTP_PASS) |
|
sftp = paramiko.SFTPClient.from_transport(transport) |
|
sftp.put(filename, destination_path) |
|
sftp.close() |
|
transport.close() |
|
print(f"Uploaded {filename} to FTP server") |
|
except Exception as e: |
|
print(f"FTP upload error: {e}") |
|
|
|
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") |
|
torch_dtype = torch.bfloat16 |
|
|
|
checkpoint = "microsoft/Phi-3.5-mini-instruct" |
|
|
|
vae = AutoencoderKL.from_pretrained("ford442/sdxl-vae-bf16", torch_dtype=torch.bfloat16).to(torch.device("cuda:0")) |
|
|
|
|
|
pipe = StableDiffusion3Pipeline.from_pretrained("ford442/stable-diffusion-3.5-medium-bf16", torch_dtype=torch.bfloat16).to(torch.device("cuda:0")) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
refiner = StableDiffusionXLImg2ImgPipeline.from_pretrained("ford442/stable-diffusion-xl-refiner-1.0-bf16", vae=vae, torch_dtype=torch.bfloat16, use_safetensors=True, requires_aesthetics_score=True).to(torch.device("cuda:0")) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
refiner.scheduler = EulerAncestralDiscreteScheduler.from_config(refiner.scheduler.config) |
|
|
|
tokenizer = AutoTokenizer.from_pretrained(checkpoint, add_prefix_space=False, device_map='balanced') |
|
tokenizer.tokenizer_legacy=False |
|
model = AutoModelForCausalLM.from_pretrained(checkpoint, device_map='balanced') |
|
|
|
|
|
def filter_text(text,phraseC): |
|
"""Filters out the text up to and including 'Rewritten Prompt:'.""" |
|
phrase = "Rewritten Prompt:" |
|
phraseB = "rewritten text:" |
|
pattern = f"(.*?){re.escape(phrase)}(.*)" |
|
patternB = f"(.*?){re.escape(phraseB)}(.*)" |
|
|
|
matchB = re.search(patternB, text, flags=re.DOTALL) |
|
if matchB: |
|
filtered_text = matchB.group(2) |
|
match = re.search(pattern, filtered_text, flags=re.DOTALL) |
|
if match: |
|
filtered_text = match.group(2) |
|
filtered_text = re.sub(phraseC, "", filtered_text, flags=re.DOTALL) |
|
return filtered_text |
|
else: |
|
return filtered_text |
|
else: |
|
|
|
return text |
|
|
|
MAX_SEED = np.iinfo(np.int32).max |
|
MAX_IMAGE_SIZE = 4096 |
|
|
|
@spaces.GPU(duration=80) |
|
def infer( |
|
prompt, |
|
negative_prompt, |
|
seed, |
|
randomize_seed, |
|
width, |
|
height, |
|
guidance_scale, |
|
num_inference_steps, |
|
latent_file, |
|
progress=gr.Progress(track_tqdm=True), |
|
): |
|
seed = random.randint(0, MAX_SEED) |
|
generator = torch.Generator(device='cuda').manual_seed(seed) |
|
system_prompt_rewrite = ( |
|
"You are an AI assistant that rewrites image prompts to be more descriptive and detailed." |
|
) |
|
user_prompt_rewrite = ( |
|
"Rewrite this prompt to be more descriptive and detailed and only return the rewritten text: " |
|
) |
|
user_prompt_rewrite_2 = ( |
|
"Rephrase this scene to have more elaborate details: " |
|
) |
|
input_text = f"{system_prompt_rewrite} {user_prompt_rewrite} {prompt}" |
|
input_text_2 = f"{system_prompt_rewrite} {user_prompt_rewrite_2} {prompt}" |
|
print("-- got prompt --") |
|
|
|
encoded_inputs = tokenizer(input_text, return_tensors="pt", return_attention_mask=True) |
|
encoded_inputs_2 = tokenizer(input_text_2, return_tensors="pt", return_attention_mask=True) |
|
|
|
input_ids = encoded_inputs["input_ids"].to(device) |
|
input_ids_2 = encoded_inputs_2["input_ids"].to(device) |
|
attention_mask = encoded_inputs["attention_mask"].to(device) |
|
attention_mask_2 = encoded_inputs_2["attention_mask"].to(device) |
|
print("-- tokenize prompt --") |
|
|
|
|
|
outputs = model.generate( |
|
input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
max_new_tokens=240, |
|
temperature=0.2, |
|
top_p=0.9, |
|
do_sample=True, |
|
) |
|
outputs_2 = model.generate( |
|
input_ids=input_ids_2, |
|
attention_mask=attention_mask_2, |
|
max_new_tokens=65, |
|
temperature=0.2, |
|
top_p=0.9, |
|
do_sample=True, |
|
) |
|
|
|
enhanced_prompt = tokenizer.decode(outputs[0], skip_special_tokens=True) |
|
enhanced_prompt_2 = tokenizer.decode(outputs_2[0], skip_special_tokens=True) |
|
print('-- generated prompt --') |
|
enhanced_prompt = filter_text(enhanced_prompt,prompt) |
|
enhanced_prompt_2 = filter_text(enhanced_prompt_2,prompt) |
|
print('-- filtered prompt --') |
|
print(enhanced_prompt) |
|
print('-- filtered prompt 2 --') |
|
print(enhanced_prompt_2) |
|
if latent_file: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sd_image_a = torch.load(latent_file.name) |
|
|
|
|
|
print("-- using latent file --") |
|
print('-- generating image --') |
|
with torch.no_grad(): |
|
sd_image = pipe( |
|
prompt=enhanced_prompt, |
|
negative_prompt=negative_prompt, |
|
guidance_scale=guidance_scale, |
|
num_inference_steps=num_inference_steps, |
|
width=width, |
|
height=height, |
|
latents=sd_image_a, |
|
generator=generator |
|
).images[0] |
|
else: |
|
print('-- generating image --') |
|
with torch.no_grad(): |
|
sd_image = pipe( |
|
prompt=enhanced_prompt, |
|
prompt_2=enhanced_prompt_2, |
|
prompt_3=prompt, |
|
negative_prompt=negative_prompt, |
|
guidance_scale=guidance_scale, |
|
num_inference_steps=num_inference_steps, |
|
width=width, |
|
height=height, |
|
|
|
generator=generator, |
|
).images[0] |
|
print('-- got image --') |
|
image_path = f"sd35m_{seed}.png" |
|
sd_image.save(image_path,optimize=False,compress_level=0) |
|
upload_to_ftp(image_path) |
|
|
|
generated_image_tensor = torch.tensor([np.array(sd_image).transpose(2, 0, 1)]).to('cuda') / 255.0 |
|
|
|
with torch.no_grad(): |
|
generated_latents = pipe.vae.encode(generated_image_tensor.to(torch.bfloat16)).latent_dist.sample().mul_(0.18215) |
|
latent_path = f"sd35m_{seed}.pt" |
|
|
|
torch.save(generated_latents, latent_path) |
|
upload_to_ftp(latent_path) |
|
|
|
refine = refiner( |
|
prompt=f"{enhanced_prompt_2}, high quality masterpiece, complex details", |
|
negative_prompt = negative_prompt, |
|
guidance_scale=7.5, |
|
num_inference_steps=num_inference_steps, |
|
image=sd_image, |
|
generator=generator, |
|
).images[0] |
|
refine_path = f"sd35m_refine_{seed}.png" |
|
refine.save(refine_path,optimize=False,compress_level=0) |
|
upload_to_ftp(refine_path) |
|
return refine, seed, enhanced_prompt |
|
|
|
examples = [ |
|
"Astronaut in a jungle, cold color palette, muted colors, detailed, 8k", |
|
"An astronaut riding a green horse", |
|
"A delicious ceviche cheesecake slice", |
|
] |
|
|
|
css = """ |
|
#col-container { |
|
margin: 0 auto; |
|
max-width: 640px; |
|
} |
|
""" |
|
|
|
def repeat_infer( |
|
prompt, |
|
negative_prompt, |
|
seed, |
|
randomize_seed, |
|
width, |
|
height, |
|
guidance_scale, |
|
num_inference_steps, |
|
num_iterations, |
|
): |
|
i = 0 |
|
while i < num_iterations: |
|
time.sleep(700) |
|
result, seed, image_path, enhanced_prompt = infer( |
|
prompt, |
|
negative_prompt, |
|
seed, |
|
randomize_seed, |
|
width, |
|
height, |
|
guidance_scale, |
|
num_inference_steps, |
|
) |
|
|
|
|
|
|
|
i += 1 |
|
return result, seed, image_path, enhanced_prompt |
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Origin()) as demo: |
|
with gr.Column(elem_id="col-container"): |
|
gr.Markdown(" # Text-to-Text-to-Image StableDiffusion 3.5 Medium (with refine)") |
|
expanded_prompt_output = gr.Textbox(label="Expanded Prompt", lines=5) |
|
with gr.Row(): |
|
prompt = gr.Text( |
|
label="Prompt", |
|
show_label=False, |
|
max_lines=1, |
|
placeholder="Enter your prompt", |
|
value="A captivating Christmas scene.", |
|
container=False, |
|
) |
|
run_button = gr.Button("Run", scale=0, variant="primary") |
|
result = gr.Image(label="Result", show_label=False) |
|
with gr.Accordion("Advanced Settings", open=False): |
|
latent_file = gr.File(label="Latents File (optional)") |
|
negative_prompt = gr.Text( |
|
label="Negative prompt", |
|
max_lines=1, |
|
placeholder="Enter a negative prompt", |
|
visible=False, |
|
) |
|
num_iterations = gr.Number( |
|
value=1000, |
|
label="Number of Iterations") |
|
seed = gr.Slider( |
|
label="Seed", |
|
minimum=0, |
|
maximum=MAX_SEED, |
|
step=1, |
|
value=0, |
|
) |
|
randomize_seed = gr.Checkbox(label="Randomize seed", value=True) |
|
with gr.Row(): |
|
width = gr.Slider( |
|
label="Width", |
|
minimum=256, |
|
maximum=MAX_IMAGE_SIZE, |
|
step=32, |
|
value=768, |
|
) |
|
height = gr.Slider( |
|
label="Height", |
|
minimum=256, |
|
maximum=MAX_IMAGE_SIZE, |
|
step=32, |
|
value=768, |
|
) |
|
guidance_scale = gr.Slider( |
|
label="Guidance scale", |
|
minimum=0.0, |
|
maximum=10.0, |
|
step=0.1, |
|
value=4.2, |
|
) |
|
num_inference_steps = gr.Slider( |
|
label="Number of inference steps", |
|
minimum=1, |
|
maximum=500, |
|
step=1, |
|
value=150, |
|
) |
|
gr.Examples(examples=examples, inputs=[prompt]) |
|
gr.on( |
|
triggers=[run_button.click, prompt.submit], |
|
fn=infer, |
|
inputs=[ |
|
prompt, |
|
negative_prompt, |
|
seed, |
|
randomize_seed, |
|
width, |
|
height, |
|
guidance_scale, |
|
num_inference_steps, |
|
latent_file, |
|
], |
|
outputs=[result, seed, expanded_prompt_output], |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |