jzou1995's picture
Update app.py
fd8b571 verified
raw
history blame
17.4 kB
def google_search_company_info(company_name: str) -> str:
    """
    Search the web for a short description of a company.

    Runs the first two of the prepared Google queries, downloads every result
    page that answers with HTTP 200 and collects the ``<p>`` paragraphs that
    are longer than 100 characters and mention the company name
    (case-insensitive). Collection stops as soon as more than 500 characters
    have been gathered.

    The lookup is best-effort: a failing query or page is reported with
    ``print`` and skipped, never raised to the caller.

    Args:
        company_name: Name of the company to look up.

    Returns:
        The collected paragraphs separated by blank lines, or ``""`` when the
        name is blank, nothing relevant was found, or the search failed.
    """
    # A blank name would "match" every paragraph ("" is a substring of any
    # text), so there is nothing meaningful to search for.
    if not company_name or not company_name.strip():
        return ""

    # Paragraphs at or below this length are ignored.
    min_paragraph_chars = 100
    # Stop collecting once more than this many characters were gathered.
    max_info_chars = 500

    # Create search queries focused on company information
    queries = [
        f"what is {company_name} company",
        f"{company_name} company about us",
        f"{company_name} business description",
        f"{company_name} company profile",
        f"what does {company_name} company do"
    ]

    paragraphs = []
    # Each paragraph counts as its text plus a two-character separator.
    collected_chars = 0
    # Lower-case the name once instead of once per paragraph.
    needle = company_name.lower()

    try:
        # Imported once per call; a missing bs4 is reported by the handler below.
        from bs4 import BeautifulSoup

        print(f"πŸ” Searching for information about '{company_name}'...")
        for query in queries[:2]:  # Limit to first 2 queries to save time
            try:
                for result_url in search(query, stop=2, pause=2):
                    try:
                        response = requests.get(result_url, timeout=5)
                        if response.status_code == 200:
                            soup = BeautifulSoup(response.text, 'html.parser')
                            for p in soup.find_all('p'):
                                text = p.get_text().strip()
                                if len(text) > min_paragraph_chars and needle in text.lower():
                                    paragraphs.append(text)
                                    collected_chars += len(text) + 2
                                    if collected_chars > max_info_chars:
                                        break
                    except Exception as e:
                        print(f" ⚠️ Error fetching {result_url}: {e}")
                    # Enough text gathered: skip the remaining result pages.
                    if collected_chars > max_info_chars:
                        break
            except Exception as e:
                print(f" ⚠️ Error with query '{query}': {e}")
                continue
            # Enough text gathered: skip the remaining queries.
            if collected_chars > max_info_chars:
                break
        return "\n\n".join(paragraphs)
    except Exception as e:
        print(f"❌ Error searching for company info: {str(e)}")
        return ""
import json
import os
import re
from collections import Counter
from typing import List, Dict, Optional, Tuple

import gradio as gr
import google.generativeai as genai
import requests
from google.generativeai.types import HarmCategory, HarmBlockThreshold
from googlesearch import search
def initialize_gemini(api_key: str):
    """
    Configure the Gemini client and build the model used for classification.

    Args:
        api_key: Google Gemini API key passed to ``genai.configure``.

    Returns:
        A ``genai.GenerativeModel`` for "gemini-1.5-flash" created with the
        generation and safety settings defined below.
    """
    genai.configure(api_key=api_key)

    # Every harm category listed here is mapped to BLOCK_NONE.
    unblocked_categories = (
        HarmCategory.HARM_CATEGORY_HARASSMENT,
        HarmCategory.HARM_CATEGORY_HATE_SPEECH,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
    )
    safety = {
        category: HarmBlockThreshold.BLOCK_NONE
        for category in unblocked_categories
    }

    sampling = {
        "temperature": 0.2,
        "top_p": 0.8,
        "top_k": 40,
        "max_output_tokens": 1024,
    }

    return genai.GenerativeModel(
        model_name="gemini-1.5-flash",
        generation_config=sampling,
        safety_settings=safety,
    )
def google_search_naics(company_name: str) -> List[str]:
    """
    Find potential NAICS codes for a company using several Google searches.

    Each query's result pages are downloaded and scanned for standalone
    6-digit numbers. Every such number is treated as a candidate code; no
    check is made that it is a valid NAICS code.

    The lookup is best-effort: a failing query or page is reported with
    ``print`` and skipped.

    Args:
        company_name: Name of the company to search for.

    Returns:
        Up to 10 candidate codes, ordered by the number of fetched pages that
        contained them (most frequent first; ties keep first-seen order).
        An empty list when nothing was found or the search failed.
    """
    max_candidates = 10
    # Compiled once instead of on every fetched page.
    code_pattern = re.compile(r'\b\d{6}\b')
    # code -> number of fetched pages mentioning it. A Counter gives a
    # deterministic "most common" ranking; slicing a set does not.
    page_counts = Counter()

    # Create multiple search queries for better results
    queries = [
        f"2022 NAICS code for {company_name}",
        f"NAICS 2022 classification for {company_name}",
        f"{company_name} business NAICS 2022 code",
        f"{company_name} industry NAICS code 2022",
        f"what is {company_name} company NAICS code"
    ]

    try:
        print(f"πŸ”Ž Searching Google for NAICS codes for '{company_name}'...")
        for query in queries:
            print(f" Query: {query}")
            try:
                # Search with each query, limiting to 3 results per query
                for result_url in search(query, stop=3, pause=2):
                    try:
                        response = requests.get(result_url, timeout=5)
                        if response.status_code != 200:
                            continue
                        found_codes = code_pattern.findall(response.text)
                        # Count each code once per page so a number repeated
                        # many times on one page does not dominate the ranking.
                        page_counts.update(list(dict.fromkeys(found_codes)))
                        if found_codes:
                            print(f" Found codes in {result_url}: {found_codes}")
                    except Exception as e:
                        print(f" ⚠️ Error fetching {result_url}: {e}")
            except Exception as e:
                print(f" ⚠️ Error with query '{query}': {e}")
                continue
        return [code for code, _ in page_counts.most_common(max_candidates)]
    except Exception as e:
        print(f"❌ Error performing Google search: {str(e)}")
        return []
def _build_naics_prompt(company_name: str, context: str, candidates: List[str]) -> str:
    """Return the Gemini prompt, with the candidate-research step only when candidates exist."""
    # NOTE: the prompt text deliberately starts at column 0; indenting these
    # lines would add that indentation to the text sent to the model.
    if candidates:
        # Ask the model to research the candidates found by the Google search
        return f"""
You are a NAICS code classification expert. Based on the company information provided and the NAICS code candidates found from Google search, determine the most appropriate NAICS code.
Company Name: {company_name}
Context Information: {context}
NAICS Code Candidates from Google Search: {candidates}
First, research what these NAICS codes represent:
1. For each NAICS code candidate, briefly explain what industry or business activity it corresponds to.
2. Then explain which industry classification best matches this company based on the name and context provided.
3. Finally, select the single most appropriate NAICS code from the candidates, or suggest a different one if none match.
Your response should be in this format:
RESEARCH: [Brief explanation of what each NAICS code represents]
REASONING: [Your detailed reasoning about why the chosen industry classification is most appropriate for this company]
NAICS_CODE: [6-digit NAICS code]
"""
    # No candidates were found from Google search
    return f"""
You are a NAICS code classification expert. Based on the company information provided, determine the most appropriate NAICS code.
Company Name: {company_name}
Context Information: {context}
First, analyze what industry this company likely belongs to based on its name and the provided context.
Consider standard business classifications and determine the most appropriate category.
Then provide the single most appropriate 6-digit NAICS code.
Your response should be in this format:
REASONING: [Your detailed reasoning about the company's industry classification, including what business activities it likely performs]
NAICS_CODE: [6-digit NAICS code]
"""


def _parse_classification_response(response_text: str) -> dict:
    """Extract the RESEARCH / REASONING / NAICS_CODE sections from the model reply."""
    # All section labels are matched case-insensitively and across newlines.
    flags = re.DOTALL | re.IGNORECASE
    parsed = {}

    # Research is optional: it is only requested when candidates were supplied.
    research_match = re.search(r'RESEARCH:(.*?)REASONING:', response_text, flags)
    if research_match:
        parsed["research"] = research_match.group(1).strip()

    # Reasoning runs up to the NAICS_CODE label, or to the end of the reply
    # when the label is missing, so the explanation is not thrown away.
    reasoning_match = re.search(r'REASONING:(.*?)(?:NAICS_CODE:|\Z)', response_text, flags)
    parsed["reasoning"] = reasoning_match.group(1).strip() if reasoning_match else "No reasoning provided."

    # Prefer the first six digits after the NAICS_CODE label; otherwise fall
    # back to any standalone 6-digit number, then to the "000000" placeholder.
    naics_match = re.search(r'NAICS_CODE:(.*?)(\d{6})', response_text, flags)
    if naics_match:
        parsed["naics_code"] = naics_match.group(2)
    else:
        code_match = re.search(r'\b(\d{6})\b', response_text)
        parsed["naics_code"] = code_match.group(1) if code_match else "000000"
    return parsed


def get_naics_classification(model, company_name: str, context: str, candidates: List[str]) -> dict:
    """
    Use Gemini AI to determine the most appropriate NAICS code for a company.

    Company information found online is appended to ``context``, a prompt is
    built (including the candidate codes when there are any), and the model's
    reply is parsed into its labelled sections.

    Args:
        model: Object exposing ``generate_content(prompt)`` whose result has a
            ``text`` attribute (the model returned by ``initialize_gemini``).
        company_name: Name of the company to classify.
        context: Caller-supplied description of the company; may be empty.
        candidates: Candidate NAICS codes from the Google search; may be empty.

    Returns:
        A dict with ``"naics_code"`` and ``"reasoning"``, plus ``"research"``
        when the reply contains that section. On any error the code is
        ``"000000"`` and the reasoning carries the error message.
    """
    try:
        print("πŸ€– AI is analyzing NAICS classification...")

        # Get additional company information from Google
        company_info = google_search_company_info(company_name)
        if company_info:
            print(f"πŸ“ Found additional company information:\n{company_info[:200]}...")
            # Add the found information to the context
            if context:
                context = f"{context}\n\nAdditional information found online:\n{company_info}"
            else:
                context = f"Information found online:\n{company_info}"

        prompt = _build_naics_prompt(company_name, context, candidates)
        response = model.generate_content(prompt)
        return _parse_classification_response(response.text.strip())
    except Exception as e:
        print(f"❌ Error getting NAICS classification: {str(e)}")
        return {
            "naics_code": "000000",
            "reasoning": f"Error analyzing company: {str(e)}"
        }
def find_naics_code(company_name: str, context: str = "", api_key: Optional[str] = None) -> Dict:
    """
    Look up the NAICS code for a company; shared entry point for all interfaces.

    Args:
        company_name: Name of the company.
        context: Brief description of the company (optional).
        api_key: Google Gemini API key. When empty or None, the
            GEMINI_API_KEY environment variable is used instead.

    Returns:
        Dictionary with the NAICS code and reasoning, optional research, and
        the company name, context and candidate codes as metadata. When the
        key is missing or the model cannot be initialised, a dictionary with
        "error", "naics_code" ("000000") and "reasoning" is returned instead.
    """
    # Fall back to the environment when no key was passed in
    gemini_key = api_key or os.environ.get('GEMINI_API_KEY')
    if not gemini_key:
        return {
            "error": "No API key provided. Set GEMINI_API_KEY environment variable or pass as parameter.",
            "naics_code": "000000",
            "reasoning": "Error: API key missing"
        }

    # Build the Gemini model; report a failure as an error result
    try:
        model = initialize_gemini(gemini_key)
    except Exception as e:
        return {
            "error": f"Failed to initialize Gemini API: {str(e)}",
            "naics_code": "000000",
            "reasoning": f"Error: {str(e)}"
        }

    # Collect candidate codes from the Google search
    naics_candidates = google_search_naics(company_name)
    if naics_candidates:
        print(f"Found {len(naics_candidates)} NAICS candidates: {naics_candidates}")
        shortlist = naics_candidates
    else:
        print("No NAICS codes found from Google search.")
        shortlist = []

    # Let Gemini pick the classification
    result = get_naics_classification(model, company_name, context, shortlist)

    # Attach the inputs and the candidates as metadata
    result.update({
        "company_name": company_name,
        "context": context,
        "candidates": naics_candidates,
    })
    return result
# Gradio interface function
def classify_company(company_name: str, company_description: str, api_key: str = None) -> Tuple[str, str, str]:
    """
    Classify a company and return Markdown strings for display in Gradio.

    Args:
        company_name: Name of the company; required.
        company_description: Extra context passed on to ``find_naics_code``.
        api_key: Gemini API key; when empty, GEMINI_API_KEY from the
            environment is used.

    Returns:
        A ``(naics_code, research, reasoning)`` tuple of Markdown strings.
        ``research`` is "" when the result has none. When the company name is
        missing, the first element is an error message and the rest are "".
    """
    api_key = api_key or os.environ.get('GEMINI_API_KEY')

    if not company_name:
        return "Error: Company name is required", "", ""

    outcome = find_naics_code(company_name, company_description, api_key)

    # Headline with the selected code
    code_md = f"**NAICS Code: {outcome['naics_code']}**"

    # The research section is only present for some results
    if outcome.get("research"):
        research_md = f"## Research on NAICS Codes\n\n{outcome['research']}"
    else:
        research_md = ""

    # Explanation of the classification
    reasoning_md = f"## Analysis\n\n{outcome['reasoning']}"

    return code_md, research_md, reasoning_md
# Create the Gradio interface
def create_gradio_interface():
    """
    Build the Gradio Blocks UI for the NAICS Code Finder.

    The left column holds the inputs (company name, optional context, and an
    API key field that is visible only when GEMINI_API_KEY is not set in the
    environment). The right column shows a status log, the NAICS code, and
    accordions for company information, research and reasoning.

    Returns:
        The ``gr.Blocks`` instance; launching it is left to the caller.
    """
    # Check if API key is set in environment
    has_api_key = bool(os.environ.get('GEMINI_API_KEY'))

    with gr.Blocks(title="NAICS Code Finder") as demo:
        gr.Markdown("# NAICS Code Finder")
        gr.Markdown("Enter a company name to find its appropriate NAICS code. The tool will search for information about the company and relevant NAICS codes online.")

        with gr.Row():
            with gr.Column():
                company_name = gr.Textbox(label="Company Name", placeholder="Enter company name")
                company_description = gr.Textbox(label="Additional Context (optional)", placeholder="Any additional information about the company")
                # Only show API key input if not set in environment
                if not has_api_key:
                    api_key = gr.Textbox(
                        label="Gemini API Key (required)",
                        placeholder="Enter your Google Gemini API key",
                        type="password"
                    )
                else:
                    # Hidden placeholder so the click handler always gets three inputs
                    api_key = gr.Textbox(visible=False, value="")
                submit_btn = gr.Button("Find NAICS Code", variant="primary")

            with gr.Column():
                status_output = gr.Markdown(label="Status")
                naics_output = gr.Markdown(label="NAICS Code")
                with gr.Accordion("Company Information", open=False):
                    company_info_output = gr.Markdown()
                with gr.Accordion("NAICS Codes Research", open=False):
                    research_output = gr.Markdown()
                with gr.Accordion("Classification Reasoning", open=True):
                    reasoning_output = gr.Markdown()

        def process_company(company_name, company_description, api_key):
            """
            Stream progress and results for one classification request.

            This is a generator: every update, including validation errors
            and the final result, must be yielded. A value passed to
            ``return`` inside a generator is not delivered to the outputs.

            Yields:
                Tuples of (status, NAICS code, company info, research,
                reasoning) Markdown strings, in the order of the outputs
                wired up in ``submit_btn.click``.
            """
            if not company_name:
                yield "Please enter a company name", "", "", "", ""
                return

            # Use API key from input or environment
            key_to_use = api_key if api_key else os.environ.get('GEMINI_API_KEY')
            if not key_to_use:
                yield "No API key provided. Please enter your Gemini API key.", "", "", "", ""
                return

            status_md = "πŸ” Searching for company information...\n\n"
            yield status_md, "", "", "", ""

            # Get company info first so it can be shown while the rest runs
            company_info = google_search_company_info(company_name)
            if company_info:
                company_info_md = f"## Information found about {company_name}\n\n{company_info}"
                status_md += "βœ… Found company information\n\n"
            else:
                company_info_md = f"No detailed information found for {company_name}"
                status_md += "⚠️ No company information found\n\n"
            yield status_md, "", company_info_md, "", ""

            # Get NAICS candidates
            status_md += "πŸ” Searching for NAICS codes...\n\n"
            yield status_md, "", company_info_md, "", ""

            # Run the core functionality: candidate search and classification.
            # NOTE: find_naics_code repeats the company-information search
            # internally through get_naics_classification.
            result = find_naics_code(company_name, company_description, key_to_use)

            # The candidate count is only known once the call has returned
            if "candidates" in result and result["candidates"]:
                status_md += f"βœ… Found {len(result['candidates'])} potential NAICS codes\n\n"
            else:
                status_md += "⚠️ No specific NAICS codes found in search results\n\n"
            status_md += "πŸ€– Analyzing classification...\n\n"
            yield status_md, "", company_info_md, "", ""

            # Format the NAICS code output
            naics_code_md = f"## NAICS Code: {result['naics_code']}"

            # Format the research output
            research_md = ""
            if "research" in result and result["research"]:
                research_md = f"## Research on NAICS Codes\n\n{result['research']}"

            # Format the reasoning output
            reasoning_md = f"## Analysis\n\n{result['reasoning']}"

            status_md += "βœ… Classification complete!"
            # Yield (not return) so the final result reaches the UI
            yield status_md, naics_code_md, company_info_md, research_md, reasoning_md

        submit_btn.click(
            process_company,
            inputs=[company_name, company_description, api_key],
            outputs=[status_output, naics_output, company_info_output, research_output, reasoning_output]
        )

        gr.Examples(
            [
                ["Apple Inc", "Tech company"],
                ["Walmart", "Retail store"],
                ["Goldman Sachs", "Investment bank"],
                ["Ford Motor Company", "Automobile manufacturer"]
            ],
            inputs=[company_name, company_description]
        )

    return demo
# Build the interface at import time so the module exposes a top-level `demo`
demo = create_gradio_interface()
# For Spaces deployment: start the server only when this file is run as a
# script, not when it is imported
if __name__ == "__main__":
    demo.launch()