|
import asyncio
import os
import shutil
import subprocess
import tempfile
import uuid
from io import BytesIO
from mimetypes import guess_extension
from typing import List, Optional

import aiofiles
import requests
from fastapi import (
    BackgroundTasks,
    FastAPI,
    File,
    HTTPException,
    Request,
    Response,
    UploadFile,
)
from fastapi.responses import FileResponse, StreamingResponse
from PIL import Image
|
|
|
# Application instance; the route decorators below register their handlers on it.
app = FastAPI()
|
|
|
|
|
# Seconds to wait for the remote host before giving up on the download.
_DOWNLOAD_TIMEOUT_SECONDS = 30


def download_image(image_url: str):
    """Download an image and store it in a fresh temporary directory.

    Args:
        image_url: Location of the image, fetched with an HTTP GET.

    Returns:
        A ``(image_path, (width, height))`` tuple. ``image_path`` lives in a
        directory created with ``tempfile.mkdtemp()``. This function does not
        delete that directory on success, so the caller owns its cleanup.

    Raises:
        requests.RequestException: The request failed, timed out, or the
            server answered with an error status.
        ValueError: No file extension is known for the detected image format.
        OSError: Pillow could not open the payload as an image, or the file
            could not be written.
    """
    # NOTE(review): the URL is fetched exactly as given. If it can come from
    # untrusted callers, consider restricting which hosts may be reached.
    # Without a timeout a stalled host would block this worker indefinitely.
    response = requests.get(image_url, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
    response.raise_for_status()
    payload = response.content

    # Read what we need from the image, then release it before touching disk.
    with Image.open(BytesIO(payload)) as image:
        image_format = (image.format or "").lower()
        image_size = image.size

    image_extension = guess_extension(f"image/{image_format}")
    if image_extension is None:
        raise ValueError("Cannot detect image file type.")

    # Create the directory only after the payload has been validated, so the
    # failure paths above do not leave an empty temporary directory behind.
    temp_dir = tempfile.mkdtemp()
    image_path = os.path.join(temp_dir, f"image{image_extension}")
    try:
        with open(image_path, "wb") as image_file:
            image_file.write(payload)
    except OSError:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    return image_path, image_size
|
|
|
|
|
def make_effect(
    image_link: str,
    filename: str,
    frame_rate: int,
    duration: int,
    quality: int,
    ssaa: float,
    raw: bool,
):
    """Download an image and render it to a video with the ``depthflow`` CLI.

    Args:
        image_link: URL of the source image, passed to ``download_image``.
        filename: Name of the output file created under ``/tmp/Video``.
        frame_rate: Value passed to depthflow's ``-f`` option.
        duration: Value passed to depthflow's ``-t`` option.
        quality: Value passed to depthflow's ``--quality`` option.
        ssaa: Value passed to depthflow's ``--ssaa`` option.
        raw: When true, ``--raw`` is added to the command line.

    Returns:
        The path of the output file, ``/tmp/Video/<filename>``.

    Raises:
        subprocess.CalledProcessError: depthflow exited with a non-zero status.
        Exception: Whatever ``download_image`` raises is propagated unchanged.
    """
    image_path, (width, height) = download_image(image_url=image_link)
    try:
        print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100)

        destination = os.path.join("/tmp/Video", filename)
        os.makedirs(os.path.dirname(destination), exist_ok=True)

        # The output is rendered at the source image's own dimensions.
        command = [
            "depthflow",
            "input",
            "-i",
            image_path,
            "main",
            "-f",
            str(frame_rate),
            "-t",
            str(duration),
            "--width",
            str(width),
            "--height",
            str(height),
            "--quality",
            str(quality),
            "--ssaa",
            str(ssaa),
            "--benchmark",
        ]
        if raw:
            command.append("--raw")
        command.extend(["--output", destination])

        # Argument list with no shell: values are never shell-interpreted.
        subprocess.run(command, check=True)
        return destination
    finally:
        # download_image places the file in its own mkdtemp() directory and
        # never removes it; drop it here so each render does not leak one,
        # whether the render succeeded or failed.
        shutil.rmtree(os.path.dirname(image_path), ignore_errors=True)
|
|
|
|
|
@app.post("/generate_video")
async def generate_video(
    background_task: BackgroundTasks,
    image_link: Optional[str] = None,
    frame_rate: int = 30,
    duration: int = 3,
    quality: int = 10,
    ssaa: float = 0.75,
    raw: bool = True,
):
    """Queue a video render for an image URL and return the output file name.

    The render runs as a background task after the response is sent, so
    failures inside ``make_effect`` cannot be reported through this response.
    For that reason the one input that can be checked cheaply is validated
    here, before anything is queued.

    Args:
        background_task: Task queue supplied by FastAPI.
        image_link: URL of the source image. Required, although the parameter
            keeps its ``None`` default.
        frame_rate: Forwarded to ``make_effect``.
        duration: Forwarded to ``make_effect``.
        quality: Forwarded to ``make_effect``.
        ssaa: Forwarded to ``make_effect``.
        raw: Forwarded to ``make_effect``.

    Returns:
        ``{"output_file": <name>}`` where the name is a fresh UUID with an
        ``.mp4`` suffix; the file is later served by ``/download/{filename}``.

    Raises:
        HTTPException: 400 when ``image_link`` is missing or blank.
    """
    if image_link is None or not image_link.strip():
        raise HTTPException(status_code=400, detail="image_link is required")

    filename = f"{uuid.uuid4()}.mp4"
    # add_task only records the call, so there is nothing to catch here.
    background_task.add_task(
        make_effect, image_link, filename, frame_rate, duration, quality, ssaa, raw
    )
    return {"output_file": filename}
|
|
|
|
|
def _parse_byte_range(range_header: str, size: int) -> Optional[tuple]:
    """Resolve a single ``bytes=first-last`` range against a file of *size* bytes.

    Returns:
        ``(start, end)`` as inclusive offsets clamped to the file, or ``None``
        when the header is not one well-formed ``bytes`` range (other unit,
        several ranges, non-numeric bounds, or ``last`` before ``first``), in
        which case the caller should ignore the header.

    Raises:
        HTTPException: 416 when the range is well-formed but selects no bytes
            of the file.
    """
    unit, equals, spec = range_header.strip().partition("=")
    if not equals or unit.strip().lower() != "bytes" or "," in spec:
        return None

    first, dash, last = (part.strip() for part in spec.partition("-"))
    if not dash or (not first and not last):
        return None
    for bound in (first, last):
        if bound and not (bound.isascii() and bound.isdigit()):
            return None

    if first:
        start = int(first)
        if last and int(last) < start:
            return None
        # An open-ended range runs to the last byte, which is size - 1.
        end = min(int(last), size - 1) if last else size - 1
    else:
        # Suffix form "bytes=-N": the final N bytes of the file.
        start = max(size - int(last), 0)
        end = size - 1

    if start > end:
        raise HTTPException(
            status_code=416,
            detail="Requested range not satisfiable",
            headers={"Content-Range": f"bytes */{size}"},
        )
    return start, end


@app.get("/download/{filename}")
async def download_video(filename: str, request: Request):
    """Serve a file from ``/tmp/Video``, honouring a single HTTP byte range.

    Args:
        filename: Name of the file inside ``/tmp/Video``.
        request: Incoming request; only its ``Range`` header is read.

    Returns:
        A ``FileResponse`` with the whole file when there is no usable
        ``Range`` header, otherwise a 206 ``StreamingResponse`` carrying
        exactly the requested bytes.

    Raises:
        HTTPException: 404 when the file does not exist, 416 when the
            requested range lies outside the file.
    """
    video_directory = "/tmp/Video"
    # Defensive: only a bare file name may be looked up in the directory.
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="Video not found")
    video_path = os.path.join(video_directory, filename)
    if not os.path.isfile(video_path):
        raise HTTPException(status_code=404, detail="Video not found")

    video_size = os.path.getsize(video_path)
    range_header = request.headers.get("Range", None)
    byte_range = _parse_byte_range(range_header, video_size) if range_header else None
    if byte_range is None:
        return FileResponse(video_path, media_type="video/mp4")

    start, end = byte_range
    headers = {
        "Content-Range": f"bytes {start}-{end}/{video_size}",
        "Accept-Ranges": "bytes",
        "Content-Length": str(end - start + 1),
    }
    content = read_file_range(video_path, start, end)
    # 206 tells the client that the body is the requested slice only.
    return StreamingResponse(
        content, status_code=206, media_type="video/mp4", headers=headers
    )


async def read_file_range(path, start, end):
    """Yield the bytes of *path* from offset *start* through *end*, inclusive.

    Data is produced in chunks of at most 1 MiB. The final chunk is trimmed so
    that nothing past *end* is sent, and iteration stops early if the file
    ends before *end* is reached.
    """
    chunk_size = 1024 * 1024
    remaining = end - start + 1
    async with aiofiles.open(path, "rb") as file:
        await file.seek(start)
        while remaining > 0:
            data = await file.read(min(chunk_size, remaining))
            if not data:
                break
            remaining -= len(data)
            yield data
|
|