# Streamlit app: hybrid (semantic + lexical) search over GIZ project data with an LLM-generated answer.
import html
import json
import re
from datetime import datetime

import pandas as pd
import requests
import streamlit as st
from torch import cuda

from appStore.embed import hybrid_embed_chunks
from appStore.prep_data import process_giz_worldwide, remove_duplicates, get_max_end_year, extract_year
from appStore.prep_utils import create_documents, get_client
from appStore.region_utils import load_region_data, get_country_name, get_regions
from appStore.search import hybrid_search
#from appStore.tfidf_extraction import extract_top_keywords # TF-IDF part commented out
# Configure the browser tab title and use the wide page layout.
st.set_page_config(page_title="SEARCH IATI", layout='wide')
########################################### | |
# Helper functions for data processing | |
########################################### | |
def truncate_to_tokens(text, max_tokens):
    """Truncate *text* to at most *max_tokens* whitespace-separated tokens.

    Tokens are approximated by whitespace-delimited words. Text that already
    fits is returned unchanged. When truncation is needed, the original
    whitespace (including the newlines that separate results and fields) is
    kept up to the end of the last retained token instead of being collapsed
    to single spaces.

    Args:
        text: The string to truncate.
        max_tokens: Maximum number of tokens to keep; values <= 0 yield "".

    Returns:
        The (possibly shortened) text.
    """
    if max_tokens <= 0:
        return ""
    kept_end = 0
    for count, match in enumerate(re.finditer(r"\S+", text), start=1):
        if count > max_tokens:
            # One token too many: cut right after the previous token.
            return text[:kept_end]
        kept_end = match.end()
    return text
def build_context_for_result(res):
    """Build the LLM context string for a single search result.

    Args:
        res: A search hit exposing a ``payload`` mapping whose ``'metadata'``
            entry holds the project fields.

    Returns:
        Title, objective and description joined by single newlines. The
        title falls back to ``compute_title(metadata)`` when it is missing or
        empty; the description is ``description.en`` if non-blank, otherwise
        ``description.de``. Missing or ``None`` fields become empty strings.
    """
    metadata = res.payload.get('metadata', {})
    # Only compute a title when none is stored (the fallback is not free).
    title = metadata.get("title") or compute_title(metadata)
    objective = metadata.get("objective") or ""
    # `or ""` guards against keys that are present but hold None.
    desc_en = (metadata.get("description.en") or "").strip()
    desc_de = (metadata.get("description.de") or "").strip()
    description = desc_en or desc_de
    return f"{title}\n{objective}\n{description}"
def highlight_query(text, query):
    """Return *text* with every occurrence of *query* wrapped in a red, bold ``<span>``.

    Matching is case-insensitive and literal (regex metacharacters in the
    query are escaped). Leading/trailing whitespace of the query is ignored.

    Args:
        text: The text to decorate; ``None`` is treated as an empty string.
        query: The search phrase. If it is empty or only whitespace the text
            is returned unchanged (an empty pattern would otherwise match
            between every character and flood the output with empty spans).

    Returns:
        The text with HTML highlight markup inserted.
    """
    if not text:
        return ""
    needle = query.strip() if query else ""
    if not needle:
        return text
    pattern = re.compile(re.escape(needle), re.IGNORECASE)
    return pattern.sub(
        lambda m: f"<span style='color:red; font-weight:bold;'>{m.group(0)}</span>",
        text,
    )
def format_project_id(pid):
    """Format a project id for display, e.g. ``"201940485"`` -> ``"2019.4048.5"``.

    Ids longer than five characters are split into the first four characters,
    the middle part and the final character, joined by dots. Shorter ids are
    returned as plain strings.

    Args:
        pid: The project id (string or number). Integral floats such as
            ``201940485.0`` are treated as integers so the ``.0`` suffix does
            not end up inside the formatted id.

    Returns:
        The formatted id string.
    """
    if isinstance(pid, float) and pid.is_integer():
        pid = int(pid)
    s = str(pid).strip()
    if len(s) > 5:
        return f"{s[:4]}.{s[4:-1]}.{s[-1]}"
    return s
def compute_title(metadata):
    """Compute a display title from project metadata.

    Uses ``name.en`` (or ``name.de`` when the English name is blank). When
    both a name and an ``id`` are present, the formatted project id is
    appended in square brackets.

    Args:
        metadata: Mapping with optional ``"name.en"``, ``"name.de"`` and
            ``"id"`` entries. ``None`` names are treated as blank.

    Returns:
        The title, or ``"No Title"`` when no name is available.
    """
    # `or ""` guards against keys that are present but hold None.
    name_en = (metadata.get("name.en") or "").strip()
    name_de = (metadata.get("name.de") or "").strip()
    base = name_en or name_de
    pid = metadata.get("id", "")
    if base and pid:
        return f"{base} [{format_project_id(pid)}]"
    return base or "No Title"
# Load the CRS lookup table once when the script starts.
# Assumes columns: "code" and "new_crs_value".
crs_lookup = pd.read_csv("docStore/crs5_codes.csv")


def lookup_crs_value(crs_key):
    """Return the ``new_crs_value`` for *crs_key* from the CRS lookup table.

    Args:
        crs_key: The CRS code to look up in the ``"code"`` column.

    Returns:
        The value of the first matching row as a string. Numeric values are
        rendered as integers (decimals dropped); anything else is returned
        via ``str``. An empty string is returned when no row matches.
    """
    # NOTE(review): callers pass crs_key as a string; if pandas parses the
    # "code" column as integers this comparison never matches — confirm the
    # CSV dtype.
    row = crs_lookup[crs_lookup["code"] == crs_key]
    if row.empty:
        return ""
    raw_value = row.iloc[0]["new_crs_value"]
    try:
        # Convert to integer (drop decimals) and then to string.
        return str(int(float(raw_value)))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric, NaN or infinite values are shown as-is.
        return str(raw_value)
########################################### | |
# RAG Answer function (Change 1 & 2 & 3) | |
########################################### | |
# TODO: move functions to utils and model specifications to a config file.
# Configuration for the dedicated model
# https://nwea79x4q1clc89l.eu-west-1.aws.endpoints.huggingface.cloud # 12k token
DEDICATED_MODEL = "meta-llama/Llama-3.1-8B-Instruct"
DEDICATED_ENDPOINT = "https://qu2d8m6dmsollhly.us-east-1.aws.endpoints.huggingface.cloud"
# Write access token from the settings
WRITE_ACCESS_TOKEN = st.secrets["Llama_3_1"]


def get_rag_answer(query, top_results, max_context_tokens=2960, max_new_tokens=220, timeout=120):
    """Ask the dedicated LLM endpoint to answer *query* using *top_results* as context.

    Args:
        query: The user's question.
        top_results: Search hits; each is converted to context text with
            ``build_context_for_result``.
        max_context_tokens: Approximate (whitespace-token) budget the context
            is truncated to before it is sent.
        max_new_tokens: Generation limit passed to the endpoint.
        timeout: Seconds to wait for the endpoint before giving up.

    Returns:
        The generated answer text. On any failure (network error, non-200
        status, unexpected response shape) a string starting with
        ``"Error in generating answer:"`` is returned instead of raising.
    """
    # Build context from each top result using title, objective, and description.
    context = "\n\n".join(build_context_for_result(res) for res in top_results)
    context = truncate_to_tokens(context, max_context_tokens)
    prompt = (
        "You are a project portfolio adviser at the development cooperation GIZ. "
        "Using the following context, answer the question in English precisely. "
        "Ensure that any project title mentioned in your answer is wrapped in ** (markdown bold). "
        "Only output the final answer below, without repeating the context or question.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}\n\n"
        "Answer:"
    )
    headers = {"Authorization": f"Bearer {WRITE_ACCESS_TOKEN}"}
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        # Without a timeout a stalled endpoint would block the app indefinitely.
        response = requests.post(DEDICATED_ENDPOINT, headers=headers, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        return f"Error in generating answer: {exc}"
    if response.status_code != 200:
        return f"Error in generating answer: {response.text}"
    try:
        answer = response.json()[0]["generated_text"]
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        return f"Error in generating answer: unexpected response format ({exc})"
    if answer.startswith(prompt):
        # The endpoint echoed the prompt: keep only the newly generated part,
        # even if the answer itself happens to contain "Answer:".
        answer = answer[len(prompt):]
    elif "Answer:" in answer:
        answer = answer.split("Answer:")[-1]
    return answer.strip()
########################################### | |
# CRS Options using lookup (Change 7) | |
########################################### | |
@st.cache_data
def get_crs_options(_client, collection_name):
    """Collect the distinct CRS labels offered in the CRS filter.

    Runs an empty-query hybrid search and gathers every non-blank
    ``crs_key`` found in the result metadata.

    Args:
        _client: Search client passed through to ``hybrid_search``. The
            leading underscore tells ``st.cache_data`` not to hash it.
        collection_name: Name of the collection to search.

    Returns:
        Sorted list of ``"<crs_key>: <lookup value>"`` strings.
    """
    # Cached so the full scan is not repeated on every Streamlit rerun;
    # the options are only refreshed when the cache is cleared.
    results = hybrid_search(_client, "", collection_name)
    crs_set = set()
    for res in results[0] + results[1]:
        metadata = res.payload.get('metadata', {})
        crs_key = (metadata.get("crs_key") or "").strip()
        if crs_key:
            crs_set.add(f"{crs_key}: {lookup_crs_value(crs_key)}")
    return sorted(crs_set)
@st.cache_data
def load_project_data():
    """Return the full project table produced by ``process_giz_worldwide``.

    Cached with ``st.cache_data`` so the processing is not repeated on every
    Streamlit rerun.
    """
    return process_giz_worldwide()


# Load the project data (cached)
project_data = load_project_data()
# Convert the 'total_project' column to numeric (dropping errors) and compute min and max.
# The budget is assumed to be in euros, so we convert to million euros.
budget_series = pd.to_numeric(project_data['total_project'], errors='coerce').dropna()
if budget_series.empty:
    # No parsable budgets: avoid NaN bounds for the budget slider.
    min_budget_val = 0.0
    max_budget_val = 0.0
else:
    min_budget_val = float(budget_series.min() / 1e6)
    max_budget_val = float(budget_series.max() / 1e6)
########################################### | |
# Revised filter_results with budget filtering (Change 7 & 9) | |
########################################### | |
def parse_budget(value):
    """Parse a budget value to ``float``.

    Args:
        value: Number or numeric string.

    Returns:
        The parsed float, or ``0.0`` when the value cannot be converted.
    """
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return 0.0
def _extract_end_year(metadata):
    """Return the project's end year as an int, or 0 when it is unknown."""
    year_str = metadata.get('end_year')
    if not year_str:
        return 0
    extracted = extract_year(year_str)
    try:
        return int(extracted) if extracted != "Unknown" else 0
    except (TypeError, ValueError):
        return 0


def _parse_country_codes(metadata):
    """Return the upper-cased two-letter country codes stored in the metadata."""
    countries = metadata.get('countries', "[]")
    if not isinstance(countries, str):
        return []
    try:
        # The list is stored as a Python-style string with single quotes.
        codes = json.loads(countries.replace("'", '"'))
    except json.JSONDecodeError:
        return []
    if not isinstance(codes, list):
        return []
    return [code.upper() for code in codes if isinstance(code, str) and len(code) == 2]


def filter_results(results, country_filter, region_filter, end_year_range, crs_filter, budget_filter):
    """Keep only the search results that satisfy all active filters.

    Args:
        results: Search hits exposing ``payload['metadata']``.
        country_filter: Country name, or ``"All/Not allocated"`` for no filter.
        region_filter: Sub-region name, or ``"All/Not allocated"`` for no filter.
        end_year_range: ``(first_year, last_year)``, inclusive.
        crs_filter: ``"<crs_key>: <value>"`` label, or ``"All/Not allocated"``.
        budget_filter: Minimum project budget in million euros.

    Returns:
        The hits that pass, in their original order. Results without a
        ``crs_key`` are not excluded by the CRS filter, and results whose end
        year is unknown are not excluded by the year range.
    """
    no_filter = "All/Not allocated"
    # Loop-invariant values, computed once.
    selected_iso_code = country_name_mapping.get(country_filter, None)
    min_budget_eur = budget_filter * 1e6
    first_year, last_year = end_year_range[0], end_year_range[1]

    filtered = []
    for r in results:
        metadata = r.payload.get('metadata', {})

        # Enforce CRS filter only if specified. The label is built exactly
        # like in get_crs_options so that a selected option can match.
        crs_key = (metadata.get("crs_key") or "").strip()
        if crs_filter != no_filter and crs_key:
            if crs_filter != f"{crs_key}: {lookup_crs_value(crs_key)}":
                continue

        # Only keep results with budget >= budget_filter (given in million euros).
        if parse_budget(metadata.get('total_project', "0")) < min_budget_eur:
            continue

        end_year_val = _extract_end_year(metadata)
        if end_year_val != 0 and not (first_year <= end_year_val <= last_year):
            continue

        c_list = _parse_country_codes(metadata)
        if country_filter != no_filter and not (selected_iso_code and selected_iso_code in c_list):
            continue
        if region_filter != no_filter and not any(
            iso_code_to_sub_region.get(code) == region_filter for code in c_list
        ):
            continue

        filtered.append(r)
    return filtered
########################################### | |
# Get device | |
########################################### | |
device = 'cuda' if cuda.is_available() else 'cpu'

###########################################
# App heading and About button (Change 5 & 6)
###########################################
col_title, col_about = st.columns([8, 2])
with col_title:
    st.markdown("<h1 style='text-align:center;'>GIZ Project Database (PROTOTYPE)</h1>", unsafe_allow_html=True)
with col_about:
    with st.expander("ℹ️ About"):
        # The text lines stay at column 0: indented lines inside the string
        # would be rendered as a Markdown code block.
        st.markdown(
            """
**This app is a prototype for testing purposes using publicly available project data from the German International Cooperation Society (GIZ) as of 23rd February 2025.**
**Please do NOT enter sensitive or personal information.**
Note: The generated answers are AI-generated and may be wrong or misleading.
""", unsafe_allow_html=True)

###########################################
# Query input and budget slider (Change 9)
###########################################
var = st.text_input("Enter Question")

###########################################
# Load region lookup CSV
###########################################
region_lookup_path = "docStore/regions_lookup.csv"
region_df = load_region_data(region_lookup_path)

###########################################
# Create the embeddings collection and save
###########################################
# The steps below need to be performed only once and are then commented out
# to avoid any unnecessary compute overhead.
##### First we process and create the chunks for the relevant data source
#chunks = process_giz_worldwide()
##### Convert to langchain documents
#temp_doc = create_documents(chunks,'chunks')
##### Embed and store docs; if the collection already exists it needs to be updated
collection_name = "giz_worldwide"
#hybrid_embed_chunks(docs=temp_doc, collection_name=collection_name, del_if_exists=True)

###########################################
# Hybrid Search and Filters Setup
###########################################
client = get_client()
# Log the available collections to stdout.
print(client.get_collections())
max_end_year = get_max_end_year(client, collection_name)
_, unique_sub_regions = get_regions(region_df)
@st.cache_data
def get_country_name_and_region_mapping(_client, collection_name, region_df):
    """Build the country-name and sub-region lookups used by the filters.

    Runs an empty-query hybrid search, collects every two-letter country code
    found in the result metadata and resolves each one against *region_df*.

    Args:
        _client: Search client passed through to ``hybrid_search``. The
            leading underscore tells ``st.cache_data`` not to hash it.
        collection_name: Name of the collection to search.
        region_df: Region table with ``'alpha-2'`` and ``'sub-region'`` columns.

    Returns:
        ``(country_name_to_code, iso_code_to_sub_region)``: a dict mapping
        country name to its code, and a dict mapping each code to its
        sub-region (``"Not allocated"`` when the code is not in *region_df*).
    """
    results = hybrid_search(_client, "", collection_name)
    country_set = set()
    for res in results[0] + results[1]:
        countries = res.payload.get('metadata', {}).get('countries') or "[]"
        try:
            # The list is stored as a Python-style string with single quotes.
            country_list = json.loads(countries.replace("'", '"'))
        except json.JSONDecodeError:
            continue
        country_set.update(code.upper() for code in country_list if len(code) == 2)

    country_name_to_code = {}
    iso_code_to_sub_region = {}
    # Sorted so the result does not depend on set iteration order.
    for code in sorted(country_set):
        name = get_country_name(code, region_df)
        sub_region_row = region_df[region_df['alpha-2'] == code]
        sub_region = sub_region_row['sub-region'].values[0] if not sub_region_row.empty else "Not allocated"
        country_name_to_code[name] = code
        iso_code_to_sub_region[code] = sub_region
    return country_name_to_code, iso_code_to_sub_region
# The client created during setup above is reused; no second connection is needed.
country_name_mapping, iso_code_to_sub_region = get_country_name_and_region_mapping(client, collection_name, region_df)
unique_country_names = sorted(country_name_mapping.keys())

# Layout filter columns
col1, col2, col3, col4, col5 = st.columns([1, 1, 1, 1, 1])
with col1:
    region_filter = st.selectbox("Region", ["All/Not allocated"] + sorted(unique_sub_regions))
if region_filter == "All/Not allocated":
    filtered_country_names = unique_country_names
else:
    # Sorted like the unrestricted list so the dropdown order is consistent.
    filtered_country_names = sorted(
        name for name, code in country_name_mapping.items()
        if iso_code_to_sub_region.get(code) == region_filter
    )
with col2:
    country_filter = st.selectbox("Country", ["All/Not allocated"] + filtered_country_names)
with col3:
    current_year = datetime.now().year
    # Clamp the default start into the slider bounds; a default outside
    # [2010, max_end_year] is rejected by the slider.
    default_start_year = max(2010, min(current_year - 4, max_end_year))
    end_year_range = st.slider(
        "Project End Year",
        min_value=2010,
        max_value=max_end_year,
        value=(default_start_year, max_end_year),
    )
with col4:
    crs_options = ["All/Not allocated"] + get_crs_options(client, collection_name)
    crs_filter = st.selectbox("CRS", crs_options)
with col5:
    # Budget bounds (million euros) come from the loaded project data.
    min_budget = st.slider(
        "Minimum Project Budget (Million €)",
        min_value=min_budget_val,
        max_value=max_budget_val,
        value=min_budget_val)

# Checkbox for exact matches
show_exact_matches = st.checkbox("Show only exact matches", value=False)
# Number of hits sent to the LLM and rendered below the answer.
TOP_RESULTS_SHOWN = 10
# Words of the description shown before the "Show more" expander.
PREVIEW_WORD_COUNT = 90


def format_currency(value):
    """Format a numeric value as whole euros (e.g. ``€1,234``); return it unchanged if not numeric."""
    try:
        return f"€{int(float(value)):,}"
    except (ValueError, TypeError, OverflowError):
        return value


def _resolve_country_names(metadata):
    """Return the display names of the two-letter country codes stored in the metadata."""
    try:
        c_list = json.loads(metadata.get('countries', "[]").replace("'", '"'))
    except json.JSONDecodeError:
        c_list = []
    names = []
    for code in c_list:
        if len(code) == 2:
            resolved_name = get_country_name(code.upper(), region_df)
            # Codes that come back unchanged could not be resolved and are skipped.
            if resolved_name.upper() != code.upper():
                names.append(resolved_name)
    return names


def _render_result(res, query, highlight_details):
    """Render one hit: title, description preview (left) and project details (right).

    When *highlight_details* is true the query is also highlighted in the
    title and the objective (exact-match mode); the description is always
    highlighted.
    """
    metadata = res.payload.get('metadata', {})
    title = metadata["title"] if "title" in metadata else compute_title(metadata)
    if highlight_details:
        st.markdown(f"#### {highlight_query(title, query)}", unsafe_allow_html=True)
    else:
        st.markdown(f"#### {title}")

    objective = metadata.get("objective") or ""
    if highlight_details:
        objective = highlight_query(objective, query)

    # Use description.en if available; otherwise use description.de.
    desc_en = (metadata.get("description.en") or "").strip()
    desc_de = (metadata.get("description.de") or "").strip()
    words = (desc_en or desc_de).split()
    preview_text = " ".join(words[:PREVIEW_WORD_COUNT])
    remainder_text = " ".join(words[PREVIEW_WORD_COUNT:])

    # Two columns: left for the description, right for additional details.
    col_left, col_right = st.columns(2)
    with col_left:
        st.markdown(highlight_query(preview_text, query), unsafe_allow_html=True)
        if remainder_text:
            with st.expander("Show more"):
                st.markdown(highlight_query(remainder_text, query), unsafe_allow_html=True)
    with col_right:
        start_year = metadata.get('start_year', None)
        end_year = metadata.get('end_year', None)
        start_year_str = extract_year(start_year) if start_year else "Unknown"
        end_year_str = extract_year(end_year) if end_year else "Unknown"
        formatted_project_budget = format_currency(metadata.get('total_project', "Unknown"))
        formatted_total_volume = format_currency(metadata.get('total_volume', "Unknown"))
        matched_countries = _resolve_country_names(metadata)
        crs_key = (metadata.get("crs_key") or "").strip()
        if crs_key:
            new_crs_value = lookup_crs_value(crs_key).replace('.0', '')
            crs_combined = f"{crs_key}: {new_crs_value}"
        else:
            crs_combined = "Unknown"
        client_name = metadata.get('client', 'Unknown Client')
        # Lines are separated with <br>, hence unsafe_allow_html below.
        additional_text = (
            f"**Objective:** {objective}<br>"
            f"**Commissioned by:** {client_name}<br>"
            f"**Project duration:** {start_year_str}-{end_year_str}<br>"
            f"**Budget:** Project: {formatted_project_budget}, Total volume: {formatted_total_volume}<br>"
            f"**Country:** {', '.join(matched_countries)}<br>"
            f"**Sector:** {crs_combined}"
        )
        # NOTE: display of the project contact is currently disabled.
        st.markdown(additional_text, unsafe_allow_html=True)
    st.divider()


if not var.strip():
    st.info("Please enter a question to see results.")
else:
    ###########################################
    # Run the search and apply filters
    ###########################################
    results = hybrid_search(client, var, collection_name, limit=500)
    # Drop hits whose page content is too short to be meaningful.
    semantic_all = [r for r in results[0] if len(r.payload["page_content"]) >= 5]
    lexical_all = [r for r in results[1] if len(r.payload["page_content"]) >= 5]
    # The query is user input and is embedded in raw HTML below.
    safe_query = html.escape(var)

    ###########################################
    # Display Results (Lexical and Semantic)
    ###########################################
    # --- Lexical Results Branch ---
    if show_exact_matches:
        st.write(f"Showing **Top {TOP_RESULTS_SHOWN} Lexical Search results**")
        query_substring = var.strip().lower()
        lexical_substring_filtered = [r for r in lexical_all if query_substring in r.payload["page_content"].lower()]
        filtered_lexical = filter_results(lexical_substring_filtered, country_filter, region_filter, end_year_range, crs_filter, min_budget)
        filtered_lexical_no_dupe = remove_duplicates(filtered_lexical)
        if not filtered_lexical_no_dupe:
            st.write('No exact matches, consider unchecking "Show only exact matches"')
        else:
            top_results = filtered_lexical_no_dupe[:TOP_RESULTS_SHOWN]
            rag_answer = get_rag_answer(var, top_results)
            # Use the query as heading, centered.
            st.markdown(f"<h2 style='text-align:center; font-size:1.5em;'>{safe_query}</h2>", unsafe_allow_html=True)
            st.write(rag_answer)
            st.divider()
            for res in top_results:
                _render_result(res, var, highlight_details=True)
    # --- Semantic Results Branch ---
    else:
        semantic_thresholded = [r for r in semantic_all if r.score >= 0.0]
        filtered_semantic = filter_results(semantic_thresholded, country_filter, region_filter, end_year_range, crs_filter, min_budget)
        filtered_semantic_no_dupe = remove_duplicates(filtered_semantic)
        if not filtered_semantic_no_dupe:
            st.write("No relevant results found.")
        else:
            top_results = filtered_semantic_no_dupe[:TOP_RESULTS_SHOWN]
            rag_answer = get_rag_answer(var, top_results)
            st.markdown(f"<h2 style='text-align:center; font-size:2.5em;'>{safe_query}</h2>", unsafe_allow_html=True)
            st.write(rag_answer)
            st.divider()
            st.write(f"Showing **Top {TOP_RESULTS_SHOWN} Semantic Search results**")
            for res in top_results:
                _render_result(res, var, highlight_details=False)