# Depthflow / main.py
# Mbonea's picture
# main
# 59c7007
# raw
# history blame
# 4.03 kB
import os
import tempfile
import requests
from mimetypes import guess_extension
from PIL import Image
from io import BytesIO
import subprocess
from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Request
from typing import List, Optional
import asyncio, aiofiles
from fastapi.responses import StreamingResponse, FileResponse
app = FastAPI()
def download_image(image_url: str):
    """Download an image into a fresh temporary directory.

    The payload is fetched with ``requests``, opened with Pillow to read its
    format and size, and then written unchanged to ``image<ext>`` inside a
    directory created with ``tempfile.mkdtemp()``.

    Args:
        image_url: URL of the image to fetch.

    Returns:
        A ``(image_path, (width, height))`` tuple.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an error status.
        ValueError: If no file extension can be derived from the detected
            image format.
    """
    # Without a timeout, requests waits forever on an unresponsive host.
    response = requests.get(image_url, timeout=30)
    response.raise_for_status()
    payload = response.content

    # Inspect the payload in memory; the context manager releases the
    # decoder resources once format and size have been read.
    with Image.open(BytesIO(payload)) as image:
        image_format = (image.format or "").lower()
        dimensions = image.size

    image_extension = guess_extension(f"image/{image_format}")
    if image_extension is None:
        raise ValueError("Cannot detect image file type.")

    # Create the temporary directory only after validation has passed so a
    # rejected download does not leave an empty directory behind.
    temp_dir = tempfile.mkdtemp()
    image_path = os.path.join(temp_dir, f"image{image_extension}")
    with open(image_path, "wb") as image_file:
        image_file.write(payload)

    return image_path, dimensions
def make_effect(
    image_link: str,
    filename: str,
    frame_rate: int,
    duration: int,
    quality: int,
    ssaa: float,
    raw: bool,
):
    """Render a video from a remote image by invoking the ``depthflow`` CLI.

    The image is downloaded with ``download_image``, its own width and height
    are passed as the output dimensions, and the result is written to
    ``/tmp/Video/<filename>``. This function blocks until the subprocess
    exits.

    Args:
        image_link: URL of the source image.
        filename: Output file name, relative to ``/tmp/Video``.
        frame_rate: Value passed to ``-f``.
        duration: Value passed to ``-t``.
        quality: Value passed to ``--quality``.
        ssaa: Value passed to ``--ssaa``.
        raw: When true, ``--raw`` is appended to the command.

    Returns:
        The path of the output video file.

    Raises:
        ValueError: If ``filename`` is empty or resolves outside
            ``/tmp/Video``.
        subprocess.CalledProcessError: If ``depthflow`` exits non-zero.
    """
    # Validate the destination before doing any network or subprocess work:
    # ``filename`` comes from the caller, and an absolute path or ``..``
    # segments would otherwise let the output land anywhere on disk.
    output_dir = os.path.abspath("/tmp/Video")
    destination = os.path.abspath(os.path.join(output_dir, filename))
    if (
        destination == output_dir
        or os.path.commonpath([output_dir, destination]) != output_dir
    ):
        raise ValueError(f"Invalid output filename: {filename!r}")

    # Download the image and get its dimensions
    image_path, (width, height) = download_image(image_url=image_link)
    print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100)

    try:
        # Create the destination directory if it doesn't exist
        os.makedirs(os.path.dirname(destination), exist_ok=True)

        # Build the depthflow command (argument list, no shell involved)
        command = [
            "depthflow",
            "input",
            "-i",
            image_path,
            "main",
            "-f",
            str(frame_rate),
            "-t",
            str(duration),
            "--width",
            str(width),
            "--height",
            str(height),
            "--quality",
            str(quality),
            "--ssaa",
            str(ssaa),
            "--benchmark",
        ]
        if raw:
            command.append("--raw")
        command.extend(["--output", destination])

        # Execute the depthflow command
        subprocess.run(command, check=True)
    finally:
        # Best-effort cleanup of the downloaded image and its temporary
        # directory; a failure here must not mask the render result.
        try:
            os.remove(image_path)
            os.rmdir(os.path.dirname(image_path))
        except OSError:
            pass

    return destination
@app.post("/generate_video")
async def generate_video(
    image_link: Optional[str] = None,
    filename: str = "output.mp4",
    frame_rate: int = 30,
    duration: int = 3,
    quality: int = 10,
    ssaa: float = 0.75,
    raw: bool = True,
):
    """Render a video from ``image_link`` and report the output file name.

    Args:
        image_link: URL of the source image (required in practice).
        filename: Name of the output file.
        frame_rate: Forwarded to ``make_effect``.
        duration: Forwarded to ``make_effect``.
        quality: Forwarded to ``make_effect``.
        ssaa: Forwarded to ``make_effect``.
        raw: Forwarded to ``make_effect``.

    Returns:
        ``{"output_file": filename}`` on success.

    Raises:
        HTTPException: 400 if ``image_link`` is missing or rendering fails.
    """
    if not image_link:
        raise HTTPException(status_code=400, detail="image_link is required")

    try:
        # make_effect is a plain blocking function: awaiting its return value
        # raises TypeError, and calling it inline stalls the event loop for
        # the whole render. Run it on the default executor instead.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            make_effect,
            image_link,
            filename,
            frame_rate,
            duration,
            quality,
            ssaa,
            raw,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return {"output_file": filename}
@app.get("/download/{filename}")
async def download_video(filename: str, request: Request):
    """Serve a rendered video, honouring single-range ``Range`` requests.

    Args:
        filename: Name of a file inside ``/tmp/Video``.
        request: Incoming request; only its ``Range`` header is read.

    Returns:
        A ``FileResponse`` with the whole file when there is no usable
        ``Range`` header, otherwise a 206 ``StreamingResponse`` carrying the
        requested byte span.

    Raises:
        HTTPException: 404 if the file does not exist, 416 if the requested
            range lies outside the file.
    """
    video_directory = "/tmp/Video"
    video_path = os.path.join(video_directory, filename)
    if not os.path.isfile(video_path):
        raise HTTPException(status_code=404, detail="Video not found")

    range_header = request.headers.get("Range", None)
    video_size = os.path.getsize(video_path)

    byte_range = _parse_byte_range(range_header, video_size) if range_header else None
    if byte_range is None:
        # No Range header, or one that is malformed / not supported (for
        # example multiple ranges): ignore it and send the complete file.
        return FileResponse(video_path, media_type="video/mp4")

    start, end = byte_range
    if start > end:
        # Syntactically valid but beyond the end of the file.
        raise HTTPException(
            status_code=416,
            detail="Requested range not satisfiable",
            headers={"Content-Range": f"bytes */{video_size}"},
        )

    headers = {
        # Both bounds are inclusive byte offsets.
        "Content-Range": f"bytes {start}-{end}/{video_size}",
        "Accept-Ranges": "bytes",
        "Content-Length": str(end - start + 1),
    }
    content = read_file_range(video_path, start, end)
    return StreamingResponse(
        content, status_code=206, media_type="video/mp4", headers=headers
    )


def _parse_byte_range(range_header, file_size):
    """Parse a single ``bytes=`` range into inclusive ``(start, end)`` offsets.

    Returns ``None`` when the header is malformed or uses an unsupported
    form, so the caller can ignore it. The returned ``end`` is clamped to the
    last byte of the file; ``start > end`` signals an unsatisfiable range.
    """
    unit, equals, spec = range_header.strip().partition("=")
    if not equals or unit.strip().lower() != "bytes" or "," in spec:
        return None

    first, dash, last = (part.strip() for part in spec.partition("-"))
    if not dash or not (first or last):
        return None
    for part in (first, last):
        if part and not (part.isascii() and part.isdigit()):
            return None

    last_byte = file_size - 1
    if not first:
        # Suffix form "bytes=-N": the final N bytes of the file.
        return max(file_size - int(last), 0), last_byte

    start = int(first)
    if not last:
        # Open-ended form "bytes=N-": from N to the end of the file.
        return start, last_byte

    end = int(last)
    if end < start:
        return None
    return start, min(end, last_byte)


async def read_file_range(path, start, end):
    """Yield the bytes of ``path`` from offset ``start`` through ``end``.

    ``end`` is inclusive. Data is produced in chunks of at most 1 MiB and the
    final chunk is trimmed so that no byte past ``end`` is emitted.
    """
    chunk_size = 1024 * 1024  # read in chunks of 1MB
    remaining = end - start + 1
    async with aiofiles.open(path, "rb") as file:
        await file.seek(start)
        while remaining > 0:
            data = await file.read(min(chunk_size, remaining))
            if not data:
                # Reached end of file before the requested span was filled.
                break
            remaining -= len(data)
            yield data