|
|
|
|
|
import os |
|
|
# Runtime bootstrap: upgrade torch (pre-release build, pinned below 2.9, pulled
# from the PyTorch nightly cu126 index) and `spaces` with pip before the
# imports further down pick them up.
_PIP_BOOTSTRAP_COMMAND = (
    'pip install --upgrade --pre '
    '--extra-index-url https://download.pytorch.org/whl/nightly/cu126 '
    '"torch<2.9" spaces'
)
os.system(_PIP_BOOTSTRAP_COMMAND)
|
|
|
|
|
|
|
|
import spaces |
|
|
import torch |
|
|
from diffusers.pipelines.wan.pipeline_wan_i2v import WanImageToVideoPipeline |
|
|
from diffusers.models.transformers.transformer_wan import WanTransformer3DModel |
|
|
from diffusers.utils.export_utils import export_to_video |
|
|
import gradio as gr |
|
|
import tempfile |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import random |
|
|
import gc |
|
|
import logging |
|
|
from optimization import optimize_pipeline_ |
|
|
|
|
|
|
|
|
from huggingface_hub import HfApi, upload_file |
|
|
import uuid |
|
|
from datetime import datetime |
|
|
from queue import Queue |
|
|
from threading import Thread |
|
|
import time |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Model and output geometry ------------------------------------------------
MODEL_ID = "Wan-AI/Wan2.2-I2V-A14B-Diffusers"
LANDSCAPE_WIDTH = 832
LANDSCAPE_HEIGHT = 480

# Largest seed value offered in the UI (upper bound of a signed 32-bit int).
MAX_SEED = np.iinfo(np.int32).max

# --- Clip length --------------------------------------------------------------
FIXED_FPS = 16
MIN_FRAMES_MODEL = 8
MAX_FRAMES_MODEL = 81

# Slider bounds in seconds, derived from the frame limits at FIXED_FPS and
# rounded to one decimal place.
MIN_DURATION = round(MIN_FRAMES_MODEL / FIXED_FPS, 1)
MAX_DURATION = round(MAX_FRAMES_MODEL / FIXED_FPS, 1)

# --- Upload target ------------------------------------------------------------
# Hub repository receiving the uploads; override with the HF_UPLOAD_REPO
# environment variable.
HF_MODEL = os.getenv("HF_UPLOAD_REPO", "rahul7star/VideoExplain")

# --- Default prompts ----------------------------------------------------------
default_prompt_i2v = "make this image come alive, cinematic motion, smooth animation"
default_negative_prompt = "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, 杂乱的背景, 三条腿, 背景人很多, 倒着走"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Both transformers are taken from a separate bf16 checkpoint repository rather
# than from MODEL_ID; everything else in the pipeline comes from MODEL_ID.
_BF16_TRANSFORMER_REPO = 'cbensimon/Wan2.2-I2V-A14B-bf16-Diffusers'


def _load_bf16_transformer(subfolder):
    """Load the Wan transformer stored under *subfolder* of the bf16 repo.

    The weights are requested as bfloat16 with ``device_map='cuda'``.
    """
    return WanTransformer3DModel.from_pretrained(
        _BF16_TRANSFORMER_REPO,
        subfolder=subfolder,
        torch_dtype=torch.bfloat16,
        device_map='cuda',
    )


# Keyword arguments are evaluated left to right, so 'transformer' is loaded
# before 'transformer_2', and both before the pipeline itself is assembled.
pipe = WanImageToVideoPipeline.from_pretrained(
    MODEL_ID,
    transformer=_load_bf16_transformer('transformer'),
    transformer_2=_load_bf16_transformer('transformer_2'),
    torch_dtype=torch.bfloat16,
).to('cuda')
|
|
|
|
|
|
|
|
def _release_cuda_memory(passes):
    """Run *passes* rounds of Python GC followed by a CUDA sync and cache flush."""
    for _ in range(passes):
        gc.collect()
        torch.cuda.synchronize()
        torch.cuda.empty_cache()


# Clean up after model loading and before the optimisation step below.
_release_cuda_memory(3)
|
|
|
|
|
|
|
|
# Example inputs handed to optimize_pipeline_: a blank RGB frame at the
# landscape resolution, a placeholder prompt, and the maximum frame count.
# NOTE(review): presumably these are the shapes the optimisation is traced or
# compiled for — confirm against the `optimization` module.
_warmup_image = Image.new('RGB', (LANDSCAPE_WIDTH, LANDSCAPE_HEIGHT))
_warmup_inputs = dict(
    image=_warmup_image,
    prompt='prompt',
    height=LANDSCAPE_HEIGHT,
    width=LANDSCAPE_WIDTH,
    num_frames=MAX_FRAMES_MODEL,
)
optimize_pipeline_(pipe, **_warmup_inputs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Hand-off point between request handlers and the background uploader.
# Each item is a ``(video_path, summary_text)`` tuple.
upload_queue = Queue()


def upload_worker():
    """Consume ``upload_queue`` forever, upscaling and uploading each video.

    Runs in a daemon thread. A failing upload is logged with its traceback,
    followed by a 3-second pause, and the worker then moves on to the next
    item; the failed item is not retried.
    """
    while True:
        # The blocking get() stays outside the try block so that task_done()
        # is only ever paired with an item that was actually dequeued.
        item = upload_queue.get()
        try:
            video_path, summary_text = item
            logging.info(f"⏳ Uploading video in background: {video_path}")
            upscale_and_upload_4k(video_path, summary_text)
            logging.info(f"✅ Background upload finished: {video_path}")
        except Exception as e:
            # Top-level boundary of the worker thread: record the full
            # traceback and keep the thread alive for later uploads.
            logging.exception(f"Upload failed: {e}")
            time.sleep(3)
        finally:
            # Always balance the get() above, whatever happened in between,
            # so upload_queue.join() cannot hang on a failed item.
            upload_queue.task_done()


# Started at import time; daemon=True so it never blocks interpreter shutdown.
Thread(target=upload_worker, daemon=True).start()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def resize_image(image: Image.Image) -> Image.Image:
    """Fit *image* to the model resolution while keeping its orientation.

    Landscape and square inputs go straight through
    ``resize_image_landscape``. Portrait inputs (height greater than width)
    are rotated by 90 degrees, fitted as landscape, then rotated by 270
    degrees so the result is portrait again.
    """
    is_portrait = image.height > image.width
    if not is_portrait:
        return resize_image_landscape(image)

    sideways = image.transpose(Image.Transpose.ROTATE_90)
    fitted = resize_image_landscape(sideways)
    return fitted.transpose(Image.Transpose.ROTATE_270)
|
|
|
|
|
def resize_image_landscape(image: Image.Image) -> Image.Image:
    """Centre-crop *image* to the landscape aspect ratio, then resize it.

    The crop removes equal margins from the left/right when the input is
    wider than ``LANDSCAPE_WIDTH / LANDSCAPE_HEIGHT`` and from the top/bottom
    otherwise. The cropped image is resampled with LANCZOS to exactly
    ``(LANDSCAPE_WIDTH, LANDSCAPE_HEIGHT)``.
    """
    src_w, src_h = image.size
    wanted_ratio = LANDSCAPE_WIDTH / LANDSCAPE_HEIGHT

    if src_w / src_h > wanted_ratio:
        # Too wide: keep the full height and trim the sides.
        crop_w = round(src_h * wanted_ratio)
        x0 = (src_w - crop_w) // 2
        box = (x0, 0, x0 + crop_w, src_h)
    else:
        # Too tall (or already matching): keep the full width, trim top/bottom.
        crop_h = round(src_w / wanted_ratio)
        y0 = (src_h - crop_h) // 2
        box = (0, y0, src_w, y0 + crop_h)

    cropped = image.crop(box)
    return cropped.resize((LANDSCAPE_WIDTH, LANDSCAPE_HEIGHT), Image.LANCZOS)
|
|
|
|
|
def get_duration(
    input_image,
    prompt,
    steps=4,
    negative_prompt=None,
    duration_seconds=None,
    guidance_scale=1,
    guidance_scale_2=1,
    seed=42,
    randomize_seed=False,
    progress=None,
):
    """Return the GPU time budget for one ``generate_video`` call.

    This function is passed as ``duration=`` to ``spaces.GPU`` and mirrors the
    parameter list of ``generate_video``. Only ``steps`` influences the
    result; the other parameters exist so the signatures line up.

    Every parameter that ``generate_video`` treats as optional is optional
    here too, so a call that relies on those defaults does not fail in the
    duration callback with a ``TypeError``.

    Args:
        steps: Number of inference steps; coerced with ``int()``.

    Returns:
        ``int(steps) * 15`` (presumably seconds — confirm against the
        ``spaces.GPU`` documentation).
    """
    per_step_budget = 15
    return int(steps) * per_step_budget
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import subprocess |
|
|
|
|
|
def _remove_if_exists(path):
    """Delete *path*, tolerating the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def upscale_and_upload_4k(input_video_path: str, summary_text: str) -> str:
    """Upscale a video with ffmpeg and upload it, plus a summary, to the Hub.

    The video is re-encoded to 3840x2160 (libx264, CRF 18, preset ``slow``)
    into a temporary file, uploaded to ``HF_MODEL`` under
    ``<YYYY-MM-DD>-WAN-I2V/upload_<8 hex chars>/`` using the base name of
    *input_video_path*, and accompanied by a ``summary.txt`` holding
    *summary_text*. Both temporary files are removed before returning,
    including when ffmpeg or an upload fails. *input_video_path* itself is
    left untouched.

    Args:
        input_video_path: Path of the source video on local disk.
        summary_text: Text written to ``summary.txt`` next to the video.

    Returns:
        The folder path inside the repository that received the files.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status
            (its stderr is logged first).
    """
    logging.info(f"Upscaling video to 4K for upload: {input_video_path}")

    upscaled_path = None
    summary_file = None
    try:
        # Reserve a path for ffmpeg's output; the handle is closed right away
        # and ffmpeg overwrites the empty file (-y).
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_upscaled:
            upscaled_path = tmp_upscaled.name

        # NOTE(review): the scale filter forces 3840x2160 whatever the source
        # aspect ratio is — confirm this is intended for portrait clips.
        cmd = [
            "ffmpeg",
            "-i", input_video_path,
            "-vf", "scale=3840:2160:flags=lanczos",
            "-c:v", "libx264",
            "-crf", "18",
            "-preset", "slow",
            "-y",
            upscaled_path,
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            # errors="replace" keeps undecodable ffmpeg output from masking
            # the real failure with a UnicodeDecodeError.
            stderr_text = e.stderr.decode(errors="replace")
            logging.error(f"FFmpeg failed:\n{stderr_text}")
            raise
        logging.info(f"✅ Upscaled video created at: {upscaled_path}")

        # One dated folder per day, one random sub-folder per upload.
        today_str = datetime.now().strftime("%Y-%m-%d")
        unique_subfolder = f"upload_{uuid.uuid4().hex[:8]}"
        hf_folder = f"{today_str}-WAN-I2V/{unique_subfolder}"

        hf_token = os.environ.get("HUGGINGFACE_HUB_TOKEN")

        video_filename = os.path.basename(input_video_path)
        video_hf_path = f"{hf_folder}/{video_filename}"
        upload_file(
            path_or_fileobj=upscaled_path,
            path_in_repo=video_hf_path,
            repo_id=HF_MODEL,
            repo_type="model",
            token=hf_token,
        )
        logging.info(f"✅ Uploaded video to HF: {video_hf_path}")

        # Write the summary through the temp-file handle itself so that no
        # second, never-closed handle is left behind.
        with tempfile.NamedTemporaryFile(
            "w", suffix=".txt", delete=False, encoding="utf-8"
        ) as summary_tmp:
            summary_file = summary_tmp.name
            summary_tmp.write(summary_text)

        summary_hf_path = f"{hf_folder}/summary.txt"
        upload_file(
            path_or_fileobj=summary_file,
            path_in_repo=summary_hf_path,
            repo_id=HF_MODEL,
            repo_type="model",
            token=hf_token,
        )
        logging.info(f"✅ Uploaded summary to HF: {summary_hf_path}")

        return hf_folder
    finally:
        # Runs on success and on every failure path, so temp files never pile up.
        for leftover in (upscaled_path, summary_file):
            if leftover is not None:
                _remove_if_exists(leftover)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@spaces.GPU(duration=get_duration)
def generate_video(
    input_image,
    prompt,
    steps = 4,
    negative_prompt=default_negative_prompt,
    duration_seconds = MAX_DURATION,
    guidance_scale = 1,
    guidance_scale_2 = 1,
    seed = 42,
    randomize_seed = False,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate a video from an input image and queue it for background upload.

    Args:
        input_image: Source image; a ``gr.Error`` is raised when it is ``None``.
        prompt: Text prompt; also used as the summary text of the upload.
        steps: Number of inference steps.
        negative_prompt: Negative text prompt.
        duration_seconds: Requested clip length; converted to frames at
            ``FIXED_FPS`` and clipped to the model's frame limits.
        guidance_scale: Guidance scale passed as ``guidance_scale``.
        guidance_scale_2: Guidance scale passed as ``guidance_scale_2``.
        seed: Seed used when ``randomize_seed`` is false.
        randomize_seed: Draw a random seed in ``[0, MAX_SEED]`` instead.
        progress: Gradio progress tracker.

    Returns:
        ``(video_path, current_seed)``: path of the exported MP4 and the seed
        that was actually used.
    """
    if input_image is None:
        raise gr.Error("Please upload an input image.")

    requested_frames = int(round(duration_seconds * FIXED_FPS))
    num_frames = np.clip(requested_frames, MIN_FRAMES_MODEL, MAX_FRAMES_MODEL)

    if randomize_seed:
        current_seed = random.randint(0, MAX_SEED)
    else:
        current_seed = int(seed)

    resized_image = resize_image(input_image)

    # Height and width are taken from the resized image so portrait inputs
    # keep their orientation.
    pipe_kwargs = {
        "image": resized_image,
        "prompt": prompt,
        "negative_prompt": negative_prompt,
        "height": resized_image.height,
        "width": resized_image.width,
        "num_frames": num_frames,
        "guidance_scale": float(guidance_scale),
        "guidance_scale_2": float(guidance_scale_2),
        "num_inference_steps": int(steps),
        "generator": torch.Generator(device="cuda").manual_seed(current_seed),
    }
    pipe_output = pipe(**pipe_kwargs)
    output_frames_list = pipe_output.frames[0]

    # delete=False: the file must outlive this function, since both the UI and
    # the upload worker read it later.
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        video_path = tmpfile.name
    export_to_video(output_frames_list, video_path, fps=FIXED_FPS)

    # Upload happens off the request path; see upload_worker.
    upload_queue.put((video_path, prompt))
    logging.info(f"Video queued for background upload: {video_path}")

    return video_path, current_seed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks() as demo:
    # ---- Header --------------------------------------------------------------
    gr.Markdown("# Fast 4 steps Wan 2.2 I2V (14B) with Lightning LoRA")
    gr.Markdown("run Wan 2.2 in just 4-8 steps, with Lightning LoRA, fp8 quantization & AoT compilation")

    with gr.Row():
        # ---- Left column: inputs ---------------------------------------------
        with gr.Column():
            input_image_component = gr.Image(
                type="pil",
                label="Input Image (auto-resized)",
            )
            prompt_input = gr.Textbox(
                label="Prompt",
                value=default_prompt_i2v,
            )
            duration_seconds_input = gr.Slider(
                minimum=MIN_DURATION,
                maximum=MAX_DURATION,
                step=0.1,
                value=MAX_DURATION,
                label="Duration (seconds)",
            )

            with gr.Accordion("Advanced Settings", open=False):
                negative_prompt_input = gr.Textbox(
                    label="Negative Prompt",
                    value=default_negative_prompt,
                    lines=3,
                )
                seed_input = gr.Slider(
                    label="Seed",
                    minimum=0,
                    maximum=MAX_SEED,
                    step=1,
                    value=42,
                    interactive=True,
                )
                randomize_seed_checkbox = gr.Checkbox(
                    label="Randomize seed",
                    value=True,
                    interactive=True,
                )
                steps_slider = gr.Slider(
                    minimum=1,
                    maximum=30,
                    step=1,
                    value=6,
                    label="Inference Steps",
                )
                guidance_scale_input = gr.Slider(
                    minimum=0.0,
                    maximum=10.0,
                    step=0.5,
                    value=1,
                    label="Guidance Scale - high noise stage",
                )
                guidance_scale_2_input = gr.Slider(
                    minimum=0.0,
                    maximum=10.0,
                    step=0.5,
                    value=1,
                    label="Guidance Scale 2 - low noise stage",
                )

            generate_button = gr.Button("Generate Video", variant="primary")

        # ---- Right column: result --------------------------------------------
        with gr.Column():
            video_output = gr.Video(
                label="Generated Video",
                autoplay=True,
                interactive=False,
            )

    # Order must match the positional parameters of generate_video.
    ui_inputs = [
        input_image_component,
        prompt_input,
        steps_slider,
        negative_prompt_input,
        duration_seconds_input,
        guidance_scale_input,
        guidance_scale_2_input,
        seed_input,
        randomize_seed_checkbox,
    ]
    # The seed actually used is written back into the seed slider.
    generate_button.click(
        fn=generate_video,
        inputs=ui_inputs,
        outputs=[video_output, seed_input],
    )
|
|
|
|
|
if __name__ == "__main__":
    # Enable request queuing, then start the app with the MCP server turned on.
    queued_demo = demo.queue()
    queued_demo.launch(mcp_server=True)