Depthflow / main.py
Mbonea's picture
background_task
ab7efde
import os, uuid
import tempfile
import requests
from mimetypes import guess_extension
from PIL import Image
from io import BytesIO
import subprocess
from fastapi import (
FastAPI,
UploadFile,
File,
HTTPException,
Response,
Request,
BackgroundTasks,
)
from typing import List, Optional
import asyncio, aiofiles
from fastapi.responses import StreamingResponse, FileResponse
app = FastAPI()
def download_image(image_url: str):
# Create a temporary directory
temp_dir = tempfile.mkdtemp()
# Get the image content
response = requests.get(image_url)
response.raise_for_status()
# Detect the image type
image = Image.open(BytesIO(response.content))
image_format = image.format.lower()
image_extension = guess_extension(f"image/{image_format}")
if image_extension is None:
raise ValueError("Cannot detect image file type.")
# Define the image file path
image_path = os.path.join(temp_dir, f"image{image_extension}")
# Save the image file
with open(image_path, "wb") as image_file:
image_file.write(response.content)
# Return the image path and dimensions
return image_path, image.size
def make_effect(
image_link: str,
filename: str,
frame_rate: int,
duration: int,
quality: int,
ssaa: float,
raw: bool,
):
# Download the image and get its dimensions
image_path, (width, height) = download_image(image_url=image_link)
print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100)
# Define the output video file path
destination = os.path.join("/tmp/Video", filename)
# Create the destination directory if it doesn't exist
os.makedirs(os.path.dirname(destination), exist_ok=True)
# Build the depthflow command
command = [
"depthflow",
"input",
"-i",
image_path,
"main",
"-f",
str(frame_rate),
"-t",
str(duration),
"--width",
str(width),
"--height",
str(height),
"--quality",
str(quality),
"--ssaa",
str(ssaa),
"--benchmark",
]
if raw:
command.append("--raw")
command.extend(["--output", destination])
# Execute the depthflow command
subprocess.run(command, check=True)
return destination
@app.post("/generate_video")
async def generate_video(
background_task: BackgroundTasks,
image_link: str = None,
frame_rate: int = 30,
duration: int = 3,
quality: int = 10,
ssaa: float = 0.75,
raw: bool = True,
):
filename = f"{str(uuid.uuid4())}.mp4"
try:
background_task.add_task(
make_effect, image_link, filename, frame_rate, duration, quality, ssaa, raw
)
return {"output_file": filename}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/download/{filename}")
async def download_video(filename: str, request: Request):
video_directory = "/tmp/Video"
video_path = os.path.join(video_directory, filename)
if not os.path.isfile(video_path):
raise HTTPException(status_code=404, detail="Video not found")
range_header = request.headers.get("Range", None)
video_size = os.path.getsize(video_path)
if range_header:
start, end = range_header.strip().split("=")[1].split("-")
start = int(start)
end = video_size if end == "" else int(end)
headers = {
"Content-Range": f"bytes {start}-{end}/{video_size}",
"Accept-Ranges": "bytes",
}
content = read_file_range(video_path, start, end)
return StreamingResponse(content, media_type="video/mp4", headers=headers)
return FileResponse(video_path, media_type="video/mp4")
async def read_file_range(path, start, end):
async with aiofiles.open(path, "rb") as file:
await file.seek(start)
while True:
data = await file.read(1024 * 1024) # read in chunks of 1MB
if not data or await file.tell() > end:
break
yield data