Spaces:
Running
on
Zero
Running
on
Zero
File size: 32,030 Bytes
8eea094 4ec9e88 661d10b 570eaa9 b074257 f2f927b 8eea094 f2f927b 0d61fe6 f2f927b 570eaa9 f2f927b 545bc06 f2f927b 545bc06 f2f927b 545bc06 f2f927b 5c58fd6 f2f927b 8eea094 f2f927b 8eea094 f2f927b 8eea094 f2f927b 8eea094 f2f927b 8eea094 f2f927b 661d10b f2f927b 8eea094 f2f927b 8eea094 f2f927b 8eea094 f2f927b 545bc06 f2f927b 570eaa9 f2f927b b074257 f2f927b b074257 f2f927b 545bc06 f2f927b 545bc06 b074257 f2f927b b074257 f2f927b 545bc06 f2f927b b074257 f2f927b b074257 f2f927b b074257 f2f927b b074257 f2f927b 545bc06 b074257 f2f927b b074257 545bc06 f2f927b b074257 4ec9e88 f2f927b 570eaa9 b074257 545bc06 570eaa9 f2f927b b074257 f2f927b b074257 f2f927b 545bc06 f2f927b 545bc06 f2f927b a528449 f2f927b a528449 f2f927b a528449 b074257 a528449 f2f927b 570eaa9 f2f927b 570eaa9 f2f927b 2af55e5 b074257 545bc06 b074257 f2f927b b074257 f2f927b b074257 8eea094 b074257 f2f927b 545bc06 f2f927b 545bc06 f2f927b 545bc06 f2f927b 2af55e5 a528449 2af55e5 a528449 f2f927b b074257 f2f927b 2af55e5 570eaa9 f2f927b 570eaa9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 |
from collections import defaultdict
import spaces
import math
import os
import gradio as gr
import torch
import itertools # For color cycling
import tiktoken # For GPT-4 tokenizer
from transformers import AutoTokenizer, HfArgumentParser # For Llama3 tokenizer & args potentially
import traceback # For detailed error logging
import logging # For better logging practices
from typing import Optional, Tuple, List, Dict, Any
import matplotlib.figure # For type hinting
import matplotlib.pyplot as plt
# --- Configuration ---
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)


class Config:
    """Static settings read by the tokenizer loaders and the Gradio UI."""

    # -- Visualization --
    # Palette handed to create_color_map, which cycles through it.
    VIZ_COLORS: List[str] = [
        "#a6cee3",
        "#1f78b4",
        "#b2df8a",
        "#33a02c",
        "#fb9a99",
        "#e31a1c",
        "#fdbf6f",
        "#ff7f00",
        "#cab2d6",
        "#6a3d9a",
        "#ffff99",
        "#b15928",
    ]
    # Number of numbered segment labels that receive an explicit colour.
    MAX_EXPECTED_SEGMENTS: int = 1

    # -- Model / tokenizer names --
    LLAMA3_MODEL_NAME: str = "meta-llama/Meta-Llama-3-8B"  # another variant (e.g. Instruct) also works
    TIKTOKEN_ENCODING_NAME: str = "cl100k_base"
    BLT_MODEL_NAME: str = "blt-1b"  # default Bytelatent model

    # -- Bytelatent specifics --
    BLT_WEIGHTS_DIR: str = "hf-weights"
    BLT_MAX_BYTES_FOR_DEMO: int = 512

    # -- Gradio --
    DEFAULT_PROMPT: str = "Daenerys Targaryen is in Game of Thrones, a fantasy epic by George R.R. Martin."
    GRADIO_THEME = gr.themes.Origin()
    GRADIO_TITLE: str = "BLT's Entropy-based Patcher vs. Tokenizer Visualisation"
    # Markdown shown under the title; the two names above are interpolated
    # while the class body is executed.
    GRADIO_DESC: str = (
        "Enter text to visualize its segmentation according to different methods:\n"
        f"1. **Byte Latent Transformer (BLT):** Entropy-based patching plot and patched text (using `blt_main_entropy_100m_512w`).\n"
        f"2. **Tiktoken (GPT-4):** Text segmented by `{TIKTOKEN_ENCODING_NAME}` tokens.\n"
        f"3. **Llama 3:** Text segmented by the `{LLAMA3_MODEL_NAME}` tokenizer."
    )
# --- Bytelatent Processor ---
# The Bytelatent stack is optional. When any of its imports fails the module
# must still load, so the names used further down are replaced by inert
# stand-ins and _BLT_AVAILABLE records that the real ones are missing.
try:
    from bytelatent.data.file_util import get_fs
    from bytelatent.generate_patcher import patcher_nocache
    from bytelatent.tokenizers.blt_tokenizer import BltTokenizer
    from bytelatent.plotting.entropy_figure_via_matplot_lib import plot_entropies
    from bytelatent.args import TrainArgs
    from download_blt_weights import main as ensure_present  # Assuming this downloads weights
    _BLT_AVAILABLE = True
    logging.info("Bytelatent libraries found.")
except ImportError as e:
    logging.warning(f"Bytelatent libraries not found. Bytelatent functionality will be disabled. Error: {e}")
    _BLT_AVAILABLE = False

    # Define dummy classes/functions if BLT is not available to avoid NameErrors later
    class BltTokenizer:
        pass

    class TrainArgs:
        pass

    def get_fs(*args, **kwargs):
        return None

    def patcher_nocache(*args, **kwargs):
        return None

    def plot_entropies(*args, **kwargs):
        return None

    def ensure_present(*args, **kwargs):
        pass

    # NOTE: the `matplotlib` name is intentionally NOT rebound to None here.
    # Annotations later in this file evaluate `matplotlib.figure.Figure` when
    # the functions are defined, so a None binding would raise AttributeError
    # and stop the module from loading in exactly this fallback case.
class BytelatentProcessor:
    """Handles loading and running the Bytelatent entropy model."""

    def __init__(self, model_name: str, weights_dir: str):
        """Fetch the weights and build the tokenizer and patcher.

        Args:
            model_name: Bytelatent model name; also the sub-directory of
                ``weights_dir`` that is read.
            weights_dir: Directory that holds the per-model folders.

        Setup problems are logged instead of raised; ``is_available`` then
        stays ``False`` and the other methods return placeholder results.
        """
        self.model_name = model_name
        self.weights_dir = weights_dir
        self.is_available: bool = False
        self.tokenizer: Optional[BltTokenizer] = None
        self.patcher: Optional[Any] = None  # Type depends on bytelatent implementation
        self.device: str = "cuda" if torch.cuda.is_available() else "cpu"

        if not _BLT_AVAILABLE:
            logging.warning("Skipping Bytelatent setup as libraries are unavailable.")
            return

        try:
            # 1. Ensure weights are present
            logging.info(f"Ensuring Bytelatent model '{model_name}' weights are present...")
            ensure_present([model_name])  # Call the download script
            logging.info("Bytelatent model check complete.")

            # 2. Load Bytelatent model components
            consolidated_path = os.path.join(self.weights_dir, model_name)
            train_args_path = os.path.join(consolidated_path, "train_args.json")
            entropy_model_dir = os.path.join(consolidated_path, "entropy_model")
            if not os.path.exists(train_args_path):
                raise FileNotFoundError(f"BLT training args not found at {train_args_path}.")
            if not os.path.exists(entropy_model_dir):
                raise FileNotFoundError(f"BLT Entropy model directory not found at {entropy_model_dir}.")

            fs = get_fs(train_args_path)
            train_args = TrainArgs.model_validate_json(fs.read_text(train_args_path))

            self.tokenizer = train_args.data.tokenizer_args.build()
            # Explicit check instead of `assert`, which is stripped under -O.
            if not isinstance(self.tokenizer, BltTokenizer):
                raise TypeError("Failed to build Bytelatent Tokenizer")

            patcher_args = train_args.data.patcher_args.model_copy(deep=True)
            patcher_args.realtime_patching = True
            patcher_args.patching_device = self.device
            patcher_args.device = self.device
            patcher_args.entropy_model_checkpoint_dir = entropy_model_dir
            self.patcher = patcher_args.build()

            self.is_available = True
            logging.info(f"Bytelatent processor for '{model_name}' loaded successfully on device '{self.device}'.")
        except FileNotFoundError as e:
            logging.error(f"Bytelatent setup failed: Required file/directory not found. {e}")
        except Exception as e:
            # Deliberately broad: any setup failure only disables the BLT section.
            logging.error(f"An unexpected error occurred during Bytelatent setup: {e}")
            logging.error(traceback.format_exc())

    @staticmethod
    def _label_tokens(patch_sizes: List[int], n_tokens: int) -> List[str]:
        """Return one patch label per token index.

        Patches with a non-positive length are skipped and do not consume a
        patch number. Tokens beyond the summed patch lengths are labelled
        'BL Remainder'.
        """
        labels = [""] * n_tokens
        covered = 0  # running sum of the positive patch lengths
        patch_no = 0
        for size in patch_sizes:
            if size <= 0:
                continue
            patch_no += 1
            patch_label = f"BL Patch {patch_no}"
            for idx in range(covered, min(covered + size, n_tokens)):
                labels[idx] = patch_label
            covered += size

        if covered < n_tokens:
            remainder_label = "BL Remainder"
            logging.warning(
                f"Bytelatent patch lengths sum ({covered}) "
                f"is less than total tokens ({n_tokens}). "
                f"Remainder tokens will be labelled '{remainder_label}'."
            )
            for idx in range(covered, n_tokens):
                labels[idx] = remainder_label
        elif covered > n_tokens and n_tokens > 0:
            logging.warning(
                f"Bytelatent patch lengths sum ({covered}) "
                f"exceeds total tokens ({n_tokens}). "
                f"Patch label mapping might be affected."
            )
        return labels

    def _decode_char_at(self, token_ids: List[int], start: int) -> Tuple[str, int]:
        """Decode the shortest run of 1-4 token ids at ``start`` that yields text.

        Returns ``(text, ids_consumed)``, or ``("", 0)`` when no run decodes.
        """
        # Token id 1 is always displayed as a single '<'.
        if token_ids[start] == 1:
            return "<", 1
        for width in range(1, 5):
            if start + width > len(token_ids):
                break  # Not enough tokens left for this width
            try:
                text = self.tokenizer.decode(token_ids[start:start + width])
            except Exception:
                # Best effort: a failed decode just means "try a longer run".
                continue
            if text:
                return text, width
        return "", 0

    def _create_highlight_data(self, patch_lengths: torch.Tensor, tokens: torch.Tensor) -> Tuple[List[Tuple[str, str]], int]:
        """Generates data for gr.HighlightedText based on bytelatent patches,
        formatting each byte's display text as 'char-byte_index'.

        Args:
            patch_lengths: Patch lengths, in tokens.
            tokens: Token ids for the text.

        Returns:
            ``(entries, patch_count)`` where ``entries`` holds one
            ``(display_text, label)`` pair per token and ``patch_count`` is
            the number of patches with a positive length.
        """
        if not self.is_available or self.tokenizer is None:
            return [("Bytelatent processing unavailable.", "Error")], 0
        if patch_lengths.numel() == 0 and tokens.numel() == 0:  # No data at all
            return [("No tokens or patches.", "Info")], 0

        patch_sizes = patch_lengths.tolist()
        patch_count = sum(1 for size in patch_sizes if size > 0)
        if tokens.numel() == 0:  # Patches are still counted without tokens
            return [("No tokens provided to highlight.", "Info")], patch_count

        token_ids = tokens.tolist()
        labels = self._label_tokens(patch_sizes, len(token_ids))

        highlighted: List[Tuple[str, str]] = []
        pos = 0
        while pos < len(token_ids):
            char_text, width = self._decode_char_at(token_ids, pos)
            if width:
                # Only the first line is kept, so a newline shows as empty text.
                shown = char_text.splitlines()[0]
                for offset in range(width):
                    display_text = f"{shown}-{offset + 1}".replace(" ", "_")
                    highlighted.append((display_text, labels[pos + offset]))
                pos += width
                continue

            # Nothing decodable starts here: show the raw id and move on by one.
            problem_byte_id = token_ids[pos]
            display_text = f"err_byte({problem_byte_id})-1"
            highlighted.append((display_text, labels[pos]))
            logging.warning(
                f"Token ID {problem_byte_id} at index {pos} "
                f"could not be part of a validly decoded character using iterative decode. Fallback: '{display_text}'."
            )
            pos += 1
        return highlighted, patch_count

    @staticmethod
    def _merge_by_label(per_byte: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """Collapse per-byte entries into one ``(text, label)`` pair per label.

        Entries look like 'char-N' with N the 1-based byte index inside the
        character. Only the first byte contributes the character, so a
        multi-byte character appears once instead of once per byte with a
        stray '-2' suffix. Entries without a numeric suffix (status messages)
        are kept verbatim. Labels keep their first-seen order.
        """
        grouped: Dict[str, List[str]] = defaultdict(list)
        for text, label in per_byte:
            parts = grouped[label]  # registers the label even if nothing is added
            char, sep, byte_no = text.rpartition("-")
            if not (sep and byte_no.isdigit()):
                parts.append(text)
            elif byte_no == "1":
                parts.append(char)
        return [("".join(parts), label) for label, parts in grouped.items()]

    @staticmethod
    def _truncate_prompt(prompt: str, max_bytes: int) -> str:
        """Return ``prompt`` cut to at most ``max_bytes`` UTF-8 bytes.

        A cut that lands inside a multi-byte character drops the partial
        character. Otherwise the text is shortened to the last space so no
        word is split, unless that space is the very first character, which
        would leave nothing.
        """
        raw = prompt.encode("utf-8")
        if len(raw) <= max_bytes:
            return prompt
        head = raw[:max_bytes]
        try:
            text = head.decode("utf-8")
        except UnicodeDecodeError:
            return head.decode("utf-8", errors="ignore")
        last_space = text.rfind(" ")
        return text[:last_space] if last_space > 0 else text

    def process(self, prompt: str, max_bytes: int) -> "Tuple[Optional[matplotlib.figure.Figure], List[Tuple[str, str]], int, str]":
        """Processes the prompt using the loaded Bytelatent model.

        Args:
            prompt: Text to patch.
            max_bytes: Upper bound on the UTF-8 size of the text handed to
                the patcher; longer prompts are truncated.

        Returns:
            ``(figure, highlighted_data, patch_count, status)``. ``figure``
            is ``None`` when processing fails or yields no results. Errors
            are reported through ``status`` and an 'Error' entry, not raised.
        """
        # The return annotation is a string so it is not evaluated when the
        # class is defined.
        if not self.is_available or self.tokenizer is None or self.patcher is None:
            return None, [("Bytelatent not available.", "Error")], 0, "Bytelatent processor not available."

        status = ""
        byte_limit = int(max_bytes)  # used as a slice index, so it must be an int
        prompt_bl = prompt
        if len(prompt.encode("utf-8")) > byte_limit:
            prompt_bl = self._truncate_prompt(prompt, byte_limit)
            trunc_len = len(prompt_bl.encode("utf-8"))
            status = f"Warning: Prompt truncated to {trunc_len} bytes for Bytelatent entropy model.\n"
            logging.warning(status.strip())

        # Run Bytelatent patching
        try:
            logging.info(f"Running Bytelatent entropy model patching on {len(prompt_bl.encode('utf-8'))} bytes...")
            results = patcher_nocache(
                [prompt_bl],
                tokenizer=self.tokenizer,
                patcher=self.patcher,
                max_prompt_len=512,
                max_gen_len=256,
            )
            status += "Bytelatent patching executed.\n"
            if not results:
                logging.warning("Bytelatent entropy processing returned no results.")
                status += "Warning: Bytelatent generated no patches."
                return None, [("No patches generated by Bytelatent.", "Info")], 0, status

            # Results are batched; only one prompt was submitted.
            batch_patch_lengths, batch_scores, batch_tokens = results
            patch_lengths, scores, tokens = batch_patch_lengths[0], batch_scores[0], batch_tokens[0]

            # Create highlighted text data: one merged entry per patch label.
            per_byte_data, patch_count = self._create_highlight_data(patch_lengths, tokens)
            highlighted_data = self._merge_by_label(per_byte_data)

            # Create plot
            fig = None
            if plot_entropies is not None:  # Check if plotting function is available
                try:
                    decoded_output_for_plot = self.tokenizer.decode(tokens.tolist())
                except Exception as decode_err:
                    logging.warning(f"Error decoding full BLT token sequence for plot: {decode_err}. Using (truncated) input prompt for plot axis.")
                    decoded_output_for_plot = prompt_bl
                fig = plot_entropies(
                    patch_lengths,
                    scores,
                    tokens,
                    chars=decoded_output_for_plot,
                    threshold=self.patcher.threshold,
                )
                status += f"Bytelatent plot generated. Found {patch_count} patches.\n"
            else:
                status += "Plotting unavailable.\n"

            logging.info(f"Bytelatent processing complete. Patches: {patch_count}")
            return fig, highlighted_data, patch_count, status.strip()
        except Exception as e:
            # Boundary handler: the UI shows the failure instead of crashing.
            logging.error(f"An error occurred during Bytelatent processing: {e}")
            logging.error(traceback.format_exc())
            status += f"Error during Bytelatent processing: {e}"
            return None, [(f"Bytelatent Error: {e}", "Error")], 0, status.strip()
# --- Tokenizer Helpers ---
def create_tiktoken_highlight_data(prompt: str, encoding: tiktoken.Encoding) -> Tuple[List[Tuple[str, str]], int, str]:
    """Build gr.HighlightedText entries for ``prompt`` as split by tiktoken.

    Returns ``(entries, token_count, status)``. Each entry pairs one token's
    text with a 'GPT4 Tk <n>' label, numbered from 1. If processing fails as
    a whole, a single 'Error' entry and a count of 0 are returned.
    """
    status = "Processing with Tiktoken...\n"

    def _token_text(token_id) -> str:
        # Text shown for one token id, with placeholders when it cannot be decoded.
        try:
            return encoding.decode([token_id])
        except (UnicodeDecodeError, TypeError):  # Handle bytes that don't form valid unicode
            try:
                token_bytes = encoding.decode_single_token_bytes(token_id)
                return f"[Bytes: {token_bytes.hex()}]"
            except Exception:
                return "[Decode Error]"
        except Exception as e:
            logging.warning(f"Unexpected tiktoken decode error for token ID {token_id}: {e}")
            return "[Decode Error]"

    try:
        tiktoken_ids = encoding.encode(prompt)
        highlighted_data = [
            (_token_text(token_id), f"GPT4 Tk {position}")
            for position, token_id in enumerate(tiktoken_ids, start=1)
        ]
        token_count = len(tiktoken_ids)
        status += f"Tiktoken processing successful ({token_count} tokens)."
        logging.info(f"Tiktoken processing complete. Found {token_count} tokens.")
        return highlighted_data, token_count, status.strip()
    except Exception as e:
        logging.error(f"Error during tiktoken processing: {e}")
        logging.error(traceback.format_exc())
        status += f"Error during Tiktoken processing: {e}"
        return [(f"Error processing with tiktoken: {e}", "Error")], 0, status.strip()
def create_llama3_highlight_data(prompt: str, tokenizer: AutoTokenizer) -> Tuple[List[Tuple[str, str]], int, str]:
    """Build gr.HighlightedText entries for ``prompt`` as split by the Llama 3 tokenizer.

    Returns ``(entries, token_count, status)``. Each entry pairs one token's
    decoded text with a 'Llama3 Tk <n>' label, numbered from 1. If processing
    fails as a whole, a single 'Error' entry and a count of 0 are returned.
    """
    status = f"Processing with Llama 3 ({tokenizer.name_or_path})...\n"

    def _token_text(token_id) -> str:
        # Decode one id on its own; a failure becomes a visible placeholder.
        try:
            return tokenizer.decode([token_id])
        except Exception as e:
            logging.warning(f"Unexpected Llama 3 decode error for token ID {token_id}: {e}")
            return "[Decode Error]"

    try:
        llama_token_ids = tokenizer.encode(prompt)
        highlighted_data = [
            (_token_text(token_id), f"Llama3 Tk {position}")
            for position, token_id in enumerate(llama_token_ids, start=1)
        ]
        token_count = len(llama_token_ids)
        status += f"Llama 3 processing successful ({token_count} tokens)."
        logging.info(f"Llama 3 processing complete. Found {token_count} tokens.")
        return highlighted_data, token_count, status.strip()
    except Exception as e:
        logging.error(f"Error during Llama 3 processing: {e}")
        logging.error(traceback.format_exc())
        status += f"Error during Llama 3 processing: {e}"
        return [(f"Error processing with Llama 3: {e}", "Error")], 0, status.strip()
# --- Global Initializations ---
# Everything below runs once at import time. Each loader records its own
# failure in an *_available flag instead of stopping the app.

# Bytelatent processor (its constructor logs and absorbs setup errors).
blt_processor = BytelatentProcessor(Config.BLT_MODEL_NAME, Config.BLT_WEIGHTS_DIR)

# Tiktoken encoding
try:
    tiktoken_encoding = tiktoken.get_encoding(Config.TIKTOKEN_ENCODING_NAME)
except Exception as e:
    logging.error(f"Failed to load Tiktoken encoding '{Config.TIKTOKEN_ENCODING_NAME}': {e}")
    tiktoken_encoding, tiktoken_available = None, False
else:
    logging.info(f"Tiktoken encoding '{Config.TIKTOKEN_ENCODING_NAME}' loaded.")
    tiktoken_available = True

# Llama 3 tokenizer. Start from the "unavailable" state so every failure
# branch only has to log.
llama_tokenizer, llama_available = None, False
try:
    # Pass trust_remote_code=True here if the specific model revision requires it.
    llama_tokenizer = AutoTokenizer.from_pretrained(Config.LLAMA3_MODEL_NAME)
except ImportError:
    logging.error("Transformers or SentencePiece library not found. Llama 3 functionality disabled. Install with: pip install transformers sentencepiece")
except OSError as e:
    logging.error(f"Error loading Llama 3 tokenizer '{Config.LLAMA3_MODEL_NAME}': {e}")
    error_msg = f"Could not load Llama 3 tokenizer '{Config.LLAMA3_MODEL_NAME}'. Check model name, network, and authentication (use `huggingface-cli login` if needed)."
    logging.error(error_msg)
except Exception as e:
    logging.error(f"An unexpected error occurred loading Llama 3 tokenizer: {e}")
    logging.error(traceback.format_exc())
else:
    logging.info(f"Llama 3 tokenizer '{Config.LLAMA3_MODEL_NAME}' loaded.")
    llama_available = True
# --- Main Processing Function ---
@spaces.GPU
def process_text(prompt: str) -> Tuple[
    Optional[matplotlib.figure.Figure], List[Tuple[str, str]], int,  # BLT
    List[Tuple[str, str]], int,  # Tiktoken
    List[Tuple[str, str]], int,  # Llama 3
    str  # Status
]:
    """
    Run the prompt through Bytelatent, Tiktoken and Llama 3.

    Returns, in this order: the entropy figure (or None), the BLT entries and
    patch count, the Tiktoken entries and token count, the Llama 3 entries
    and token count, and a combined status report. A method whose backend
    did not load keeps its placeholder 'Error' entry and a count of 0.
    """
    report = ["Processing started..."]

    # Placeholders used for any backend that is skipped.
    fig = None
    bl_highlighted_data, bl_count = [("Bytelatent not available.", "Error")], 0
    tk_highlighted_data, tk_count = [("Tiktoken not available.", "Error")], 0
    llama_highlighted_data, llama_count = [("Llama 3 not available.", "Error")], 0

    # 1. Bytelatent
    if not blt_processor.is_available:
        report.append("Bytelatent Status: Skipped (processor unavailable).")
    else:
        fig, bl_highlighted_data, bl_count, bl_status = blt_processor.process(
            prompt, Config.BLT_MAX_BYTES_FOR_DEMO
        )
        report.append(f"Bytelatent Status:\n{bl_status}")

    # 2. Tiktoken
    if tiktoken_available and tiktoken_encoding:
        tk_highlighted_data, tk_count, tk_status = create_tiktoken_highlight_data(
            prompt, tiktoken_encoding
        )
        report.append(f"Tiktoken Status:\n{tk_status}")
    else:
        report.append("Tiktoken Status: Skipped (tokenizer unavailable).")

    # 3. Llama 3
    if llama_available and llama_tokenizer:
        llama_highlighted_data, llama_count, llama_status = create_llama3_highlight_data(
            prompt, llama_tokenizer
        )
        report.append(f"Llama 3 Status:\n{llama_status}")
    else:
        report.append("Llama 3 Status: Skipped (tokenizer unavailable).")

    final_status = "\n---\n".join(report)

    # Close the figure in pyplot; the figure object itself is still returned.
    if not (fig is None or matplotlib is None):
        try:
            plt.close(fig)  # Close the specific figure
            logging.debug("Closed Matplotlib figure.")
        except Exception as close_err:
            logging.warning(f"Could not close Matplotlib figure: {close_err}")

    return (
        fig,
        bl_highlighted_data,
        bl_count,
        tk_highlighted_data,
        tk_count,
        llama_highlighted_data,
        llama_count,
        final_status,
    )
# --- Gradio Interface ---
def create_color_map(label_prefix: str, colors: List[str], max_segments: int) -> Dict[str, str]:
    """Build the label -> colour mapping used by a Gradio HighlightedText.

    Labels '<label_prefix> 1' .. '<label_prefix> <max_segments>' take colours
    from ``colors`` in order, starting over when the palette runs out. The
    shared 'Error', 'Info' and 'BL Remainder' labels are added afterwards.
    """
    palette = itertools.cycle(colors)
    color_map: Dict[str, str] = {}
    for segment_no in range(1, max_segments + 1):
        color_map[f"{label_prefix} {segment_no}"] = next(palette)

    # Common labels
    color_map["Error"] = "#FF0000"
    color_map["Info"] = "#808080"
    color_map["BL Remainder"] = "#AAAAAA"
    return color_map
# One colour map per segmentation method. Each call assigns colours to
# Config.MAX_EXPECTED_SEGMENTS numbered labels with the given prefix and adds
# the shared "Error" / "Info" / "BL Remainder" entries.
bytelatent_color_map = create_color_map("BL Patch", Config.VIZ_COLORS, Config.MAX_EXPECTED_SEGMENTS)
tiktoken_color_map = create_color_map("GPT4 Tk", Config.VIZ_COLORS, Config.MAX_EXPECTED_SEGMENTS)
llama3_color_map = create_color_map("Llama3 Tk", Config.VIZ_COLORS, Config.MAX_EXPECTED_SEGMENTS)
# Page layout: a title and description, then an input column (prompt, button,
# status box) and an output column with one section per segmentation method.
# NOTE(review): this listing has lost its indentation, so the exact nesting of
# the `with` blocks below (e.g. which components sit inside each gr.Row)
# cannot be confirmed from here - verify against the original file.
with gr.Blocks(theme=Config.GRADIO_THEME) as iface:
gr.Markdown(f"# {Config.GRADIO_TITLE}")
gr.Markdown(Config.GRADIO_DESC)
with gr.Row():
with gr.Column(scale=1): # Input Column
prompt_input = gr.Textbox(
label="Input Prompt",
value=Config.DEFAULT_PROMPT,
placeholder="Enter text here...",
# Max length is for UI input; Bytelatent truncation happens in backend
lines=5,
info=f"Note: Entropy-based Patcher processing is limited to {Config.BLT_MAX_BYTES_FOR_DEMO} bytes for this demo."
)
submit_button = gr.Button("Generate Visualizations", variant="primary")
status_output = gr.Textbox(label="Processing Status", interactive=False, lines=10) # More space for detailed status
with gr.Column(scale=2): # Output Column
# --- Bytelatent Output Area ---
# Each of the three sections below follows the same pattern: a visible
# accordion when the backend loaded, otherwise a notice plus hidden
# components so the click handler's outputs list stays complete.
if blt_processor.is_available: # Only show BLT section if it loaded
with gr.Accordion("BLT Entropy Patcher Output (`blt_main_entropy_100m_512w`)", open=True):
with gr.Row():
bl_count_output = gr.Number(label="Patch Count", value=0, interactive=False, step=1, scale=0)
highlighted_output_bl = gr.HighlightedText(
label="BLT Patches",
color_map=bytelatent_color_map,
show_legend=False,
show_inline_category=False,
container=False
)
plot_output = gr.Plot(label="Entropy vs. Token Index")
else:
gr.Markdown(f"### Bytelatent Output (`{Config.BLT_MODEL_NAME}`)")
gr.Markdown("_(Bytelatent processor failed to load or libraries are missing. Output unavailable.)_")
# Define dummy outputs if BLT is unavailable so the `outputs` list doesn't break
highlighted_output_bl = gr.HighlightedText(value=[("BLT Unavailable", "Error")], label="BLT Patches", visible=False)
bl_count_output = gr.Number(value=0, label="Patch Count", visible=False)
plot_output = gr.Plot(label="Entropy Plot", visible=False)
# --- Tiktoken Output Area ---
if tiktoken_available: # Only show Tiktoken section if it loaded
with gr.Accordion(f"Tiktoken Output (`{Config.TIKTOKEN_ENCODING_NAME}`)", open=True):
with gr.Row():
tk_count_output = gr.Number(label="Token Count", value=0, interactive=False, step=1, scale=0)
highlighted_output_tk = gr.HighlightedText(
label="Tiktoken Segments",
color_map=tiktoken_color_map,
show_legend=False,
show_inline_category=False,
container=False
)
else:
gr.Markdown(f"### Tiktoken Output (`{Config.TIKTOKEN_ENCODING_NAME}`)")
gr.Markdown("_(Tiktoken failed to load. Output unavailable.)_")
highlighted_output_tk = gr.HighlightedText(value=[("Tiktoken Unavailable", "Error")], label="Tiktoken Segments", visible=False)
tk_count_output = gr.Number(value=0, label="Token Count", visible=False)
# --- Llama 3 Output Area ---
if llama_available: # Only show Llama section if it loaded
with gr.Accordion(f"Llama 3 Output (`{Config.LLAMA3_MODEL_NAME}`)", open=True):
with gr.Row():
llama_count_output = gr.Number(label="Token Count", value=0, interactive=False, step=1, scale=0)
highlighted_output_llama = gr.HighlightedText(
label="Llama 3 Segments",
color_map=llama3_color_map,
show_legend=False,
show_inline_category=False,
container=False
)
else:
gr.Markdown(f"### Llama 3 Output (`{Config.LLAMA3_MODEL_NAME}`)")
gr.Markdown("_(Llama 3 tokenizer failed to load. Output unavailable.)_")
highlighted_output_llama = gr.HighlightedText(value=[("Llama 3 Unavailable", "Error")], label="Llama 3 Segments", visible=False)
llama_count_output = gr.Number(value=0, label="Token Count", visible=False)
# Define the action for the button click
# process_text returns eight values; the eight components listed in
# `outputs` receive them in the same order.
submit_button.click(
fn=process_text,
inputs=prompt_input,
# Ensure order matches the return values of process_text
outputs=[
# Bytelatent outputs (even if dummy/hidden)
plot_output,
highlighted_output_bl,
bl_count_output,
# Tiktoken outputs (even if dummy/hidden)
highlighted_output_tk,
tk_count_output,
# Llama 3 outputs (even if dummy/hidden)
highlighted_output_llama,
llama_count_output,
# Status output
status_output
]
)
# --- Launch the Gradio App ---
if __name__ == "__main__":
    # Log which backends loaded, framed by separator lines, then start the server.
    startup_report = [
        "-----------------------------------------",
        "Starting Gradio App...",
        f"Bytelatent Available: {blt_processor.is_available}",
        f"Tiktoken Available: {tiktoken_available}",
        f"Llama 3 Tokenizer Available: {llama_available}",
        "-----------------------------------------",
    ]
    for report_line in startup_report:
        logging.info(report_line)
    iface.launch()
|