import os
import gradio as gr
from gradio import ChatMessage
from typing import Iterator, List, Dict
import google.generativeai as genai
from huggingface_hub import HfApi
import requests
import re
import traceback
# HuggingFace token for Space analysis
HF_TOKEN = os.getenv("HF_TOKEN")
hf_api = HfApi(token=HF_TOKEN)
# Gemini 2.0 Flash Thinking model API key and client (for LLM)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-01-21")
def get_headers():
if not HF_TOKEN:
raise ValueError("Hugging Face token not found in environment variables")
return {"Authorization": f"Bearer {HF_TOKEN}"}
def get_file_content(space_id: str, file_path: str) -> str:
    """Fetch the raw content of a file from a HuggingFace Space."""
    file_url = f"https://huggingface.co/spaces/{space_id}/raw/main/{file_path}"
    try:
        # A timeout keeps a slow or unreachable host from hanging the analysis.
        response = requests.get(file_url, headers=get_headers(), timeout=30)
        if response.status_code == 200:
            return response.text
        return f"File not found or inaccessible: {file_path}"
    except requests.RequestException:
        return f"Error fetching content for file: {file_path}"
def get_space_structure(space_id: str) -> Dict:
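    """Build a nested directory/file tree for every file in the Space repo."""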
try:
files = hf_api.list_repo_files(repo_id=space_id, repo_type="space")
tree = {"type": "directory", "path": "", "name": space_id, "children": []}
for file in files:
path_parts = file.split('/')
current = tree
for i, part in enumerate(path_parts):
if i == len(path_parts) - 1: # file
current["children"].append({"type": "file", "path": file, "name": part})
else:
found = False
for child in current["children"]:
if child["type"] == "directory" and child["name"] == part:
current = child
found = True
break
if not found:
new_dir = {"type": "directory", "path": '/'.join(path_parts[:i+1]), "name": part, "children": []}
current["children"].append(new_dir)
current = new_dir
return tree
except Exception as e:
print(f"Error in get_space_structure: {str(e)}")
return {"error": f"API request error: {str(e)}"}
def format_tree_structure(tree_data: Dict, indent: str = "") -> str:
if "error" in tree_data:
return tree_data["error"]
formatted = f"{indent}{'📁' if tree_data.get('type') == 'directory' else '📄'} {tree_data.get('name', 'Unknown')}\n"
if tree_data.get("type") == "directory":
for child in sorted(tree_data.get("children", []), key=lambda x: (x.get("type", "") != "directory", x.get("name", ""))):
formatted += format_tree_structure(child, indent + " ")
return formatted
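# A sketch of the rendered tree (the Space id and file names here are hypothetical);
# note that directories sort before files because of the sort key above:
#   format_tree_structure(get_space_structure("username/space-name"))
#   📁 username/space-name
#     📁 assets
#       📄 logo.png
#     📄 app.py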
def adjust_lines_for_code(code_content: str, min_lines: int = 10, max_lines: int = 100) -> int:
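    """Clamp the number of editor lines for a code block to [min_lines, max_lines]."""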
num_lines = len(code_content.split('\n'))
return min(max(num_lines, min_lines), max_lines)
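# Illustration of the clamping (inputs chosen for this sketch):
#   adjust_lines_for_code("a\nb")                  -> 10   (short file, clamped up to min_lines)
#   adjust_lines_for_code("\n".join(["x"] * 50))   -> 50   (within range, returned as-is)
#   adjust_lines_for_code("\n".join(["x"] * 500))  -> 100  (long file, clamped down to max_lines)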
def analyze_space(url: str, progress=gr.Progress()):
    """Analyze a HuggingFace Space: file tree, plus app.py summary, analysis, and usage notes."""
    try:
        # Strip a trailing slash so URLs like .../spaces/user/name/ still parse.
        space_id = url.split('spaces/')[-1].strip('/')
        if not re.match(r'^[\w.-]+/[\w.-]+$', space_id):
            raise ValueError(f"Invalid Space ID format: {space_id}")
progress(0.1, desc="Analyzing file structure...")
tree_structure = get_space_structure(space_id)
if "error" in tree_structure:
raise ValueError(tree_structure["error"])
tree_view = format_tree_structure(tree_structure)
progress(0.3, desc="Fetching app.py content...")
app_content = get_file_content(space_id, "app.py")
progress(0.5, desc="Summarizing code...")
summary = summarize_code(app_content)
progress(0.7, desc="Analyzing code...")
analysis = analyze_code(app_content)
progress(0.9, desc="Generating usage instructions...")
usage = explain_usage(app_content)
lines_for_app_py = adjust_lines_for_code(app_content)
progress(1.0, desc="Complete")
return app_content, tree_view, tree_structure, space_id, summary, analysis, usage, lines_for_app_py
except Exception as e:
print(f"Error in analyze_space: {str(e)}")
print(traceback.format_exc())
return f"An error occurred: {str(e)}", "", None, "", "", "", "", 10
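# Note: this 8-tuple maps 1:1 onto the `outputs` list of analyze_button.click in create_ui(),
# and the error fallback above returns matching placeholders so every output still gets a value.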
# --------------------------------------------------
# Gemini 2.0 Flash Thinking model (LLM) functions
# --------------------------------------------------
def format_chat_history(messages: List[ChatMessage]) -> List[Dict]:
    """
    Convert a list of ChatMessages to the history format the Gemini API expects.
    Gemini accepts only the roles "user" and "model", so assistant turns are
    mapped to "model"; messages carrying 'Thinking' metadata are skipped.
    """
    formatted = []
    for m in messages:
        if hasattr(m, "metadata") and m.metadata:  # Skip 'Thinking' messages
            continue
        role = "model" if m.role == "assistant" else "user"
        formatted.append({"role": role, "parts": [m.content or ""]})
    return formatted
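# Sketch of the conversion (message contents are illustrative):
#   format_chat_history([
#       ChatMessage(role="user", content="Hi"),
#       ChatMessage(role="assistant", content="...", metadata={"title": "Thinking"}),  # skipped
#       ChatMessage(role="assistant", content="Hello!"),
#   ])
#   -> [{"role": "user", "parts": ["Hi"]}, {"role": "model", "parts": ["Hello!"]}]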
def gemini_chat_completion(system_message: str, user_message: str, max_tokens: int = 200, temperature: float = 0.7) -> str:
    # Gemini has no dedicated system role in this API, so the system prompt is
    # passed as the opening user turn; the user message itself is sent only once.
    chat_history = format_chat_history([ChatMessage(role="system", content=system_message)])
    chat = model.start_chat(history=chat_history)
    final = ""
    try:
        for chunk in chat.send_message(
            user_message,
            stream=True,
            generation_config=genai.types.GenerationConfig(
                max_output_tokens=max_tokens,
                temperature=temperature,
            ),
        ):
            parts = chunk.candidates[0].content.parts
            # The thinking model streams [thought, answer]; keep only the answer part.
            final += parts[1].text if len(parts) == 2 else parts[0].text
        return final.strip()
    except Exception as e:
        return f"Error calling LLM: {str(e)}"
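# Hypothetical usage sketch (prompt text is illustrative):
#   gemini_chat_completion(
#       "You are a concise code reviewer.",
#       "Summarize: print('hello')",
#       max_tokens=100, temperature=0.3)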
def summarize_code(app_content: str):
system_msg = "You are an AI assistant that analyzes and summarizes Python code. Please summarize the provided code in no more than 3 lines."
user_msg = f"Please summarize the following Python code in no more than 3 lines:\n\n{app_content}"
try:
return gemini_chat_completion(system_msg, user_msg, max_tokens=200, temperature=0.7)
except Exception as e:
return f"Error generating summary: {str(e)}"
def analyze_code(app_content: str):
system_msg = (
"You are an AI assistant that analyzes Python code. Please analyze the provided code in terms of its service utility and application with respect to the following aspects:\n"
"A. Background and Necessity\n"
"B. Functional Utility and Value\n"
"C. Key Features\n"
"D. Target Audience\n"
"E. Expected Impact\n"
"Please also compare with existing and similar projects. Output in Markdown format."
)
user_msg = f"Please analyze the following Python code:\n\n{app_content}"
try:
return gemini_chat_completion(system_msg, user_msg, max_tokens=1000, temperature=0.7)
except Exception as e:
return f"Error generating analysis: {str(e)}"
def explain_usage(app_content: str):
system_msg = (
"You are an AI assistant that analyzes Python code to explain its usage. Based on the provided code, please describe how to use it as if you were viewing the interface. Output in Markdown format."
)
user_msg = f"Please explain how to use the following Python code:\n\n{app_content}"
try:
return gemini_chat_completion(system_msg, user_msg, max_tokens=800, temperature=0.7)
except Exception as e:
return f"Error generating usage instructions: {str(e)}"
def stream_gemini_response(user_message: str, conversation_state: List[ChatMessage]) -> Iterator[List[ChatMessage]]:
"""
Send a streaming request to Gemini.
If the user_message is empty, append a minimal guidance message from the assistant and yield.
"""
if not user_message.strip():
conversation_state.append(
ChatMessage(
role="assistant",
content="No input provided. Please enter a question!"
)
)
yield conversation_state
return
print(f"\n=== New Request ===\nUser message: {user_message}")
chat_history = format_chat_history(conversation_state)
chat = model.start_chat(history=chat_history)
response = chat.send_message(user_message, stream=True)
thought_buffer = ""
response_buffer = ""
thinking_complete = False
conversation_state.append(
ChatMessage(
role="assistant",
content="",
metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
)
)
try:
for chunk in response:
parts = chunk.candidates[0].content.parts
current_chunk = parts[0].text
if len(parts) == 2 and not thinking_complete:
thought_buffer += current_chunk
print(f"\n=== Complete Thought ===\n{thought_buffer}")
conversation_state[-1] = ChatMessage(
role="assistant",
content=thought_buffer,
metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
)
yield conversation_state
response_buffer = parts[1].text
print(f"\n=== Starting Response ===\n{response_buffer}")
conversation_state.append(
ChatMessage(role="assistant", content=response_buffer)
)
thinking_complete = True
elif thinking_complete:
response_buffer += current_chunk
print(f"\n=== Response Chunk ===\n{current_chunk}")
conversation_state[-1] = ChatMessage(
role="assistant",
content=response_buffer
)
else:
thought_buffer += current_chunk
print(f"\n=== Thinking Chunk ===\n{current_chunk}")
conversation_state[-1] = ChatMessage(
role="assistant",
content=thought_buffer,
metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
)
yield conversation_state
print(f"\n=== Final Response ===\n{response_buffer}")
except Exception as e:
print(f"\n=== Error ===\n{str(e)}")
conversation_state.append(
ChatMessage(
role="assistant",
                content=f"I apologize, but I encountered an error: {str(e)}"
)
)
yield conversation_state
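# Streaming convention assumed above: while the model is "thinking", each chunk carries a
# single part (the thought); the first chunk with two parts marks the transition, where
# parts[1] starts the visible answer and later chunks extend it.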
def convert_for_messages_format(messages: List[ChatMessage]) -> List[Dict[str, str]]:
"""
Convert a list of ChatMessages to the format [{"role": "assistant"/"user", "content": "..."}].
"""
output = []
for msg in messages:
output.append({"role": msg.role, "content": msg.content})
return output
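# Sketch of the shape consumed by gr.Chatbot(type="messages"):
#   convert_for_messages_format([ChatMessage(role="user", content="Hi")])
#   -> [{"role": "user", "content": "Hi"}]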
def user_submit_message(msg: str, conversation_state: List[ChatMessage]):
conversation_state.append(ChatMessage(role="user", content=msg))
return "", conversation_state
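# Note: returning "" first clears the input textbox; the appended state then feeds respond_wrapper.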
def respond_wrapper(message: str, conversation_state: List[ChatMessage], max_tokens, temperature, top_p):
    # max_tokens/temperature/top_p come from the hidden sliders and are not yet wired into the Gemini call.
    # The textbox is cleared before this runs, so recover the last user message from the conversation state.
    last_user_message = ""
for msg in reversed(conversation_state):
if msg.role == "user":
last_user_message = msg.content
break
# Generate response based on the last user message
for updated_messages in stream_gemini_response(last_user_message, conversation_state):
yield "", convert_for_messages_format(updated_messages)
def create_ui():
try:
css = """
body {
background: linear-gradient(to right, #f0f2f5, #ffffff);
font-family: 'Segoe UI', sans-serif;
}
.gradio-container {
border-radius: 15px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
}
footer {visibility: hidden;}
.tabitem-header {
font-weight: bold;
color: #3b3b3b;
}
.gradio-markdown h1 {
color: #ff6f61;
}
"""
with gr.Blocks(css=css) as demo:
gr.Markdown("# 🚀 MOUSE: Space Research Thinking")
with gr.Tabs():
with gr.TabItem("🔍 Analysis"):
with gr.Row():
with gr.Column():
url_input = gr.Textbox(label="🔗 HuggingFace Space URL", placeholder="e.g.: https://huggingface.co/spaces/username/space-name")
analyze_button = gr.Button("Start Analysis 🚀", variant="primary")
summary_output = gr.Markdown(label="📝 Code Summary")
analysis_output = gr.Markdown(label="🔍 Code Analysis")
usage_output = gr.Markdown(label="📚 Usage Instructions")
tree_view_output = gr.Textbox(label="📁 File Structure", lines=20)
with gr.Column():
code_tabs = gr.Tabs()
with code_tabs:
with gr.TabItem("app.py"):
app_py_content = gr.Code(
language="python",
label="app.py",
lines=50
)
with gr.TabItem("requirements.txt"):
requirements_content = gr.Textbox(
label="requirements.txt",
lines=50
)
with gr.TabItem("🤖 AI Code Chat"):
                    gr.Markdown("## 💬 Select an example below, or paste your source code and ask your question!")
chatbot = gr.Chatbot(
label="Chat Window",
height=400,
type="messages"
)
msg = gr.Textbox(
label="Enter your message",
placeholder="Type your message here..."
)
max_tokens = gr.Slider(
minimum=1, maximum=8000,
value=4000, label="Max Tokens",
visible=False
)
temperature = gr.Slider(
minimum=0, maximum=1,
value=0.7, label="Temperature",
visible=False
)
top_p = gr.Slider(
minimum=0, maximum=1,
value=0.9, label="Top P",
visible=False
)
examples = [
["Explain detailed usage instructions in over 4000 tokens"],
["Generate 20 FAQs in over 4000 tokens"],
["Describe technical differentiators and strengths in over 4000 tokens"],
["Generate innovative ideas for patent applications in over 4000 tokens"],
["Write an academic paper in over 4000 tokens"],
["Continue your answer"]
]
gr.Examples(examples, inputs=msg)
conversation_state = gr.State([])
msg.submit(
user_submit_message,
inputs=[msg, conversation_state],
outputs=[msg, conversation_state],
queue=False
).then(
respond_wrapper,
inputs=[msg, conversation_state, max_tokens, temperature, top_p],
outputs=[msg, chatbot],
)
with gr.TabItem("⭐ Recommended Best"):
gr.Markdown(
"Discover recommended HuggingFace Spaces [here](https://huggingface.co/spaces/openfree/Korean-Leaderboard)."
)
# Analysis tab logic
space_id_state = gr.State()
tree_structure_state = gr.State()
app_py_content_lines = gr.State()
analyze_button.click(
analyze_space,
inputs=[url_input],
outputs=[
app_py_content,
tree_view_output,
tree_structure_state,
space_id_state,
summary_output,
analysis_output,
usage_output,
app_py_content_lines
]
).then(
                lambda space_id: get_file_content(space_id, "requirements.txt") if space_id else "",
inputs=[space_id_state],
outputs=[requirements_content]
).then(
lambda lines: gr.update(lines=lines),
inputs=[app_py_content_lines],
outputs=[app_py_content]
)
return demo
except Exception as e:
print(f"Error in create_ui: {str(e)}")
print(traceback.format_exc())
raise
if __name__ == "__main__":
try:
print("Starting HuggingFace Space Analyzer...")
demo = create_ui()
print("UI created successfully.")
print("Configuring Gradio queue...")
demo.queue()
print("Gradio queue configured.")
print("Launching Gradio app...")
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
debug=True,
show_api=False
)
print("Gradio app launched successfully.")
except Exception as e:
print(f"Error in main: {str(e)}")
print("Detailed error information:")
print(traceback.format_exc())
raise