|
import argparse |
|
import os |
|
import asyncio |
|
import fal_client |
|
import base64 |
|
import io |
|
from PIL import Image |
|
import requests |
|
import shutil |
|
from together import Together |
|
|
|
|
|
OUTPUT_DIR = "output" |
|
os.makedirs(OUTPUT_DIR, exist_ok=True) |
|
|
|
def get_next_dir_number(): |
|
"""Get the next available directory number for output.""" |
|
existing_dirs = [d for d in os.listdir(OUTPUT_DIR) |
|
if os.path.isdir(os.path.join(OUTPUT_DIR, d)) and d.isdigit()] |
|
if not existing_dirs: |
|
return 1 |
|
return max(map(int, existing_dirs)) + 1 |
|
|
|
def save_results(input_path, generated_image_path, video_path, user_prompt, optimized_prompt, output_dir=None): |
|
""" |
|
Save all generation results in a numbered directory within OUTPUT_DIR. |
|
|
|
Args: |
|
input_path: Path to the input reference image |
|
generated_image_path: Path to the generated image |
|
video_path: Path to the generated video |
|
user_prompt: The original text prompt used for generation |
|
optimized_prompt: The optimized prompt used for generation |
|
output_dir: Optional custom output directory |
|
|
|
Returns: |
|
Tuple of (result_dir, saved_video_path) |
|
""" |
|
|
|
if output_dir is None: |
|
dir_num = get_next_dir_number() |
|
result_dir = os.path.join(OUTPUT_DIR, str(dir_num)) |
|
else: |
|
result_dir = output_dir |
|
|
|
os.makedirs(result_dir, exist_ok=True) |
|
|
|
|
|
input_image_path = os.path.join(result_dir, "input_image.png") |
|
shutil.copy2(input_path, input_image_path) |
|
|
|
|
|
output_image_path = os.path.join(result_dir, "generated_image.png") |
|
shutil.copy2(generated_image_path, output_image_path) |
|
|
|
|
|
saved_video_path = os.path.join(result_dir, "generated_video.mp4") |
|
shutil.copy2(video_path, saved_video_path) |
|
|
|
|
|
with open(os.path.join(result_dir, "input_prompt.txt"), "w") as f: |
|
f.write(user_prompt) |
|
|
|
|
|
with open(os.path.join(result_dir, "opt_prompt.txt"), "w") as f: |
|
f.write(optimized_prompt) |
|
|
|
print(f"All results saved to directory: {result_dir}") |
|
return result_dir, saved_video_path |
|
|
|
async def generate_image(ref_image, prompt): |
|
print(f"Generating image") |
|
|
|
handler = await fal_client.submit_async( |
|
"fal-ai/flux-pulid", |
|
arguments={ |
|
"prompt": prompt, |
|
"reference_image_url": ref_image |
|
}, |
|
) |
|
|
|
|
|
async for _ in handler.iter_events(): |
|
pass |
|
|
|
result = await handler.get() |
|
return result |
|
|
|
async def generate_video(image_path, prompt): |
|
print(f"Generating video from image...'") |
|
|
|
|
|
with open(image_path, 'rb') as image_file: |
|
image_data = image_file.read() |
|
base64_image = base64.b64encode(image_data).decode('utf-8') |
|
image_data_url = f"data:image/png;base64,{base64_image}" |
|
|
|
handler = await fal_client.submit_async( |
|
"fal-ai/wan-i2v", |
|
arguments={ |
|
"prompt": prompt, |
|
"image_url": image_data_url, |
|
"resolution": "480p", |
|
"guide_scale": 6.5, |
|
"shift": 4.5, |
|
"enable_prompt_expansion": True, |
|
"acceleration": "regular", |
|
"aspect_ratio": "auto" |
|
}, |
|
) |
|
|
|
|
|
async for _ in handler.iter_events(): |
|
pass |
|
|
|
|
|
request_id = handler.request_id |
|
|
|
|
|
result = fal_client.result("fal-ai/wan-i2v", request_id) |
|
return result |
|
|
|
async def optimize_prompt(ref_image_path, user_prompt): |
|
print(f"Optimizing prompt...") |
|
|
|
|
|
client = Together() |
|
|
|
|
|
with open(ref_image_path, 'rb') as image_file: |
|
image_data = base64.b64encode(image_file.read()).decode('utf-8') |
|
|
|
|
|
messages = [ |
|
{"role": "system", "content": "You are an expert at describing images in detail, focusing on clothing, accessories, poses, and visual attributes."}, |
|
{ |
|
"role": "user", |
|
"content": [ |
|
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_data}"}}, |
|
{"type": "text", "text": "Describe this image in detail, focusing on the clothing, accessories, pose, and any distinctive visual features."} |
|
] |
|
} |
|
] |
|
|
|
|
|
response = client.chat.completions.create( |
|
model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", |
|
messages=messages, |
|
max_tokens=500 |
|
) |
|
|
|
image_description = response.choices[0].message.content |
|
|
|
|
|
prompt_messages = [ |
|
{"role": "system", "content": "You are an expert at combining user prompts with detailed image descriptions to create optimal prompts for image generation. Focus on maintaining visual consistency while incorporating the user's desired changes. IMPORTANT: Return ONLY the optimized prompt without any explanations or additional text."}, |
|
{"role": "user", "content": f"""Here is a detailed description of the reference image: |
|
{image_description} |
|
|
|
And here is what the user wants to do with it: |
|
{user_prompt} |
|
|
|
Create an optimal prompt that maintains the visual details (especially clothing and accessories) while incorporating the user's desired changes. The prompt should be direct and descriptive. Return ONLY the prompt without any explanations."""} |
|
] |
|
|
|
|
|
response = client.chat.completions.create( |
|
model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", |
|
messages=prompt_messages, |
|
max_tokens=500 |
|
) |
|
|
|
optimized_prompt = response.choices[0].message.content.strip() |
|
print(f"Original prompt: {user_prompt}") |
|
print(f"Optimized prompt: {optimized_prompt}") |
|
|
|
return optimized_prompt |
|
|
|
async def process_async(ref, prompt, output): |
|
print(f"Processing image:") |
|
|
|
|
|
if ref.startswith('http'): |
|
response = requests.get(ref) |
|
temp_image_path = os.path.join(output, 'temp_ref_image.png') |
|
with open(temp_image_path, 'wb') as f: |
|
f.write(response.content) |
|
ref_path = temp_image_path |
|
else: |
|
|
|
if ref.startswith('data:image'): |
|
base64_data = ref.split(',')[1] |
|
image_bytes = base64.b64decode(base64_data) |
|
temp_image_path = os.path.join(output, 'temp_ref_image.png') |
|
with open(temp_image_path, 'wb') as f: |
|
f.write(image_bytes) |
|
ref_path = temp_image_path |
|
else: |
|
ref_path = ref |
|
|
|
|
|
optimized_prompt = await optimize_prompt(ref_path, prompt) |
|
|
|
|
|
result = await generate_image(ref, optimized_prompt) |
|
|
|
|
|
if result and 'images' in result and len(result['images']) > 0: |
|
|
|
image_data = result['images'][0] |
|
|
|
|
|
if isinstance(image_data, str) and image_data.startswith('data:image'): |
|
base64_data = image_data.split(',')[1] |
|
image_bytes = base64.b64decode(base64_data) |
|
image = Image.open(io.BytesIO(image_bytes)) |
|
|
|
elif isinstance(image_data, dict) and 'url' in image_data: |
|
response = requests.get(image_data['url']) |
|
image = Image.open(io.BytesIO(response.content)) |
|
else: |
|
print(f"Unexpected image format in response: {type(image_data)}") |
|
return None |
|
|
|
|
|
output_filename = os.path.join(output, 'generated_image.png') |
|
image.save(output_filename) |
|
print(f"Generated image saved to: {output_filename}") |
|
|
|
|
|
video_result = await generate_video(output_filename, prompt) |
|
|
|
|
|
if video_result and isinstance(video_result, dict) and 'video' in video_result: |
|
video_url = video_result['video']['url'] |
|
video_response = requests.get(video_url) |
|
if video_response.status_code == 200: |
|
video_filename = os.path.join(output, 'generated_video.mp4') |
|
with open(video_filename, 'wb') as f: |
|
f.write(video_response.content) |
|
print(f"Generated video saved to: {video_filename}") |
|
|
|
|
|
if output != os.path.join(OUTPUT_DIR, str(get_next_dir_number() - 1)): |
|
result_dir, saved_video_path = save_results( |
|
ref_path, output_filename, video_filename, prompt, optimized_prompt |
|
) |
|
return result, output_filename, saved_video_path |
|
|
|
return result, output_filename, video_filename |
|
else: |
|
print(f"Failed to download video. Status code: {video_response.status_code}") |
|
else: |
|
print("Error: No video URL in response") |
|
|
|
return result, output_filename, None |
|
else: |
|
print("Error: Failed to generate image") |
|
return None |
|
|
|
def process(ref, prompt, output): |
|
return asyncio.run(process_async(ref, prompt, output)) |
|
|
|
def main(): |
|
|
|
parser = argparse.ArgumentParser(description='Process an image with a text prompt and generate a video') |
|
parser.add_argument('--ref', type=str, required=True, help='URL or path to the reference image') |
|
parser.add_argument('--prompt', type=str, required=True, help='Text prompt') |
|
parser.add_argument('--output', type=str, default=None, help='Optional custom output directory. If not provided, a numbered directory will be created.') |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
if args.output: |
|
output_dir = args.output |
|
os.makedirs(output_dir, exist_ok=True) |
|
print(f"Using custom output directory: {output_dir}") |
|
else: |
|
|
|
temp_dir = os.path.join(OUTPUT_DIR, "temp") |
|
os.makedirs(temp_dir, exist_ok=True) |
|
output_dir = temp_dir |
|
|
|
|
|
print(f"Reference image: {args.ref}") |
|
print(f"Text prompt: {args.prompt}") |
|
|
|
|
|
result, image_path, video_path = process(args.ref, args.prompt, output_dir) |
|
|
|
if result and image_path and video_path: |
|
print("Processing complete") |
|
return 0 |
|
else: |
|
print("Processing failed") |
|
return 1 |
|
|
|
if __name__ == "__main__": |
|
exit(main()) |