File size: 9,901 Bytes
aead09a
a8f7368
1ce9878
d82da04
7716718
2255cf1
a8f7368
e8f283e
a8f7368
 
 
 
c6fe582
a8f7368
 
 
 
 
 
 
 
 
51d1028
a8f7368
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c6fe582
a8f7368
 
 
 
 
 
c6fe582
a8f7368
 
 
 
 
 
 
c6fe582
a8f7368
 
 
 
c6fe582
a8f7368
 
 
 
 
c6fe582
a8f7368
 
 
 
 
 
 
 
 
c6fe582
a8f7368
 
 
c6fe582
a8f7368
 
 
 
 
 
 
 
 
 
 
d4e29c7
a8f7368
 
 
d4e29c7
c6fe582
a8f7368
 
 
 
 
 
 
599424e
a8f7368
 
 
 
 
 
 
 
 
 
 
 
d4e29c7
dc9ebd1
a8f7368
d4e29c7
a8f7368
 
c6fe582
 
a8f7368
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c6fe582
a8f7368
599424e
a8f7368
 
 
 
 
 
 
c6fe582
d82da04
c6fe582
a8f7368
319e8a0
dc9ebd1
a8f7368
 
 
 
dc9ebd1
a8f7368
 
 
c6fe582
a8f7368
 
c558940
dc9ebd1
a8f7368
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c6fe582
a8f7368
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c6fe582
a8f7368
c6fe582
a8f7368
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
import html
import logging
import os
import re
import shutil
import sys
import uuid
from pathlib import Path

import cv2
import gradio as gr
import requests
import yt_dlp
from google_img_source_search import ReverseImageSearcher
from PIL import Image

# Configure logging once at import time: INFO level, timestamped records,
# emitted through a single StreamHandler bound to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)

# Stylesheet handed to gr.Blocks(css=...) in create_ui().
#   .result-item   - wrapper <div> emitted for each search result
#   .error-message - class on the error paragraphs returned by the handlers
CUSTOM_CSS = """
.result-item {
    margin: 20px 0;
    padding: 15px;
    border: 1px solid #ddd;
    border-radius: 5px;
}
.result-item img {
    max-width: 100%;
    height: auto;
    margin: 10px 0;
}
.error-message {
    color: red;
    padding: 10px;
    border: 1px solid red;
    border-radius: 5px;
    margin: 10px 0;
}
"""

def create_temp_dir():
    """Make a uniquely named ``temp_<uuid4>`` directory and return its path.

    The directory is created relative to the current working directory.

    Returns:
        A ``pathlib.Path`` pointing at the (now existing) directory.
    """
    scratch = Path("temp_" + str(uuid.uuid4()))
    # exist_ok keeps the call harmless should the name already be present.
    scratch.mkdir(exist_ok=True)
    return scratch

def cleanup_temp_dir(temp_dir):
    """Remove a temporary directory tree, logging instead of raising on failure.

    Args:
        temp_dir: ``pathlib.Path`` of the directory to delete. A path that does
            not exist is ignored.
    """
    try:
        if not temp_dir.exists():
            return
        shutil.rmtree(str(temp_dir))
    except Exception as e:
        # Best-effort cleanup: report the problem but never propagate it.
        logger.error(f"Error cleaning up temporary directory: {e}")

def dl(inp):
    """Download the video at a URL with yt-dlp into a fresh temp directory.

    Args:
        inp: Video URL entered by the user.

    Returns:
        A 4-tuple ``(video_path, message_html, status, next_frame)``. On
        success ``video_path`` is the path of the downloaded ``.mp4`` and the
        message is empty; on failure ``video_path`` is ``None`` and the message
        carries the (HTML-escaped) error text. The last two items are always
        empty strings.
    """
    if not inp or not inp.strip():
        return None, gr.HTML("<p class='error-message'>Please provide a valid URL</p>"), "", ""

    url = inp.strip()
    temp_dir = create_temp_dir()
    try:
        # Build the file name from the URL, keeping only characters that are
        # safe on disk AND inside a yt-dlp output template: a raw '%' (e.g.
        # from '%20') would be parsed as a template field. The length cap keeps
        # long URLs below common file-name limits.
        stem = re.sub(r"[^A-Za-z0-9_-]", "_", url.replace("https://", ""))[:100] or "video"
        output_path = str(temp_dir / f"{stem}.mp4")

        # Configure yt-dlp options
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'merge_output_format': 'mp4'
        }

        # Handle Twitter URLs differently
        if "twitter" in url.lower():
            ydl_opts['extractor_args'] = {'twitter': {'api': ['syndication']}}

        # Download the video
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            logger.info(f"Downloading video from: {url}")
            ydl.download([url])

        if not os.path.exists(output_path):
            raise FileNotFoundError("Download completed but file not found")

        logger.info(f"Successfully downloaded to: {output_path}")
        return output_path, gr.HTML(""), "", ""

    except Exception as e:
        logger.error(f"Error downloading video: {e}")
        # Nothing usable was produced, so drop the temp directory right away.
        cleanup_temp_dir(temp_dir)
        # The message can echo parts of the user-supplied URL; escape it before
        # embedding it in markup.
        return None, gr.HTML(f"<p class='error-message'>Error: {html.escape(str(e))}</p>"), "", ""

def process_vid(file, cur_frame, every_n):
    """Reverse-image-search sampled video frames until one yields matches.

    Beginning at ``cur_frame``, every ``every_n``-th frame is written to a
    temporary PNG and its URL is passed to ``ReverseImageSearcher.search``.
    Scanning stops at the first frame that returns results.

    Args:
        file: Path of the video file to scan.
        cur_frame: Frame index to start from, as text or int; blank or
            ``None`` means frame 0.
        every_n: Sampling step in frames; must be at least 1.

    Returns:
        A 3-tuple ``(results_html, status_text, next_frame)``. When a frame
        matches, ``next_frame`` is that frame's index plus ``every_n`` so a
        later call can resume after it. On error or when nothing matches, the
        last two items are empty strings.
    """
    if not file:
        return gr.HTML("<p class='error-message'>No video file provided</p>"), "", ""

    temp_dir = create_temp_dir()
    capture = None

    try:
        step = int(every_n)
        if step < 1:
            raise ValueError("Every nth frame must be at least 1")

        # The start position may arrive as text or as a number.
        cur_text = "" if cur_frame is None else str(cur_frame).strip()
        start_frame = max(0, int(cur_text)) if cur_text else 0

        capture = cv2.VideoCapture(str(file))
        if not capture.isOpened():
            raise Exception("Failed to open video file")

        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        rev_img_searcher = ReverseImageSearcher()

        # Stepping the range directly samples exactly every `step` frames
        # (including step == 1) and lines up with the resume index returned
        # below. Frames that cannot be decoded are skipped.
        for i in range(start_frame, frame_count, step):
            logger.info(f"Processing frame {i}")
            capture.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = capture.read()

            if not ret:
                continue

            temp_image = temp_dir / f"frame_{i}.png"
            cv2.imwrite(str(temp_image), frame)

            # Generate the URL for the temporary image
            # NOTE(review): presumably the hosted app serves local files under
            # this /file= prefix - confirm against the deployment.
            image_path = os.path.abspath(str(temp_image))
            image_url = f'https://nymbo-reverse-image.hf.space/file={image_path}'

            try:
                results = rev_img_searcher.search(image_url)
                if results:
                    return (
                        gr.HTML(build_results_html(results, len(results))),
                        f"Found frame: {i}",
                        i + step,
                    )
            except Exception as search_error:
                # A failed lookup on one frame should not abort the scan.
                logger.error(f"Search error on frame {i}: {search_error}")

    except Exception as e:
        logger.error(f"Error processing video: {e}")
        return gr.HTML(f'<p class="error-message">Error: {html.escape(str(e))}</p>'), "", ""
    finally:
        if capture is not None:
            capture.release()
        cleanup_temp_dir(temp_dir)

    return gr.HTML('No frame matches found.'), "", ""

def process_im(file, url):
    """Return a URL for an image, re-saving it locally for ``nymbo`` URLs.

    Args:
        file: Path (or file object) of the image, opened with PIL when the
            URL has to be rebuilt.
        url: URL associated with the image.

    Returns:
        ``url`` unchanged when it is empty or does not start with
        ``https://nymbo``; otherwise a ``https://nymbo-reverse-image.hf.space``
        ``/file=`` URL built from the absolute path of a PNG copy of ``file``.
    """
    if not url or not url.startswith("https://nymbo"):
        return url

    temp_dir = create_temp_dir()
    try:
        temp_image = temp_dir / "temp.png"
        # Context manager closes the source file handle once the copy is saved.
        with Image.open(file) as read_file:
            read_file.save(str(temp_image))

        out_url = f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(str(temp_image))}'
        return out_url
    finally:
        # NOTE(review): the directory is removed before the caller can use the
        # returned URL, so the URL refers to a file that no longer exists.
        # Confirm the intended lifetime before relying on this function.
        cleanup_temp_dir(temp_dir)

def rev_im(image):
    """Run a reverse image search for a single image file.

    The image is re-encoded as a PNG in a temporary directory, its URL is
    passed to ``ReverseImageSearcher.search``, and the results are rendered
    with ``build_results_html``.

    Args:
        image: Path of the image file to search for.

    Returns:
        A ``gr.HTML`` component holding either the rendered results (with a
        total count) or an error message.
    """
    if not image:
        return gr.HTML("<p class='error-message'>No image provided</p>")

    temp_dir = create_temp_dir()
    try:
        temp_image = temp_dir / "temp.png"
        image_cv = cv2.imread(image)
        # imread signals an unreadable file by returning None, not by raising.
        if image_cv is None:
            raise ValueError("Could not read the provided image")
        cv2.imwrite(str(temp_image), image_cv)

        out_url = f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(str(temp_image))}'

        rev_img_searcher = ReverseImageSearcher()
        # Treat a missing result set the same as an empty one.
        results = rev_img_searcher.search(out_url) or []

        return gr.HTML(build_results_html(results, len(results)))
    except Exception as e:
        logger.error(f"Error in reverse image search: {e}")
        return gr.HTML(f'<p class="error-message">Error: {html.escape(str(e))}</p>')
    finally:
        cleanup_temp_dir(temp_dir)

def build_results_html(results, count=None):
    """Render search results as an HTML fragment.

    Titles and URLs come from external search results, so each value is
    HTML-escaped before being placed in element text or attribute values.

    Args:
        results: Iterable of items exposing ``page_title``, ``page_url`` and
            ``image_url`` attributes.
        count: When not ``None``, a ``Total Found`` heading with this value is
            placed before the results.

    Returns:
        The HTML string; empty when there are no results and no count.
    """
    parts = []
    if count is not None:
        parts.append(f"<h1>Total Found: {count}</h1><br>")

    for item in results:
        # html.escape also escapes both quote characters, which keeps values
        # from breaking out of the single-quoted attributes below.
        title = html.escape(str(item.page_title))
        page_url = html.escape(str(item.page_url))
        image_url = html.escape(str(item.image_url))
        parts.append(f"""
        <div class="result-item">
            <h3>{title}</h3>
            <p>Source: <a href='{page_url}' target='_blank' rel='noopener noreferrer'>{page_url}</a></p>
            <p>Image: <a href='{image_url}' target='_blank' rel='noopener noreferrer'>{image_url}</a></p>
            <img class='my_im' src='{image_url}' alt='{title}'>
        </div>
        """)
    return "".join(parts)

def create_ui():
    """Build the Gradio Blocks interface and wire up its event handlers.

    The layout has a radio toggle that switches between an image-search panel
    and a video-search panel, a shared HTML output area, and a hidden textbox
    that carries the next start frame between video-search clicks.

    Returns:
        The assembled ``gr.Blocks`` app (not launched).
    """
    with gr.Blocks(css=CUSTOM_CSS) as app:
        with gr.Row():
            gr.Column()  # empty column to the left of the controls
            with gr.Column():
                source_tog = gr.Radio(
                    choices=["Image", "Video"],
                    value="Image",
                    label="Search Type"
                )

                # Image search interface
                with gr.Box(visible=True) as im_box:
                    inp_url = gr.Textbox(label="Image URL")
                    load_im_btn = gr.Button("Load Image", variant="primary")
                    inp_im = gr.Image(label="Search Image", type='filepath')
                    go_btn_im = gr.Button("Search", variant="primary")

                # Video search interface
                with gr.Box(visible=False) as vid_box:
                    vid_url = gr.Textbox(label="Video URL")
                    vid_url_btn = gr.Button("Load URL", variant="primary")
                    inp_vid = gr.Video(label="Search Video")
                    with gr.Row():
                        every_n = gr.Number(
                            label="Every nth frame",
                            value=10,
                            minimum=1,
                            maximum=100,
                            step=1
                        )
                    stat_box = gr.Textbox(label="Status", interactive=False)
                    with gr.Row():
                        go_btn_vid = gr.Button("Start", variant="primary")
                        next_btn = gr.Button("Next Frame", variant="secondary")

            gr.Column()  # empty column to the right of the controls

        with gr.Row():
            html_out = gr.HTML()

        # Hidden state: process_vid writes the next start frame here and reads
        # it back on the following click.
        with gr.Row(visible=False):
            hid_box = gr.Textbox()

        # Event handlers
        def toggle_interface(tog):
            """Toggle between image and video interfaces."""
            return (
                gr.update(visible=tog == "Image"),
                gr.update(visible=tog == "Video")
            )

        # Register the button events first and keep the returned event
        # objects: `cancels=` expects those return values, not the listener
        # methods themselves.
        load_im_event = load_im_btn.click(lambda x: x, inp_url, inp_im)
        next_btn.click(process_vid, [inp_vid, hid_box, every_n], [html_out, stat_box, hid_box])
        vid_url_event = vid_url_btn.click(dl, vid_url, [inp_vid, html_out, stat_box, hid_box])
        go_vid_event = go_btn_vid.click(process_vid, [inp_vid, hid_box, every_n], [html_out, stat_box, hid_box])
        go_im_event = go_btn_im.click(rev_im, inp_im, html_out)

        # Switching search type flips panel visibility and cancels pending
        # load/search events.
        source_tog.change(
            toggle_interface,
            [source_tog],
            [im_box, vid_box],
            cancels=[go_vid_event, go_im_event, load_im_event, vid_url_event]
        )

    return app

if __name__ == "__main__":
    # Script entry point: build the UI, enable the request queue, then serve.
    app = create_ui()
    queued_app = app.queue(concurrency_count=20)
    queued_app.launch()