import os import tempfile import requests from mimetypes import guess_extension from PIL import Image from io import BytesIO import subprocess from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Request from typing import List, Optional import asyncio, aiofiles from fastapi.responses import StreamingResponse, FileResponse app = FastAPI() def download_image(image_url: str): # Create a temporary directory temp_dir = tempfile.mkdtemp() # Get the image content response = requests.get(image_url) response.raise_for_status() # Detect the image type image = Image.open(BytesIO(response.content)) image_format = image.format.lower() image_extension = guess_extension(f"image/{image_format}") if image_extension is None: raise ValueError("Cannot detect image file type.") # Define the image file path image_path = os.path.join(temp_dir, f"image{image_extension}") # Save the image file with open(image_path, "wb") as image_file: image_file.write(response.content) # Return the image path and dimensions return image_path, image.size def make_effect( image_link: str, filename: str, frame_rate: int, duration: int, quality: int, ssaa: float, raw: bool, ): # Download the image and get its dimensions image_path, (width, height) = download_image(image_url=image_link) print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100) # Define the output video file path destination = os.path.join("/tmp/Video", filename) # Create the destination directory if it doesn't exist os.makedirs(os.path.dirname(destination), exist_ok=True) # Build the depthflow command command = [ "depthflow", "input", "-i", image_path, "main", "-f", str(frame_rate), "-t", str(duration), "--width", str(width), "--height", str(height), "--quality", str(quality), "--ssaa", str(ssaa), "--benchmark", ] if raw: command.append("--raw") command.extend(["--output", destination]) # Execute the depthflow command subprocess.run(command, check=True) return destination @app.post("/generate_video") async def generate_video( image_link: str = None, filename: str = "output.mp4", frame_rate: int = 30, duration: int = 3, quality: int = 10, ssaa: float = 0.75, raw: bool = True, ): try: output_path = await make_effect( image_link, filename, frame_rate, duration, quality, ssaa, raw ) return {"output_file": filename} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/download/{filename}") async def download_video(filename: str, request: Request): video_directory = "/tmp/Video" video_path = os.path.join(video_directory, filename) if not os.path.isfile(video_path): raise HTTPException(status_code=404, detail="Video not found") range_header = request.headers.get("Range", None) video_size = os.path.getsize(video_path) if range_header: start, end = range_header.strip().split("=")[1].split("-") start = int(start) end = video_size if end == "" else int(end) headers = { "Content-Range": f"bytes {start}-{end}/{video_size}", "Accept-Ranges": "bytes", } content = read_file_range(video_path, start, end) return StreamingResponse(content, media_type="video/mp4", headers=headers) return FileResponse(video_path, media_type="video/mp4") async def read_file_range(path, start, end): async with aiofiles.open(path, "rb") as file: await file.seek(start) while True: data = await file.read(1024 * 1024) # read in chunks of 1MB if not data or await file.tell() > end: break yield data