Reverse-Image / app.py
Nymbo's picture
Update app.py
f77bbf5 verified
raw
history blame
8.25 kB
import gradio as gr
import requests
import yt_dlp
import cv2
from google_img_source_search import ReverseImageSearcher
from PIL import Image
import os
import uuid
from pathlib import Path
def ensure_directory(directory):
Path(directory).mkdir(parents=True, exist_ok=True)
return directory
# Create a unique working directory for each session
def get_session_dir():
session_id = str(uuid.uuid4())
return ensure_directory(f"temp_{session_id}")
def dl(inp):
if not inp or not inp.strip():
return None, gr.HTML("Please provide a valid URL"), "", ""
work_dir = get_session_dir()
out = None
try:
# Sanitize input filename
inp_out = inp.replace("https://", "").replace("/", "_").replace(".", "_").replace("=", "_").replace("?", "_")
output_path = os.path.join(work_dir, f"{inp_out}.mp4")
ydl_opts = {
'format': 'best[ext=mp4]',
'outtmpl': output_path,
'quiet': True,
'no_warnings': True
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([inp])
if os.path.exists(output_path):
out = output_path
print(f"Successfully downloaded video to: {out}")
else:
raise Exception("Video download failed")
except Exception as e:
print(f"Download error: {str(e)}")
return None, gr.HTML(f"Error downloading video: {str(e)}"), "", ""
return out, gr.HTML(""), "", ""
def process_vid(file, cur_frame, every_n):
if not file or not os.path.exists(file):
return gr.HTML("No valid video file provided."), "", ""
work_dir = os.path.dirname(file)
capture = cv2.VideoCapture(file)
if not capture.isOpened():
return gr.HTML("Failed to open video file."), "", ""
frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
rev_img_searcher = ReverseImageSearcher()
html_out = ""
count = int(every_n) if every_n else 10
start_frame = int(cur_frame) if cur_frame and cur_frame.strip() else 0
try:
for i in range(start_frame, frame_count-1):
if count == int(every_n):
count = 1
print(f"Processing frame {i}")
# Read frame
capture.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = capture.read()
if not ret or frame is None:
print(f"Failed to read frame {i}")
continue
# Save frame
frame_path = os.path.join(work_dir, f"frame_{i}.png")
if not cv2.imwrite(frame_path, frame):
print(f"Failed to write frame {i}")
continue
# Process frame
out_url = f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(frame_path)}'
res = rev_img_searcher.search(out_url)
if res:
out_cnt = 0
for search_item in res:
out_cnt += 1
html_out += f"""
<div>
<h3>Result {out_cnt}</h3>
<p>Title: {search_item.page_title}</p>
<p>Site: <a href='{search_item.page_url}' target='_blank'>{search_item.page_url}</a></p>
<p>Image: <a href='{search_item.image_url}' target='_blank'>{search_item.image_url}</a></p>
<img class='my_im' src='{search_item.image_url}' alt='Search result'>
</div>
"""
return gr.HTML(f'<h2>Total Found: {out_cnt}</h2>{html_out}'), f"Found frame: {i}", i+int(every_n)
count += 1
except Exception as e:
import traceback
error_msg = f"Error processing video: {str(e)}\n{traceback.format_exc()}"
print(error_msg)
return gr.HTML(error_msg), "", ""
finally:
capture.release()
return gr.HTML('No matches found in processed frames.'), "", ""
def process_im(file, url):
if not url.startswith("https://nymbo"):
return url
work_dir = get_session_dir()
output_path = os.path.join(work_dir, "processed_image.png")
try:
read_file = Image.open(file)
read_file.save(output_path)
return f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(output_path)}'
except Exception as e:
print(f"Error processing image: {str(e)}")
return url
def rev_im(image):
if not image or not os.path.exists(image):
return gr.HTML("No valid image provided.")
work_dir = get_session_dir()
html_out = ""
try:
# Read and write image
img = cv2.imread(image)
if img is None:
return gr.HTML("Failed to read image file.")
output_path = os.path.join(work_dir, "search_image.png")
if not cv2.imwrite(output_path, img):
return gr.HTML("Failed to process image file.")
# Search image
out_url = f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(output_path)}'
rev_img_searcher = ReverseImageSearcher()
res = rev_img_searcher.search(out_url)
count = 0
for search_item in res:
count += 1
html_out += f"""
<div>
<h3>Result {count}</h3>
<p>Title: {search_item.page_title}</p>
<p>Site: <a href='{search_item.page_url}' target='_blank'>{search_item.page_url}</a></p>
<p>Image: <a href='{search_item.image_url}' target='_blank'>{search_item.image_url}</a></p>
<img class='my_im' src='{search_item.image_url}' alt='Search result'>
</div>
"""
except Exception as e:
return gr.HTML(f"Error processing image: {str(e)}")
return gr.HTML(f'<h2>Total Found: {count}</h2>{html_out}')
# Gradio Interface
with gr.Blocks() as app:
with gr.Row():
gr.Column()
with gr.Column():
source_tog = gr.Radio(choices=["Image", "Video"], value="Image", label="Search Type")
# Image search interface
with gr.Box(visible=True) as im_box:
inp_url = gr.Textbox(label="Image URL")
load_im_btn = gr.Button("Load Image")
inp_im = gr.Image(label="Search Image", type='filepath')
go_btn_im = gr.Button("Search")
# Video search interface
with gr.Box(visible=False) as vid_box:
vid_url = gr.Textbox(label="Video URL")
vid_url_btn = gr.Button("Load URL")
inp_vid = gr.Video(label="Search Video")
with gr.Row():
every_n = gr.Number(label="Process every N frames", value=10, minimum=1)
stat_box = gr.Textbox(label="Status")
with gr.Row():
go_btn_vid = gr.Button("Start Search")
next_btn = gr.Button("Next Frame")
gr.Column()
with gr.Row():
html_out = gr.HTML()
with gr.Row(visible=False):
hid_box = gr.Textbox()
# Interface logic
def toggle_interface(tog):
return gr.update(visible=tog == "Image"), gr.update(visible=tog == "Video")
source_tog.change(
toggle_interface,
[source_tog],
[im_box, vid_box],
cancels=[go_btn_vid.click, go_btn_im.click, load_im_btn.click, vid_url_btn.click]
)
# Button actions
load_im_btn.click(lambda x: x, inp_url, inp_im)
next_btn.click(process_vid, [inp_vid, hid_box, every_n], [html_out, stat_box, hid_box])
vid_url_btn.click(dl, vid_url, [inp_vid, html_out, stat_box, hid_box])
go_btn_vid.click(process_vid, [inp_vid, hid_box, every_n], [html_out, stat_box, hid_box])
go_btn_im.click(rev_im, inp_im, html_out)
app.queue(concurrency_count=20).launch()