import os
import json
import shutil
import subprocess
import re
import logging
import base64
import io

import numpy as np
import faiss
import cv2
import gradio as gr
from sentence_transformers import SentenceTransformer
from openai import OpenAI
from PIL import Image
# OpenRouter is used as an OpenAI-compatible endpoint for the DeepSeek (text)
# and Qwen VL (vision) models called below.
deepseek_api_key = os.environ.get("DEEPSEEK_API_KEY", "YOUR_API_KEY")
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=deepseek_api_key,
)
DATASET_PATH = "data"
JSON_PATH = f"{DATASET_PATH}/sign_language_data.json"

if os.path.exists(JSON_PATH):
    with open(JSON_PATH, "r") as f:
        dataset = json.load(f)
    # Rewrite the paths stored in the JSON so they point into the local layout:
    # data/clips/<category>/ for videos and data/all_signs/ for frames.
    for item in dataset:
        category = item["category"].lower().replace(" ", "_")
        video_filename = os.path.basename(item["video_clip_path"])
        item["video_clip_path"] = f"{DATASET_PATH}/clips/{category}/{video_filename}"
        frame_filename = os.path.basename(item["frame_path"])
        item["frame_path"] = f"{DATASET_PATH}/all_signs/{frame_filename}"
else:
    dataset = []
    print(f"Warning: {JSON_PATH} does not exist. Using empty dataset.")
logging.getLogger("sentence_transformers").setLevel(logging.ERROR)
print("Loading sentence transformer model...")
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

dimension = 384  # all-MiniLM-L6-v2 produces 384-dimensional embeddings
index = faiss.IndexFlatL2(dimension)
text_to_video = {}
idx_to_text = []
# Index every phrase (the sign's text plus any semantic variants) so that
# nearest-neighbour search can map free text back to a video clip.
for item in dataset:
    phrases = [item["text"]] + item.get("semantic_meaning", [])
    for phrase in phrases:
        embedding = embed_model.encode(phrase).astype(np.float32)
        index.add(np.array([embedding]))
        text_to_video[phrase] = item["video_clip_path"]
        idx_to_text.append(phrase)

print(f"Indexed {len(idx_to_text)} phrases")
def list_available_phrases():
    print("Available phrases in dataset:")
    for idx, phrase in enumerate(text_to_video.keys()):
        print(f"{idx + 1}. '{phrase}'")
    print(f"Total: {len(text_to_video)} phrases")
def preprocess_text(text):
    # Strip emoji and pictographs before matching.
    emoji_pattern = re.compile(
        "["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F700-\U0001F77F"
        u"\U0001F780-\U0001F7FF"
        u"\U0001F800-\U0001F8FF"
        u"\U0001F900-\U0001F9FF"
        u"\U0001FA00-\U0001FA6F"
        u"\U0001FA70-\U0001FAFF"
        u"\U00002702-\U000027B0"
        u"\U000024C2-\U0001F251"
        "]+",
        flags=re.UNICODE,
    )
    text = emoji_pattern.sub(r'', text)
    # Keep word characters, whitespace, "?" and "/" (needed for entries like "I/me").
    text = re.sub(r'[^\w\s\?\/]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
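# Illustrative example (not from the dataset): preprocess_text("Are you ready? 🙂!!")
# should return "Are you ready?" - the emoji is stripped, punctuation other than
# "?" and "/" is removed, and whitespace is collapsed.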
def refine_sentence_with_deepseek(text):
    text = preprocess_text(text)
    prompt = f"""
    Convert the following sentence into a sign-language-friendly version:
    - Remove unnecessary words like articles (a, an, the).
    - Keep essential words like pronouns (I, you, we, they).
    - Maintain question words (what, where, when, why, how).
    - Ensure verbs and key actions are included.
    - Reorder words to match sign language grammar.
    - IMPORTANT: Format your response with "SIGN_LANGUAGE_VERSION: [your simplified phrase]" at the beginning.
    - Sign language often places topic first, then comment (e.g., "READY YOU?" instead of "YOU READY?").
    Sentence: "{text}"
    """
    try:
        completion = client.chat.completions.create(
            model="deepseek/deepseek-r1:free",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
        )
        full_response = completion.choices[0].message.content.strip()
        # Try increasingly loose patterns to pull the simplified phrase out of
        # the model's reply.
        patterns = [
            r"SIGN_LANGUAGE_VERSION:\s*(.+?)(?:\n|$)",
            r"\*\*Signs?\*\*:?\s*(.+?)(?:\n|$)",
            r"\*\*Sign-language-friendly version:\*\*\s*(.+?)(?:\n|$)",
            r"(?:^|\n)([A-Z\s\?\!]+)(?:\n|$)",
        ]
        for pattern in patterns:
            match = re.search(pattern, full_response, re.MULTILINE)
            if match:
                refined_text = match.group(1).strip()
                return refined_text
        # Nothing matched: fall back to the first line of the reply.
        first_line = full_response.split('\n')[0].strip()
        return first_line
    except Exception as e:
        print(f"Error with DeepSeek API: {str(e)}")
        # Offline fallback: drop articles and copulas, keep everything else as-is.
        words = text.split()
        filtered_words = [w for w in words if w.lower() not in ['a', 'an', 'the', 'is', 'are', 'am']]
        return ' '.join(filtered_words)
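# Illustrative call (the exact wording depends on the model's reply):
#   refine_sentence_with_deepseek("Are you ready to go?")
# is expected to yield a compressed, topic-first phrase such as "READY GO YOU?";
# if the API call fails, the fallback above returns the input with articles and
# copulas removed, i.e. "you ready to go?".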
def retrieve_video(text, debug=False, similarity_threshold=0.9):
    if not text or text.isspace():
        return None
    text = preprocess_text(text)
    if debug:
        print(f"Creating embedding for '{text}'")
    # Handle the special case for "I", which is stored as "I/me" in the dataset.
    if text.lower() == "i":
        if "I/me" in text_to_video:
            if debug:
                print(f"  Direct mapping found: '{text}' → 'I/me'")
            return text_to_video["I/me"]
    if index.ntotal == 0:
        if debug:
            print("No items in the index")
        return None
    query_embedding = embed_model.encode(text).astype(np.float32)
    # Get the top matches. IndexFlatL2 returns squared L2 distances, so a
    # *lower* score means a closer match; the threshold is an upper bound.
    distances, closest_idx = index.search(np.array([query_embedding]), min(3, index.ntotal))
    closest_texts = [idx_to_text[idx] for idx in closest_idx[0]]
    similarity_scores = distances[0]
    if debug:
        print(f"Top matches for '{text}':")
        for i, (phrase, score) in enumerate(zip(closest_texts, similarity_scores)):
            print(f"  {i + 1}. '{phrase}' (score: {score:.4f})")
    if len(similarity_scores) > 0 and similarity_scores[0] < similarity_threshold:
        closest_text = closest_texts[0]
        # Avoid mapping a multi-word query onto a single-word sign.
        query_word_count = len(text.split())
        match_word_count = len(closest_text.split())
        if query_word_count > 1 and match_word_count == 1:
            if debug:
                print(f"Rejecting single-word match '{closest_text}' for multi-word query '{text}'")
            return None
        if debug:
            print(f"  Found match: '{closest_text}' with score {similarity_scores[0]:.4f}")
        return text_to_video.get(closest_text, None)
    else:
        if debug:
            print(f"No match found with similarity below threshold {similarity_threshold}")
        return None
def merge_videos(video_list, output_path="temp/output.mp4"):
    os.makedirs("temp", exist_ok=True)
    if not video_list:
        return None
    if len(video_list) == 1:
        # Nothing to concatenate; just copy the single clip.
        try:
            shutil.copy(video_list[0], output_path)
            return output_path
        except Exception as e:
            print(f"Error copying single video: {e}")
            return None
    verified_paths = []
    for path in video_list:
        if os.path.exists(path):
            verified_paths.append(path)
        else:
            print(f"Warning: Video path does not exist: {path}")
    if not verified_paths:
        print("No valid video paths found")
        return None
    # Write the list file expected by ffmpeg's concat demuxer.
    list_path = "temp/video_list.txt"
    with open(list_path, "w") as f:
        for path in verified_paths:
            abs_path = os.path.abspath(path)
            f.write(f"file '{abs_path}'\n")
    abs_output = os.path.abspath(output_path)
    abs_list = os.path.abspath(list_path)
    # "-y" must come before the output file so ffmpeg overwrites it without prompting.
    command = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", abs_list, "-c", "copy", abs_output]
    print(f"Running command: {' '.join(command)}")
    process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if process.returncode != 0:
        print(f"FFmpeg error: {process.stderr.decode()}")
        return None
    return output_path
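# The concat list written above looks like this (paths are illustrative):
#   file '/abs/path/data/clips/greetings/hello.mp4'
#   file '/abs/path/data/clips/common/thank_you.mp4'
# Note that "-c copy" only concatenates cleanly when all clips share the same
# codec, resolution and timebase; otherwise ffmpeg would need to re-encode.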
def save_video(video_path, output_path="temp/display_output.mp4"):
    os.makedirs("temp", exist_ok=True)
    if not video_path or not os.path.exists(video_path):
        return None
    if video_path != output_path:
        shutil.copy(video_path, output_path)
    return output_path
def text_to_sign_pipeline(user_input, debug=False):
    # Strategy: try a direct lookup for single words, then a full-sentence match
    # on the DeepSeek-refined phrase, and finally a word-by-word merge.
    user_input = preprocess_text(user_input)
    if debug:
        print(f"Processing input: '{user_input}'")
    has_multiple_words = len(user_input.split()) > 1
    if not has_multiple_words:
        direct_video = retrieve_video(user_input, debug=debug)
        if direct_video:
            if debug:
                print(f"Single word match found for '{user_input}'")
            return save_video(direct_video)
    sign_friendly_sentence = refine_sentence_with_deepseek(user_input)
    if debug:
        print(f"DeepSeek refined input to: '{sign_friendly_sentence}'")
    full_sentence_video = retrieve_video(sign_friendly_sentence, debug=debug)
    if full_sentence_video:
        if debug:
            print(f"Found full sentence match for '{sign_friendly_sentence}'")
        return save_video(full_sentence_video)
    words = sign_friendly_sentence.split()
    video_paths = []
    if debug:
        print(f"No full sentence match. Trying word-by-word approach for: {words}")
    for word in words:
        clean_word = preprocess_text(word).replace('?', '')
        if not clean_word or clean_word.isspace():
            continue
        word_video = retrieve_video(clean_word, debug=debug)
        if word_video:
            print(f"  Found video for word: '{clean_word}'")
            video_paths.append(word_video)
        else:
            print(f"  No video found for word: '{clean_word}'")
    if not video_paths:
        print("  No videos found for any words in the sentence")
        return None
    if debug:
        print(f"Found videos for {len(video_paths)} words, merging...")
    merged_video = merge_videos(video_paths)
    return save_video(merged_video)
def encode_image_to_base64(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')

def preprocess_image(image_path):
    # Keep only the right half of the frame, where the text label is expected
    # to appear, before sending it for text extraction.
    img = cv2.imread(image_path)
    if img is None:
        return None
    height, width = img.shape[:2]
    right_side = img[:, width // 2:width]
    os.makedirs("temp", exist_ok=True)
    cropped_path = "temp/cropped_image.jpg"
    cv2.imwrite(cropped_path, right_side)
    return cropped_path
def detect_text_in_image(image_path, debug=False):
    base64_image = encode_image_to_base64(image_path)
    prompt = """
    Is there any prominent text label or sign language text in this image?
    Answer with ONLY "YES" or "NO".
    """
    try:
        completion = client.chat.completions.create(
            model="qwen/qwen2.5-vl-32b-instruct:free",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
                    ],
                }
            ],
            temperature=0.3,
        )
        response = completion.choices[0].message.content.strip().upper()
        if debug:
            print(f"Text detection response: {response}")
        return "YES" in response
    except Exception as e:
        if debug:
            print(f"Error in text detection: {str(e)}")
        return False
def image_to_text_with_qwen(image_path, debug=False):
    base64_image = encode_image_to_base64(image_path)
    # Stage 1: if the frame contains a text label, crop it and read the label.
    has_text = detect_text_in_image(image_path, debug)
    if has_text:
        cropped_image_path = preprocess_image(image_path)
        if cropped_image_path:
            cropped_base64 = encode_image_to_base64(cropped_image_path)
            prompt = """
            Extract ONLY the main text label from this image. I'm looking for a single word or short phrase
            that appears as the main text (like "AFTERNOON"). Ignore any numbers, categories, or other text.
            Provide ONLY the extracted text without any other explanation or context.
            """
            try:
                completion = client.chat.completions.create(
                    model="qwen/qwen2.5-vl-32b-instruct:free",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": prompt},
                                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{cropped_base64}"}},
                            ],
                        }
                    ],
                    temperature=0.3,
                )
                response = completion.choices[0].message.content.strip()
                if debug:
                    print(f"Qwen VL text extraction response: {response}")
                cleaned_text = re.sub(r"^(the|main|text|label|is|:|\.|\s)+", "", response, flags=re.IGNORECASE)
                cleaned_text = re.sub(r'["\'\(\)]', '', cleaned_text)
                cleaned_text = cleaned_text.strip().upper()
                if cleaned_text:
                    return cleaned_text, "text"
            except Exception as e:
                if debug:
                    print(f"Error using Qwen VL for text extraction: {str(e)}")
    # Stage 2: no usable label was found, so fall back to a one-word caption.
    prompt = """
    Describe this image in a SINGLE WORD only.
    Focus on the main subject (like "MAN", "WOMAN", "HOUSE", "HAPPY", "SAD", etc.).
    Provide ONLY this single word without any punctuation or explanation.
    """
    try:
        completion = client.chat.completions.create(
            model="qwen/qwen2.5-vl-32b-instruct:free",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
                    ],
                }
            ],
            temperature=0.3,
        )
        response = completion.choices[0].message.content.strip()
        if debug:
            print(f"Qwen VL caption response: {response}")
        cleaned_caption = re.sub(r'[^\w\s]', '', response)
        cleaned_caption = cleaned_caption.strip().split()[0]
        cleaned_caption = cleaned_caption.upper()
        return cleaned_caption, "caption"
    except Exception as e:
        if debug:
            print(f"Error using Qwen VL for captioning: {str(e)}")
        return "ERROR", "error"
def process_text(input_text):
    if not input_text or input_text.isspace():
        print("Please enter some text to convert.")
        return None
    final_video = text_to_sign_pipeline(input_text, debug=True)
    if final_video:
        return final_video
    # Returning None leaves the video component empty instead of handing a
    # plain string to gr.Video, which expects a file path.
    print("Sorry, no matching sign language video found.")
    return None
def process_image(input_image):
    os.makedirs("temp", exist_ok=True)
    image_path = "temp/uploaded_image.jpg"
    input_image.save(image_path)
    extracted_text, source_type = image_to_text_with_qwen(image_path, debug=True)
    if extracted_text == "ERROR":
        return "Error processing image", None
    sign_video = text_to_sign_pipeline(extracted_text, debug=True)
    if source_type == "text":
        result_text = f"Extracted text: {extracted_text}"
    else:
        result_text = f"Generated caption: {extracted_text}"
    if not sign_video:
        # Report the miss in the textbox; the video output stays empty.
        result_text += " (no matching sign language video found)"
    return result_text, sign_video
with gr.Blocks() as app:
    gr.Markdown("# Sign Language Conversion")
    with gr.Tabs():
        with gr.Tab("Text to Sign"):
            text_input = gr.Textbox(label="Enter text to convert to sign language")
            text_button = gr.Button("Convert Text to Sign")
            text_output = gr.Video(label="Sign Language Output")
            text_button.click(process_text, inputs=text_input, outputs=text_output)
        with gr.Tab("Image to Text/Caption and Sign"):
            image_input = gr.Image(type="pil", label="Upload image")
            image_button = gr.Button("Process Image and Convert to Sign")
            extracted_text_output = gr.Textbox(label="Extracted Text/Caption")
            image_output = gr.Video(label="Sign Language Output")
            image_button.click(
                process_image,
                inputs=image_input,
                outputs=[extracted_text_output, image_output],
            )

app.launch()