video_mcp / modal_whisper_app.py
jomasego's picture
feat: Update Modal app and .gitignore
5d99cfb
raw
history blame
22.3 kB
import modal
import os
import tempfile
import io
# Environment variable for model name, configurable in Modal UI or via .env
# This will be used by both the pre-caching function and the runtime function
WHISPER_MODEL_NAME = os.environ.get("HF_WHISPER_MODEL_NAME", "openai/whisper-large-v3")
CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT"
CAPTION_PROCESSOR_NAME = "MCG-NJU/videomae-base"
CAPTION_TOKENIZER_NAME = "gpt2" # SpaceTimeGPT uses GPT-2 as decoder
ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base-finetuned-kinetics" # Often the same as model for VideoMAE
# Initialize a Modal Dict for caching results
# The key will be a hash of the video URL or video content
video_analysis_cache = modal.Dict.from_name(
"video-analysis-cache", create_if_missing=True
)
def download_whisper_model():
import torch
from transformers import pipeline
print(f"Downloading and caching Whisper model: {WHISPER_MODEL_NAME}")
pipeline(
"automatic-speech-recognition",
model=WHISPER_MODEL_NAME,
torch_dtype=torch.float32,
device="cpu"
)
print(f"Whisper model {WHISPER_MODEL_NAME} cached successfully.")
def download_caption_model():
import torch
from transformers import VisionEncoderDecoderModel, AutoImageProcessor, AutoTokenizer
print(f"Downloading and caching caption model: {CAPTION_MODEL_NAME}")
# Download image processor
AutoImageProcessor.from_pretrained(CAPTION_PROCESSOR_NAME)
print(f"Image processor {CAPTION_PROCESSOR_NAME} cached.")
# Download tokenizer
AutoTokenizer.from_pretrained(CAPTION_TOKENIZER_NAME)
print(f"Tokenizer {CAPTION_TOKENIZER_NAME} cached.")
# Download main model
VisionEncoderDecoderModel.from_pretrained(CAPTION_MODEL_NAME)
print(f"Caption model {CAPTION_MODEL_NAME} cached successfully.")
def download_action_model():
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
print(f"Downloading and caching action recognition model: {ACTION_MODEL_NAME}")
# Download image processor
VideoMAEImageProcessor.from_pretrained(ACTION_PROCESSOR_NAME)
print(f"Action model processor {ACTION_PROCESSOR_NAME} cached.")
# Download main model
VideoMAEForVideoClassification.from_pretrained(ACTION_MODEL_NAME)
print(f"Action model {ACTION_MODEL_NAME} cached successfully.")
# Define the Modal image
whisper_image = (
modal.Image.debian_slim(python_version="3.10")
.apt_install("ffmpeg")
.run_commands(
"echo 'Force reinstalling moviepy...'",
"pip install --force-reinstall moviepy",
"echo 'Checking moviepy installation...'",
"pip show moviepy || echo 'pip show moviepy failed'",
"echo 'Attempting to import moviepy.editor during build:'",
"python -c 'import moviepy; print(f\"moviepy module loaded from: {moviepy.__file__}\"); from moviepy.video.io.VideoFileClip import VideoFileClip; print(\"moviepy.video.io.VideoFileClip.VideoFileClip class import successful\")'"
) # Force install moviepy and add diagnostics
.pip_install(
"transformers[torch]",
"accelerate",
"soundfile",
"moviepy", # Essential for audio extraction from video
"huggingface_hub",
"ffmpeg-python",
"av", # For video frame extraction
"fastapi[standard]" # For web endpoints
)
.run_function(download_whisper_model)
.run_function(download_caption_model)
.run_function(download_action_model) # This runs download_action_model during image build
)
app = modal.App(name="whisper-transcriber") # Changed from modal.Stub to modal.App
# Hugging Face Token - retrieve from memory and set as Modal Secret
# IMPORTANT: Create a Modal Secret named 'my-huggingface-secret' with your actual HF_TOKEN.
# Example: modal secret create my-huggingface-secret HF_TOKEN=your_hf_token_here
HF_TOKEN_SECRET = modal.Secret.from_name("my-huggingface-secret")
@app.function(
image=whisper_image,
secrets=[HF_TOKEN_SECRET],
timeout=1200,
gpu="any" # Request any available GPU
)
def transcribe_video_audio(video_bytes: bytes) -> str:
# Imports moved inside the function to avoid local ModuleNotFoundError during `modal deploy`
from moviepy.video.io.VideoFileClip import VideoFileClip # More specific import for moviepy 2.2.1
import soundfile as sf
import torch
from transformers import pipeline # This will now use the pre-cached model
from huggingface_hub import login
if not video_bytes:
return "Error: No video data received."
# Login to Hugging Face Hub using the token from Modal secrets
hf_token = os.environ.get("HF_TOKEN") # Standard key for Hugging Face token in Modal secrets if set as HF_TOKEN=...
if hf_token:
try:
login(token=hf_token)
print("Successfully logged into Hugging Face Hub.")
except Exception as e:
print(f"Hugging Face Hub login failed: {e}. Proceeding, but private models may not be accessible.")
else:
print("HF_TOKEN secret not found. Proceeding without login (works for public models).")
print(f"Processing video for transcription using model: {WHISPER_MODEL_NAME}")
# Initialize pipeline inside the function.
# For production/frequent use, consider @stub.cls to load the model once per container lifecycle.
print("Loading Whisper model...")
device_map = "cuda:0" if torch.cuda.is_available() else "cpu"
# Use float16 for GPU for faster inference and less memory, float32 for CPU
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
transcriber = pipeline(
"automatic-speech-recognition",
model=WHISPER_MODEL_NAME,
torch_dtype=torch_dtype,
device=device_map,
)
print(f"Whisper model loaded on device: {device_map} with dtype: {torch_dtype}")
video_path = None
audio_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
print(f"Temporary video file saved: {video_path}")
print("Extracting audio from video...")
video_clip = VideoFileClip(video_path)
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_file:
audio_path = tmp_audio_file.name
video_clip.audio.write_audiofile(audio_path, codec='pcm_s16le', logger=None)
video_clip.close()
print(f"Audio extracted to: {audio_path}")
audio_input, samplerate = sf.read(audio_path)
if audio_input.ndim > 1:
audio_input = audio_input.mean(axis=1) # Convert to mono
print(f"Audio data shape: {audio_input.shape}, Samplerate: {samplerate}")
print("Starting transcription...")
# Pass audio as a dictionary for more control, or directly as numpy array
# Adding chunk_length_s for handling long audio files better.
result = transcriber(audio_input.copy(), chunk_length_s=30, batch_size=8, return_timestamps=False, generate_kwargs={"temperature": 0.2, "no_repeat_ngram_size": 3, "language": "en"})
transcribed_text = result["text"]
print(f"Transcription successful. Length: {len(transcribed_text)}")
if len(transcribed_text) > 100:
print(f"Transcription preview: {transcribed_text[:100]}...")
else:
print(f"Transcription: {transcribed_text}")
return transcribed_text
except Exception as e:
print(f"Error during transcription process: {e}")
import traceback
traceback.print_exc()
return f"Error: Transcription failed. Details: {str(e)}"
finally:
for p in [video_path, audio_path]:
if p and os.path.exists(p):
try:
os.remove(p)
print(f"Removed temporary file: {p}")
except Exception as e_rm:
print(f"Error removing temporary file {p}: {e_rm}")
# This is a local entrypoint for testing the Modal function if you run `modal run modal_whisper_app.py`
@app.local_entrypoint()
def main():
# This is just an example of how you might test.
# You'd need a sample video file (e.g., "sample.mp4") in the same directory.
# For actual deployment, this main function isn't strictly necessary as Gradio will call the webhook.
sample_video_path = "sample.mp4"
if not os.path.exists(sample_video_path):
print(f"Sample video {sample_video_path} not found. Skipping local test run.")
return
with open(sample_video_path, "rb") as f:
video_bytes_content = f.read()
print(f"Testing transcription with {sample_video_path}...")
transcription = transcribe_video_audio.remote(video_bytes_content)
print("----")
print(f"Transcription Result: {transcription}")
print("----")
# To call this function from another Python script (after deployment):
# import modal
# Ensure the app name matches the one in modal.App(name=...)
# The exact lookup method might vary slightly with modal.App, often it's:
# deployed_app = modal.App.lookup("whisper-transcriber")
# or by accessing the function directly if the app is deployed with a name.
# For a deployed function, you might use its tag or webhook URL directly.
# Example using a direct function call if deployed and accessible:
# f = modal.Function.lookup("whisper-transcriber/transcribe_video_audio") # Or similar based on deployment output
# For invoking:
# result = f.remote(your_video_bytes) # for async
# print(result)
# Or, if you have the app object:
# result = app.functions.transcribe_video_audio.remote(your_video_bytes)
# Consult Modal documentation for the precise invocation method for your Modal version and deployment style.
# Note: When deploying to Modal, Modal uses the `app.serve()` or `app.deploy()` mechanism.
# The Gradio app will call the deployed Modal function via its HTTP endpoint.
@app.function(
image=whisper_image,
secrets=[HF_TOKEN_SECRET],
timeout=900, # Potentially shorter if model is pre-loaded and efficient
gpu="any" # Request any available GPU
)
def generate_video_caption(video_bytes: bytes) -> str:
import torch
import av # PyAV for frame extraction
from transformers import VisionEncoderDecoderModel, AutoImageProcessor, AutoTokenizer
import tempfile
import os
import numpy as np
if not video_bytes:
return "Error: No video data received for captioning."
print(f"Starting video captioning with {CAPTION_MODEL_NAME}...")
video_path = None
try:
# 1. Load pre-cached model, processor, and tokenizer
# Ensure these names match what's used in download_caption_model
image_processor = AutoImageProcessor.from_pretrained(CAPTION_PROCESSOR_NAME)
tokenizer = AutoTokenizer.from_pretrained(CAPTION_TOKENIZER_NAME)
model = VisionEncoderDecoderModel.from_pretrained(CAPTION_MODEL_NAME)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(device)
print(f"Caption model loaded on device: {device}")
# 2. Save video_bytes to a temporary file to be read by PyAV
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
print(f"Temporary video file for captioning saved: {video_path}")
# 3. Frame extraction using PyAV
container = av.open(video_path)
# Select 8 frames evenly spaced throughout the video
# Similar to the SpaceTimeGPT example
total_frames = container.streams.video[0].frames
num_frames_to_sample = 8
indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
frames = []
container.seek(0) # Reset stream to the beginning
frame_idx = 0
target_idx_ptr = 0
for frame in container.decode(video=0):
if target_idx_ptr < len(indices) and frame_idx == indices[target_idx_ptr]:
frames.append(frame.to_image()) # Convert to PIL Image
target_idx_ptr += 1
frame_idx += 1
if len(frames) == num_frames_to_sample:
break
container.close()
if not frames:
print("No frames extracted, cannot generate caption.")
return "Error: Could not extract frames for captioning."
print(f"Extracted {len(frames)} frames for captioning.")
# 4. Generate caption
# The SpaceTimeGPT example doesn't use a specific prompt, it generates from frames directly
pixel_values = image_processor(images=frames, return_tensors="pt").pixel_values.to(device)
# The model card for Neleac/SpaceTimeGPT uses max_length=128, num_beams=5
generated_ids = model.generate(pixel_values, max_length=128, num_beams=5)
caption = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
print(f"Generated caption: {caption}")
return caption
except Exception as e:
print(f"Error during video captioning: {e}")
import traceback
traceback.print_exc()
return f"Error: Video captioning failed. Details: {str(e)}"
finally:
if video_path and os.path.exists(video_path):
try:
os.remove(video_path)
print(f"Removed temporary video file for captioning: {video_path}")
except Exception as e_rm:
print(f"Error removing temporary captioning video file {video_path}: {e_rm}")
@app.function(
image=whisper_image,
secrets=[HF_TOKEN_SECRET],
timeout=1800, # Increased timeout for combined processing
gpu="any"
)
@modal.concurrent(max_inputs=10) # Replaces allow_concurrent_inputs
@modal.fastapi_endpoint(method="POST") # Replaces web_endpoint
async def process_video_context(video_bytes: bytes, video_url: str = None):
import json
import hashlib
if not video_bytes:
return modal.Response(status_code=400, body=json.dumps({"error": "No video data provided."}))
# Generate a cache key
# If URL is provided, use it. Otherwise, hash the video content (can be slow for large videos).
cache_key = ""
if video_url:
cache_key = hashlib.sha256(video_url.encode()).hexdigest()
else:
# Hashing large video_bytes can be memory/CPU intensive. Consider alternatives if this is an issue.
# For now, let's proceed with hashing bytes if no URL.
cache_key = hashlib.sha256(video_bytes).hexdigest()
print(f"Generated cache key: {cache_key}")
# Check cache first
if cache_key in video_analysis_cache:
print(f"Cache hit for key: {cache_key}")
cached_result = video_analysis_cache[cache_key]
return modal.Response(status_code=200, body=json.dumps(cached_result))
print(f"Cache miss for key: {cache_key}. Processing video...")
results = {}
error_messages = []
# Call transcription and captioning in parallel
transcription_future = transcribe_video_audio.spawn(video_bytes)
caption_call = generate_video_caption.spawn(video_bytes)
action_call = generate_action_labels.spawn(video_bytes) # Placeholder for now
try:
transcription_result = await transcription_future
if transcription_result.startswith("Error:"):
error_messages.append(f"Transcription: {transcription_result}")
results["transcription"] = None
else:
results["transcription"] = transcription_result
except Exception as e:
print(f"Error in transcription task: {e}")
error_messages.append(f"Transcription: Failed with exception - {str(e)}")
results["transcription"] = None
try:
caption_result = await caption_call
if caption_result.startswith("Error:"):
error_messages.append(f"Captioning: {caption_result}")
results["video_caption"] = None
else:
results["video_caption"] = caption_result
except Exception as e:
print(f"Error in captioning task: {e}")
error_messages.append(f"Captioning: Failed with exception - {str(e)}")
results["video_caption"] = None
try:
action_result = await action_call # action_result is a dict from generate_action_labels
if action_result.get("error"):
error_messages.append(f"Action recognition: {action_result.get('error')}")
results["action_recognition"] = None
else:
results["action_recognition"] = action_result.get("actions", "No actions detected or error in result format")
except Exception as e:
print(f"Error in action recognition task: {e}")
import traceback
traceback.print_exc()
error_messages.append(f"Action recognition: Failed with exception - {str(e)}")
results["action_recognition"] = None
# TODO: Add calls for object detection here in the future
results["object_detection"] = "(Object detection/tracking not yet implemented)"
if error_messages:
results["processing_errors"] = error_messages
# Store partial results in cache even if there are errors
video_analysis_cache[cache_key] = results
return modal.Response(status_code=207, body=json.dumps(results)) # 207 Multi-Status
# Store successful full result in cache
video_analysis_cache[cache_key] = results
print(f"Successfully processed and cached results for key: {cache_key}")
return modal.Response(status_code=200, body=json.dumps(results))
# Update local entrypoint to use the new main processing function if desired for testing
# For now, keeping it as is to test transcription independently if needed.
@app.function(
image=whisper_image,
secrets=[HF_TOKEN_SECRET],
timeout=700, # Increased timeout slightly for model loading and inference
gpu="any" # Requires GPU
)
def generate_action_labels(video_bytes: bytes) -> dict:
import torch
import av
import numpy as np
import tempfile
import os
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
from huggingface_hub import login
if not video_bytes:
return {"actions": [], "error": "No video data received."}
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
try:
login(token=hf_token)
print("Action Recognition: Successfully logged into Hugging Face Hub.")
except Exception as e:
print(f"Action Recognition: Hugging Face Hub login failed: {e}.")
else:
print("Action Recognition: HF_TOKEN secret not found. Proceeding without login.")
video_path = None
try:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Action Recognition: Loading model on device: {device}")
processor = VideoMAEImageProcessor.from_pretrained(ACTION_PROCESSOR_NAME)
model = VideoMAEForVideoClassification.from_pretrained(ACTION_MODEL_NAME)
model.to(device)
model.eval()
print(f"Action Recognition: Model {ACTION_MODEL_NAME} and processor loaded.")
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
stream = container.streams.video[0]
num_frames_to_extract = 16
total_frames = stream.frames
if total_frames == 0:
return {"actions": [], "error": "Video stream has no frames."}
# Ensure we don't try to select more frames than available, especially for very short videos
if total_frames < num_frames_to_extract:
print(f"Warning: Video has only {total_frames} frames, less than desired {num_frames_to_extract}. Using all available frames.")
num_frames_to_extract = total_frames
if num_frames_to_extract == 0: # Double check after adjustment
return {"actions": [], "error": "Video stream has no frames after adjustment."}
indices = np.linspace(0, total_frames - 1, num_frames_to_extract, dtype=int)
frames = []
container.seek(0) # Reset stream to the beginning before decoding specific frames
frame_idx_counter = 0
target_idx_ptr = 0
for frame in container.decode(video=0):
if target_idx_ptr < len(indices) and frame_idx_counter == indices[target_idx_ptr]:
frames.append(frame.to_image()) # Convert to PIL Image
target_idx_ptr += 1
frame_idx_counter += 1
if target_idx_ptr == len(indices):
break
container.close()
if not frames:
return {"actions": [], "error": "Could not extract frames from video."}
print(f"Action Recognition: Extracted {len(frames)} frames.")
# Process frames and predict
inputs = processor(frames, return_tensors="pt").to(device)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
predicted_class_idx = logits.argmax(-1).item()
predicted_label = model.config.id2label[predicted_class_idx]
print(f"Action Recognition: Predicted action: {predicted_label}")
return {"actions": [predicted_label], "error": None}
except Exception as e:
print(f"Error during action recognition: {e}")
import traceback
traceback.print_exc()
return {"actions": [], "error": f"Action recognition failed: {str(e)}"}
finally:
if video_path and os.path.exists(video_path):
try:
os.remove(video_path)
print(f"Removed temporary video file for action recognition: {video_path}")
except Exception as e_rm:
print(f"Error removing temporary action recognition video file {video_path}: {e_rm}")