File size: 4,138 Bytes
748217d 2430e21 748217d 2430e21 748217d 59c7007 2430e21 748217d 2430e21 ab7efde 748217d 2430e21 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import os, uuid
import tempfile
import requests
from mimetypes import guess_extension
from PIL import Image
from io import BytesIO
import subprocess
from fastapi import (
FastAPI,
UploadFile,
File,
HTTPException,
Response,
Request,
BackgroundTasks,
)
from typing import List, Optional
import asyncio, aiofiles
from fastapi.responses import StreamingResponse, FileResponse
app = FastAPI()
def download_image(image_url: str):
# Create a temporary directory
temp_dir = tempfile.mkdtemp()
# Get the image content
response = requests.get(image_url)
response.raise_for_status()
# Detect the image type
image = Image.open(BytesIO(response.content))
image_format = image.format.lower()
image_extension = guess_extension(f"image/{image_format}")
if image_extension is None:
raise ValueError("Cannot detect image file type.")
# Define the image file path
image_path = os.path.join(temp_dir, f"image{image_extension}")
# Save the image file
with open(image_path, "wb") as image_file:
image_file.write(response.content)
# Return the image path and dimensions
return image_path, image.size
def make_effect(
image_link: str,
filename: str,
frame_rate: int,
duration: int,
quality: int,
ssaa: float,
raw: bool,
):
# Download the image and get its dimensions
image_path, (width, height) = download_image(image_url=image_link)
print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100)
# Define the output video file path
destination = os.path.join("/tmp/Video", filename)
# Create the destination directory if it doesn't exist
os.makedirs(os.path.dirname(destination), exist_ok=True)
# Build the depthflow command
command = [
"depthflow",
"input",
"-i",
image_path,
"main",
"-f",
str(frame_rate),
"-t",
str(duration),
"--width",
str(width),
"--height",
str(height),
"--quality",
str(quality),
"--ssaa",
str(ssaa),
"--benchmark",
]
if raw:
command.append("--raw")
command.extend(["--output", destination])
# Execute the depthflow command
subprocess.run(command, check=True)
return destination
@app.post("/generate_video")
async def generate_video(
background_task: BackgroundTasks,
image_link: str = None,
frame_rate: int = 30,
duration: int = 3,
quality: int = 10,
ssaa: float = 0.75,
raw: bool = True,
):
filename = f"{str(uuid.uuid4())}.mp4"
try:
background_task.add_task(
make_effect, image_link, filename, frame_rate, duration, quality, ssaa, raw
)
return {"output_file": filename}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/download/{filename}")
async def download_video(filename: str, request: Request):
video_directory = "/tmp/Video"
video_path = os.path.join(video_directory, filename)
if not os.path.isfile(video_path):
raise HTTPException(status_code=404, detail="Video not found")
range_header = request.headers.get("Range", None)
video_size = os.path.getsize(video_path)
if range_header:
start, end = range_header.strip().split("=")[1].split("-")
start = int(start)
end = video_size if end == "" else int(end)
headers = {
"Content-Range": f"bytes {start}-{end}/{video_size}",
"Accept-Ranges": "bytes",
}
content = read_file_range(video_path, start, end)
return StreamingResponse(content, media_type="video/mp4", headers=headers)
return FileResponse(video_path, media_type="video/mp4")
async def read_file_range(path, start, end):
async with aiofiles.open(path, "rb") as file:
await file.seek(start)
while True:
data = await file.read(1024 * 1024) # read in chunks of 1MB
if not data or await file.tell() > end:
break
yield data
|