import gradio as gr
import os
import torch
import numpy as np
import random
from huggingface_hub import login, HfFolder
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModelForCausalLM, TextIteratorStreamer
from scipy.special import softmax
import logging
import spaces
from threading import Thread
from collections.abc import Iterator
import csv

# Increase CSV field size limit
csv.field_size_limit(1000000)  # Or an even larger value if needed
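# (Note: csv.field_size_limit(sys.maxsize) is a common alternative if "import sys"
# is added; it removes the cap entirely instead of picking a fixed value.)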

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# Set a seed for reproducibility
seed = 42
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# Login to Hugging Face (the token is expected in the "hf_token" environment variable / Space secret)
token = os.getenv("hf_token")
if token:
    HfFolder.save_token(token)
    login(token)
else:
    logging.warning("hf_token is not set; loading gated models such as Llama 3.2 will fail.")

# --- Quality Prediction Model Setup ---
model_paths = [
    "karths/binary_classification_train_test",
    "karths/binary_classification_train_process",
    "karths/binary_classification_train_infrastructure",
    "karths/binary_classification_train_documentation",
    "karths/binary_classification_train_design",
    "karths/binary_classification_train_defect",
    "karths/binary_classification_train_code",
    "karths/binary_classification_train_build",
    "karths/binary_classification_train_automation",
    "karths/binary_classification_train_people",
    "karths/binary_classification_train_architecture",
]

quality_mapping = {
    'binary_classification_train_test': 'Test',
    'binary_classification_train_process': 'Process',
    'binary_classification_train_infrastructure': 'Infrastructure',
    'binary_classification_train_documentation': 'Documentation',
    'binary_classification_train_design': 'Design',
    'binary_classification_train_defect': 'Defect',
    'binary_classification_train_code': 'Code',
    'binary_classification_train_build': 'Build',
    'binary_classification_train_automation': 'Automation',
    'binary_classification_train_people': 'People',
    'binary_classification_train_architecture': 'Architecture'
}

# Pre-load models and tokenizer for quality prediction
tokenizer = AutoTokenizer.from_pretrained("distilroberta-base")
models = {path: AutoModelForSequenceClassification.from_pretrained(path) for path in model_paths}
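# All eleven classifiers are assumed to be distilroberta-base fine-tunes, so one
# shared tokenizer suffices; pre-loading trades startup time and memory for
# faster per-request predictions.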

def get_quality_name(model_name):
    return quality_mapping.get(model_name.split('/')[-1], "Unknown Quality")
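
# Optional sanity check (an added sketch, not in the original app): every entry in
# model_paths should resolve to a known label, so a typo in model_paths or
# quality_mapping fails fast at startup.
assert all(get_quality_name(path) != "Unknown Quality" for path in model_paths)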

def model_prediction(model, text, device):
    model.to(device)
    model.eval()
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
    # Column 1 holds the positive-class probability; the mean is over the batch
    # (a single text here, so this is effectively that text's probability).
    probs = softmax(logits.cpu().numpy(), axis=1)
    avg_prob = np.mean(probs[:, 1])
    return avg_prob
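
# Example usage (hypothetical values, outside the app flow):
#   prob = model_prediction(models["karths/binary_classification_train_build"],
#                           "CI pipeline fails when two release branches build at once.",
#                           "cpu")
#   # prob is a float in [0, 1]; main_interface() treats >= 0.95 as a positive tag.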

# --- Llama 3.2 3B Model Setup ---
LLAMA_MAX_MAX_NEW_TOKENS = 2048
LLAMA_DEFAULT_MAX_NEW_TOKENS = 1024
LLAMA_MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

llama_model_id = "meta-llama/Llama-3.2-3B-Instruct"
llama_tokenizer = AutoTokenizer.from_pretrained(llama_model_id)
llama_model = AutoModelForCausalLM.from_pretrained(
    llama_model_id,
    device_map="auto",  # Automatically place the model on the available device(s)
    torch_dtype=torch.bfloat16,
)
llama_model.eval()

# --- IMPORTANT: Set Pad Token ---
# Llama 3 does *not* have a default pad token. We *must* set one.
# Using the EOS token as the PAD token is a common and recommended practice.
if llama_tokenizer.pad_token is None:
    llama_tokenizer.pad_token = llama_tokenizer.eos_token
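# Optional (an assumption, not in the original): mirror the pad token id into the
# generation config so generate() does not print the "Setting `pad_token_id` to
# `eos_token_id`" notice on every call.
llama_model.generation_config.pad_token_id = llama_tokenizer.pad_token_id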

def llama_generate(
    message: str,
    max_new_tokens: int = LLAMA_DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    # Tokenize without truncation so over-long inputs can be detected and trimmed
    # from the left (keeping the most recent context), with a visible warning.
    inputs = llama_tokenizer(message, return_tensors="pt", padding=True)
    if inputs.input_ids.shape[1] > LLAMA_MAX_INPUT_TOKEN_LENGTH:
        # Trim the attention mask together with input_ids so they stay aligned.
        inputs["input_ids"] = inputs.input_ids[:, -LLAMA_MAX_INPUT_TOKEN_LENGTH:]
        inputs["attention_mask"] = inputs.attention_mask[:, -LLAMA_MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {LLAMA_MAX_INPUT_TOKEN_LENGTH} tokens.")
    inputs = inputs.to(llama_model.device)
    streamer = TextIteratorStreamer(llama_tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        inputs,  # Pass the entire inputs dict (input_ids and attention_mask)
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        repetition_penalty=repetition_penalty,
    )
    # generate() blocks until completion, so it runs in a background thread while
    # the streamer yields decoded text on this one.
    t = Thread(target=llama_model.generate, kwargs=generate_kwargs)
    t.start()
    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)  # Yield the cumulative text generated so far

def generate_explanation(issue_text, top_qualities):
    """Generates an explanation using Llama 3.2 3B."""
    if not top_qualities:
        return "No explanation available as no quality tags were predicted."
    prompt = f"""
Given the following issue description:
---
{issue_text}
---
Explain why this issue might be classified under the following quality categories: {', '.join([q[0] for q in top_qualities])}.
Provide a concise explanation for each category, relating it back to the issue description.
"""
    explanation = ""
    try:
        # llama_generate yields the cumulative text on every iteration, so keep
        # only the latest value instead of concatenating (which would duplicate text).
        for chunk in llama_generate(prompt):
            explanation = chunk
    except Exception as e:
        logging.error(f"Error during Llama generation: {e}")
        return "An error occurred while generating the explanation."
    return explanation
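
# Example usage (hypothetical inputs):
#   generate_explanation("Deployments require manual server restarts.",
#                        [("Process", 0.99), ("Infrastructure", 0.97)])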

@spaces.GPU  # Assumed requirement: on ZeroGPU Spaces, GPU work must run inside a decorated function
def main_interface(text):
    if not text.strip():
        return "<div style='color: red;'>No text provided. Please enter a valid issue description.</div>", "", ""
    if len(text) < 30:
        return "<div style='color: red;'>Text is less than 30 characters. Please provide a longer description.</div>", "", ""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    results = []
    for model_path, model in models.items():
        quality_name = get_quality_name(model_path)
        avg_prob = model_prediction(model, text, device)
        if avg_prob >= 0.95:  # Only keep high-confidence predictions
            results.append((quality_name, avg_prob))
        logging.info(f"Model: {model_path}, Quality: {quality_name}, Average Probability: {avg_prob:.3f}")
    if not results:
        return "<div style='color: red;'>No recommendation. Prediction probability is below the threshold.</div>", "", ""
    top_qualities = sorted(results, key=lambda x: x[1], reverse=True)[:3]
    output_html = render_html_output(top_qualities)
    # Generate an explanation from the top qualities and the original input text
    explanation = generate_explanation(text, top_qualities)
    return output_html, "", explanation  # The explanation feeds the third output component

def render_html_output(top_qualities):
    styles = """
    <style>
        .quality-container {
            font-family: Arial, sans-serif;
            text-align: center;
            margin-top: 20px;
        }
        .quality-label, .ranking {
            display: inline-block;
            padding: 0.5em 1em;
            font-size: 18px;
            font-weight: bold;
            color: white;
            background-color: #007bff;
            border-radius: 0.5rem;
            margin-right: 10px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
        }
        .probability {
            display: block;
            margin-top: 10px;
            font-size: 16px;
            color: #007bff;
        }
    </style>
    """
    html_content = ""
    ranking_labels = ['Top 1 Prediction', 'Top 2 Prediction', 'Top 3 Prediction']
    top_n = min(len(top_qualities), len(ranking_labels))
    for i in range(top_n):
        quality, prob = top_qualities[i]
        # Display the probability as well, using the otherwise-unused .probability style.
        html_content += f"""
        <div class="quality-container">
            <span class="ranking">{ranking_labels[i]}</span>
            <span class="quality-label">{quality}</span>
            <span class="probability">Probability: {prob:.2f}</span>
        </div>
        """
    return styles + html_content

example_texts = [
    ["The algorithm does not accurately distinguish between the positive and negative classes during edge cases.\n\nEnvironment: Production\nReproduction: Run the classifier on the test dataset with known edge cases."],
    ["The regression tests do not cover scenarios involving concurrent user sessions.\n\nEnvironment: Test automation suite\nReproduction: Update the test scripts to include tests for concurrent sessions."],
    ["There is frequent miscommunication between the development and QA teams regarding feature specifications.\n\nEnvironment: Inter-team meetings\nReproduction: Audit recent communication logs and meeting notes between the teams."],
    ["The service-oriented architecture does not effectively isolate failures, leading to cascading failures across services.\n\nEnvironment: Microservices architecture\nReproduction: Simulate a service failure and observe the impact on other services."]
]

interface = gr.Interface(
    fn=main_interface,
    inputs=gr.Textbox(lines=7, label="Issue Description", placeholder="Enter your issue text here"),
    outputs=[
        gr.HTML(label="Prediction Output"),
        gr.Textbox(label="Predictions", visible=False),
        gr.Textbox(label="Explanation", lines=5),  # Holds the Llama-generated explanation
    ],
    title="QualityTagger",
    description="This tool classifies issue text into software quality domains such as Test, Design, Architecture, etc., and provides explanations.",
    examples=example_texts
)

interface.launch(share=True)