smollvm / src /video_processor /processor.py
youssef
fix dockerfile
c9f0527
raw
history blame
7.42 kB
import json
import logging
import os
import subprocess
import tempfile
import time
from typing import Dict, Iterator, List

import torch
from transformers import AutoModelForImageTextToText, AutoProcessor
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _grab_best_device(use_gpu=True):
if torch.cuda.device_count() > 0 and use_gpu:
device = "cuda"
else:
device = "cpu"
return device
def get_video_duration_seconds(video_path: str) -> float:
    """Return the duration of a video in seconds, as reported by ffprobe.

    Args:
        video_path: Path of the media file passed to ffprobe.

    Returns:
        The ``format.duration`` value from ffprobe's JSON output, as a float.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status, or its output
            does not contain a parsable duration.
        OSError: If the ffprobe executable cannot be started.
    """
    cmd = [
        "ffprobe",
        # "error" rather than "quiet": stdout still carries only the JSON,
        # but a failure now leaves its reason on stderr for the message below.
        "-v", "error",
        "-print_format", "json",
        "-show_format",
        video_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        reason = result.stderr.strip() or f"exit code {result.returncode}"
        raise RuntimeError(f"ffprobe failed for {video_path!r}: {reason}")

    try:
        return float(json.loads(result.stdout)["format"]["duration"])
    except (ValueError, KeyError, TypeError) as exc:
        # json.JSONDecodeError is a ValueError; a missing or non-numeric
        # duration lands here as well.
        raise RuntimeError(
            f"ffprobe returned no usable duration for {video_path!r}"
        ) from exc
def format_duration(seconds: int) -> str:
    """Render a whole number of seconds as a zero-padded ``MM:SS`` string.

    Minutes are not folded into hours, so 3600 seconds becomes "60:00".
    """
    whole_minutes, leftover_seconds = divmod(seconds, 60)
    return f"{whole_minutes:02d}:{leftover_seconds:02d}"
# Device string resolved once at import time; VideoAnalyzer uses it for
# model placement and for moving inputs.
DEVICE = _grab_best_device()
logger.info(f"Using device: {DEVICE}")
class VideoAnalyzer:
    """Generate text descriptions of a video, one fixed-length segment at a time.

    The constructor loads the ``HuggingFaceTB/SmolVLM2-500M-Video-Instruct``
    processor and model onto ``DEVICE`` and compiles the model.
    ``process_video`` cuts the input into clips with ffmpeg and passes each
    clip to ``analyze_segment``.
    """

    # Instruction sent as the system turn of every request.
    _SYSTEM_PROMPT = (
        "You are an AI specialized in video content analysis. "
        "Your task is to watch the provided video segment and generate a detailed, structured description focusing on the following elements:\n"
        "1. **People and Their Actions:** Identify all individuals, their appearances, and describe their activities or interactions.\n"
        "2. **Environment and Setting:** Describe the location, time of day, weather conditions, and any notable background details.\n"
        "3. **Objects and Their Positions:** List prominent objects, their attributes, and spatial relationships within the scene.\n"
        "4. **On-Screen Text:** Transcribe any visible text, including signs, labels, or subtitles, and specify their locations.\n"
        "5. **Key Events and Timing:** Outline significant events, actions, or changes, along with their timestamps.\n\n"
        "Provide the information in a clear and concise manner, using bullet points or numbered lists where appropriate."
    )

    # Request sent as the user turn, next to the video itself.
    _USER_PROMPT = (
        "Please analyze the attached video segment and provide a structured description as per the guidelines above. "
        "If certain elements are not present in the video, you may omit them from your response."
    )

    def __init__(self):
        """Load the processor and model, then compile the model.

        Raises:
            RuntimeError: If CUDA is not available.
        """
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is required but not available!")

        logger.info("Initializing VideoAnalyzer")
        self.model_path = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct"
        logger.info(f"Loading model from {self.model_path} - Using device: {DEVICE}")

        # Load processor and model
        self.processor = AutoProcessor.from_pretrained(self.model_path)
        self.model = AutoModelForImageTextToText.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16,
            device_map=DEVICE,
            _attn_implementation="flash_attention_2",
            low_cpu_mem_usage=True,
        ).to(DEVICE)

        # Compile model for faster inference
        self.model = torch.compile(self.model, mode="reduce-overhead")
        logger.info(f"Model loaded and compiled on device: {self.model.device}")

    def analyze_segment(self, video_path: str, start_time: float) -> str:
        """Ask the model for a structured description of one video clip.

        Args:
            video_path: Path of the clip referenced in the chat message.
            start_time: Offset of the clip within the full video. The current
                implementation does not use it; it is kept so existing
                callers continue to work.

        Returns:
            The decoded model output that follows the last "Assistant: "
            marker.
        """
        messages = [
            {
                "role": "system",
                "content": [{"type": "text", "text": self._SYSTEM_PROMPT}],
            },
            {
                "role": "user",
                "content": [
                    {"type": "video", "path": video_path},
                    {"type": "text", "text": self._USER_PROMPT},
                ],
            },
        ]

        inputs = self.processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt",
        ).to(DEVICE, dtype=torch.bfloat16)

        with torch.inference_mode():
            outputs = self.model.generate(
                **inputs,
                do_sample=True,
                temperature=0.7,
                max_new_tokens=256,
            )

        decoded = self.processor.batch_decode(outputs, skip_special_tokens=True)[0]
        # The decoded text contains the whole conversation; keep only the reply.
        return decoded.split("Assistant: ")[-1]

    def process_video(self, video_path: str, segment_length: int = 10) -> Iterator[Dict]:
        """Lazily analyze a video in consecutive fixed-length segments.

        This is a generator: nothing runs until it is iterated, and each
        segment is cut and analyzed only when the consumer asks for it.

        The segment count is derived from the duration truncated to whole
        seconds, so a trailing fraction of less than one second does not get
        a segment of its own.

        Args:
            video_path: Path of the source video.
            segment_length: Length of each segment in seconds; must be
                positive.

        Yields:
            One dict per segment with the keys ``"timestamp"`` (segment start
            as ``MM:SS``), ``"description"`` (model output) and
            ``"processing_times"`` (``"ffmpeg"``, ``"inference"`` and
            ``"total"`` wall-clock seconds).

        Raises:
            ValueError: If ``segment_length`` is not positive.
            Exception: Any error raised while probing, cutting or analyzing
                is logged and re-raised unchanged.
        """
        if segment_length <= 0:
            raise ValueError(f"segment_length must be positive, got {segment_length}")

        try:
            duration = get_video_duration_seconds(video_path)
            total_segments = (int(duration) + segment_length - 1) // segment_length
            logger.info(f"Processing {total_segments} segments for video of length {duration:.2f} seconds")

            # The context manager removes the directory and anything left in
            # it on normal completion, on error, and when the consumer
            # abandons the generator before it is exhausted.
            with tempfile.TemporaryDirectory() as temp_dir:
                for segment_idx in range(total_segments):
                    segment_start_time = time.time()
                    start_time = segment_idx * segment_length
                    end_time = min(start_time + segment_length, duration)

                    segment_path = os.path.join(temp_dir, f"segment_{start_time}.mp4")
                    cmd = [
                        "ffmpeg",
                        "-y",
                        # -ss before -i seeks in the input instead of decoding
                        # and discarding everything up to start_time.
                        "-ss", str(start_time),
                        "-i", video_path,
                        "-t", str(segment_length),
                        "-c:v", "libx264",
                        "-preset", "ultrafast",  # Use ultrafast preset for speed
                        "-pix_fmt", "yuv420p",  # Ensure compatible pixel format
                        segment_path,
                    ]
                    ffmpeg_start = time.time()
                    subprocess.run(cmd, check=True)
                    ffmpeg_time = time.time() - ffmpeg_start

                    inference_start = time.time()
                    description = self.analyze_segment(segment_path, start_time)
                    inference_time = time.time() - inference_start
                    total_time = time.time() - segment_start_time

                    # Delete the clip and log before yielding: code placed
                    # after a yield only runs if the consumer requests
                    # another item.
                    os.remove(segment_path)
                    logger.info(
                        f"Segment {segment_idx + 1}/{total_segments} ({start_time}-{end_time}s) - "
                        f"FFmpeg: {ffmpeg_time:.2f}s, Inference: {inference_time:.2f}s"
                    )

                    yield {
                        "timestamp": format_duration(int(start_time)),
                        "description": description,
                        "processing_times": {
                            "ffmpeg": ffmpeg_time,
                            "inference": inference_time,
                            "total": total_time,
                        },
                    }
        except Exception as e:
            logger.error(f"Error processing video: {str(e)}", exc_info=True)
            raise