import os import time import threading import queue import multiprocessing from pathlib import Path import torch import gradio as gr from huggingface_hub import hf_hub_download import numpy as np # Set up environment variables for CPU optimization os.environ["OMP_NUM_THREADS"] = str(max(1, multiprocessing.cpu_count() - 1)) # Optimal OpenMP threads os.environ["MKL_NUM_THREADS"] = str(max(1, multiprocessing.cpu_count() - 1)) # Optimal MKL threads os.environ["LLAMA_AVX"] = "1" os.environ["LLAMA_AVX2"] = "1" os.environ["LLAMA_F16"] = "1" # Cache directories CACHE_DIR = Path.home() / ".cache" / "fast_translate" MODEL_CACHE = CACHE_DIR / "models" QUANTIZED_CACHE = CACHE_DIR / "quantized" os.makedirs(MODEL_CACHE, exist_ok=True) os.makedirs(QUANTIZED_CACHE, exist_ok=True) # Check if we're running on CPU has_gpu = torch.cuda.is_available() gpu_name = torch.cuda.get_device_name(0) if has_gpu else "No GPU" print(f"GPU available: {has_gpu} - {gpu_name}") # Configure CPU settings cpu_count = multiprocessing.cpu_count() optimal_threads = max(4, cpu_count - 1) # Leave one core free print(f"Using {optimal_threads} of {cpu_count} CPU cores") # Download model files def get_model_path(repo_id, filename): print(f"Obtaining {filename}...") # Download to our custom cache location return hf_hub_download(repo_id=repo_id, filename=filename, cache_dir=MODEL_CACHE) # Function to quantize model to int4 or int8 def quantize_model(input_model_path, output_model_path, quantization_type="q4_0"): """Quantize model to lower precision for faster inference on CPU""" try: from llama_cpp import llama_model_quantize # Check if quantized model already exists if os.path.exists(output_model_path): print(f"Using existing quantized model: {output_model_path}") return output_model_path print(f"Quantizing model to {quantization_type}...") start_time = time.time() # Quantize using llama-cpp-python built-in quantization llama_model_quantize( input_model_path, output_model_path, quantization_type ) print(f"Quantization completed in {time.time() - start_time:.2f}s") return output_model_path except Exception as e: print(f"Quantization failed: {e}, using original model") return input_model_path # Download models base_model_path = get_model_path( "johnpaulbin/articulate-11-expspanish-base-merged-Q8_0-GGUF", "articulate-11-expspanish-base-merged-q8_0.gguf" ) adapter_path = get_model_path( "johnpaulbin/articulate-V1-Q8_0-GGUF", "articulate-V1-q8_0.gguf" ) # Quantize models (creates int4 versions for faster CPU inference) quantized_base_path = str(QUANTIZED_CACHE / "articulate-base-q4_0.gguf") quantized_adapter_path = str(QUANTIZED_CACHE / "articulate-adapter-q4_0.gguf") base_model_path = quantize_model(base_model_path, quantized_base_path, "q4_0") adapter_path = quantize_model(adapter_path, quantized_adapter_path, "q4_0") # Import after setting environment variables from llama_cpp import Llama # Translation cache translation_cache = {} MAX_CACHE_SIZE = 1000 # Model worker with batching support class ModelWorker: def __init__(self): self.model = None self.request_queue = queue.Queue() self.response_queue = queue.Queue() self.batch_queue = [] self.batch_event = threading.Event() self.batch_size = 4 # Process up to 4 requests at once self.batch_timeout = 0.1 # Wait 100ms max to collect batch self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True) self.batch_thread = threading.Thread(target=self._batch_loop, daemon=True) self.worker_thread.start() self.batch_thread.start() def _batch_loop(self): """Collect requests into batches for more efficient processing""" while True: try: # Get a request request = self.request_queue.get() if request is None: break # Add to batch self.batch_queue.append(request) # Try to collect more requests for the batch batch_start = time.time() while (len(self.batch_queue) < self.batch_size and time.time() - batch_start < self.batch_timeout): try: req = self.request_queue.get_nowait() if req is None: break self.batch_queue.append(req) except queue.Empty: time.sleep(0.01) # Signal worker to process the batch current_batch = self.batch_queue.copy() self.batch_queue = [] for req in current_batch: self._process_request(req) except Exception as e: print(f"Error in batch thread: {e}") def _worker_loop(self): """Initialize model and process requests""" try: # Initialize model with optimized settings print("Initializing model in background thread...") start_time = time.time() # Create model context with very optimized settings for CPU self.model = Llama( model_path=base_model_path, lora_path=adapter_path, n_ctx=256, # Smaller context for speed n_threads=optimal_threads, # Use all but one CPU core n_batch=512, # Smaller batch for CPU use_mmap=True, # Memory mapping (more efficient) n_gpu_layers=0, # Force CPU only seed=42, # Consistent results rope_freq_base=10000, # Default RoPE settings rope_freq_scale=1.0, verbose=False # Reduce overhead ) print(f"Model loaded in {time.time() - start_time:.2f} seconds") # Pre-warm the model with common phrases by running a simple inference print("Pre-warming model...") self.model.create_completion("[ENGLISH]hello[SPANISH]", max_tokens=8) print("Model ready for translation") except Exception as e: print(f"Failed to initialize model: {e}") def _process_request(self, request): """Process a single translation request""" try: direction, text, callback_id = request result = self._process_translation(direction, text) self.response_queue.put((callback_id, result)) except Exception as e: print(f"Error processing request: {e}") self.response_queue.put((callback_id, f"Error: {str(e)}")) def _process_translation(self, direction, text): """Translate text with optimized settings""" if not text or not text.strip(): return "" # Check cache first for faster response cache_key = f"{direction}:{text}" if cache_key in translation_cache: print("Cache hit!") return translation_cache[cache_key] # Start timing for performance tracking start_time = time.time() # Map language directions lang_map = { "English to Spanish": ("ENGLISH", "SPANISH"), "Spanish to English": ("SPANISH", "ENGLISH"), "Korean to English": ("KOREAN", "ENGLISH"), "English to Korean": ("ENGLISH", "KOREAN") } if direction not in lang_map: return "Invalid direction" source_lang, target_lang = lang_map[direction] # Efficient prompt format prompt = f"[{source_lang}]{text.strip()}[{target_lang}]" # Estimate appropriate token length based on input input_tokens = min(100, max(10, len(text.split()))) max_tokens = min(100, max(25, int(input_tokens * 1.3))) # Generate translation with aggressively optimized settings for speed response = self.model.create_completion( prompt, max_tokens=max_tokens, temperature=0.0, # Deterministic top_k=1, # Most likely token top_p=1.0, # No sampling repeat_penalty=1.0, # No penalty stream=False # Get complete response ) translation = response['choices'][0]['text'].strip() # Cache result if len(translation_cache) >= MAX_CACHE_SIZE: # Remove oldest entry (first key) translation_cache.pop(next(iter(translation_cache))) translation_cache[cache_key] = translation # Log performance inference_time = time.time() - start_time tokens_per_second = (input_tokens + len(translation.split())) / inference_time print(f"Translation: {inference_time:.3f}s ({tokens_per_second:.1f} tokens/sec)") return translation def request_translation(self, direction, text, callback_id): """Queue a translation request""" self.request_queue.put((direction, text, callback_id)) # Model preloading thread that preloads and pre-computes common translations def preload_common_phrases(worker): # Dictionary of common phrases that will benefit from caching common_phrases = { "English to Spanish": [ "Hello", "Thank you", "Good morning", "How are you?", "What's your name?", "I don't understand", "Please", "Sorry", "Yes", "No", "Where is", "How much does it cost?", "What time is it?", "I don't speak Spanish", "Where is the bathroom?", "I need help", "Can you help me?" ], "Spanish to English": [ "Hola", "Gracias", "Buenos días", "¿Cómo estás?", "¿Cómo te llamas?", "No entiendo", "Por favor", "Lo siento", "Sí", "No", "Dónde está", "¿Cuánto cuesta?", "¿Qué hora es?", "No hablo español", "¿Dónde está el baño?", "Necesito ayuda", "¿Puedes ayudarme?" ], "English to Korean": [ "Hello", "Thank you", "Good morning", "How are you?", "What's your name?", "I don't understand", "Please", "Sorry", "Yes", "No", "Where is", "How much is this?", "What time is it?", "I don't speak Korean" ], "Korean to English": [ "안녕하세요", "감사합니다", "좋은 아침입니다", "어떻게 지내세요?", "이름이 뭐예요?", "이해가 안 돼요", "제발", "죄송합니다", "네", "아니요", "어디에 있어요", "이거 얼마예요?", "지금 몇 시예요?", "한국어를 못해요" ] } preload_requests = [] for direction, phrases in common_phrases.items(): for phrase in phrases: preload_requests.append((direction, phrase, f"preload_{len(preload_requests)}")) # Process preloading in a separate thread def preloader(): print(f"Preloading {len(preload_requests)} common phrases in background...") for request in preload_requests: worker.request_translation(*request) # Small sleep to avoid overwhelming the queue time.sleep(0.1) print("Preloading complete") thread = threading.Thread(target=preloader, daemon=True) thread.start() return thread # Create worker instance worker = ModelWorker() # Start preloading common phrases in background preload_thread = preload_common_phrases(worker) # Counter for request IDs next_request_id = 0 # Implementation of a faster sentence splitter for batching def split_sentences(text, max_length=50): """Split text into manageable chunks for faster translation""" if len(text) <= max_length: return [text] # Split on natural boundaries delimiters = ['. ', '! ', '? ', '.\n', '!\n', '?\n', '\n\n'] chunks = [] current_chunk = "" lines = text.split('\n') for line in lines: if not line.strip(): if current_chunk: chunks.append(current_chunk) current_chunk = "" continue words = line.split(' ') for word in words: test_chunk = f"{current_chunk} {word}".strip() if len(test_chunk) > max_length: chunks.append(current_chunk) current_chunk = word else: current_chunk = test_chunk # Check for natural breaks for delimiter in delimiters: if delimiter in current_chunk[-len(delimiter):]: chunks.append(current_chunk) current_chunk = "" break if current_chunk: chunks.append(current_chunk) return chunks # Gradio interface functions def translate(direction, text, progress=gr.Progress()): """Fast translation with batching and caching""" global next_request_id # Skip empty inputs if not text or not text.strip(): return "" # Check exact cache hit cache_key = f"{direction}:{text}" if cache_key in translation_cache: return translation_cache[cache_key] # For longer texts, split into sentences for faster processing if len(text) > 50: progress(0.1, desc="Processing text...") chunks = split_sentences(text) if len(chunks) > 1: results = [] for i, chunk in enumerate(chunks): # Check if this chunk is in cache chunk_key = f"{direction}:{chunk}" if chunk_key in translation_cache: results.append(translation_cache[chunk_key]) continue # Request translation for this chunk chunk_id = next_request_id next_request_id += 1 worker.request_translation(direction, chunk, chunk_id) # Wait for response chunk_start = time.time() while time.time() - chunk_start < 10: # 10 second timeout per chunk progress((i + 0.5) / len(chunks), desc=f"Translating part {i+1}/{len(chunks)}") try: while not worker.response_queue.empty(): resp_id, result = worker.response_queue.get_nowait() if resp_id == chunk_id: results.append(result) chunk_found = True break except queue.Empty: pass time.sleep(0.05) if len(results) != i + 1: results.append(f"[Translation failed for part {i+1}]") combined = " ".join(results) translation_cache[cache_key] = combined progress(1.0) return combined # For single sentences request_id = next_request_id next_request_id += 1 # Queue the request worker.request_translation(direction, text, request_id) # Wait for the response progress(0.2, desc="Translating...") start_time = time.time() max_wait = 20 # Maximum wait time in seconds while time.time() - start_time < max_wait: progress(0.2 + 0.8 * ((time.time() - start_time) / max_wait), desc="Translating...") # Check for our response try: while not worker.response_queue.empty(): resp_id, result = worker.response_queue.get_nowait() if resp_id == request_id: progress(1.0) return result except queue.Empty: pass # Small sleep to prevent CPU hogging time.sleep(0.05) progress(1.0) return "Translation timed out. Please try again with a shorter text." # Create Gradio interface with gr.Blocks(title="Ultra-Fast Translation App (CPU Optimized)") as iface: gr.Markdown(f""" ## Ultra-Fast Translation App (CPU Optimized) Running on: {'GPU: ' + gpu_name if has_gpu else 'CPU optimized with int4 quantization'} """) with gr.Row(): direction = gr.Dropdown( choices=["English to Spanish", "Spanish to English", "Korean to English", "English to Korean"], label="Translation Direction", value="English to Spanish" ) with gr.Row(): input_text = gr.Textbox(lines=5, label="Input Text", placeholder="Enter text to translate...") output_text = gr.Textbox(lines=5, label="Translation") # Add translate button translate_btn = gr.Button("Translate") translate_btn.click(fn=translate, inputs=[direction, input_text], outputs=output_text) # Optimization options with gr.Accordion("Performance Tips", open=True): gr.Markdown(""" ### Speed Optimization Tips - ✅ The model has been quantized to int4 for faster CPU execution - ✅ Common phrases are pre-cached for instant results - ✅ Long text is automatically split into smaller chunks - ✅ First translation will be slower as the model warms up - ✅ Short sentences (< 50 chars) translate much faster """) # Add examples with preloaded common phrases gr.Examples( examples=[ ["English to Spanish", "Hello, how are you today?"], ["Spanish to English", "Hola, ¿cómo estás hoy?"], ["English to Korean", "The weather is nice today."], ["Korean to English", "안녕하세요, 만나서 반갑습니다."] ], inputs=[direction, input_text], fn=translate, outputs=output_text ) # Launch with optimized settings if __name__ == "__main__": iface.launch( debug=False, show_error=True, share=False, quiet=True, server_name="0.0.0.0", server_port=7860 )