|
import dash |
|
from dash import dcc, html, Input, Output, State, ctx |
|
import dash_bootstrap_components as dbc |
|
import plotly.express as px |
|
import pandas as pd |
|
import numpy as np |
|
import umap |
|
import hdbscan |
|
import sklearn.feature_extraction.text as text |
|
from dash.exceptions import PreventUpdate |
|
import os |
|
from dotenv import load_dotenv |
|
import helpers |
|
import lancedb |
|
from omeka_s_api_client import OmekaSClient, OmekaSClientError |
|
from lancedb_client import LanceDBManager |
|
|
|
|
|
# Read variables from a local .env file (if any) before the rest of the
# module is set up.
load_dotenv()

# Property prefixes passed to ``client.digest_item_data`` when harvested
# Omeka S items are parsed (see ``harvest_omeka_items``).
_DEFAULT_PARSE_METADATA = (
    'dcterms:identifier',
    'dcterms:type',
    'dcterms:title',
    'dcterms:description',
    'dcterms:creator',
    'dcterms:publisher',
    'dcterms:date',
    'dcterms:spatial',
    'dcterms:format',
    'dcterms:provenance',
    'dcterms:subject',
    'dcterms:medium',
    'bibo:annotates',
    'bibo:content',
    'bibo:locator',
    'bibo:owner',
)
|
|
|
# Dash application using the Bootstrap stylesheet. Callback exceptions are
# suppressed so callbacks may reference ids that are not in the layout yet.
app = dash.Dash(
    __name__,
    external_stylesheets=[dbc.themes.BOOTSTRAP],
    suppress_callback_exceptions=True,
)

# Underlying server object, exposed at module level (e.g. for a WSGI runner).
server = app.server

# Single LanceDB manager instance shared by all callbacks in this module.
manager = LanceDBManager()
|
|
|
# French function words that carry no topical meaning.
_FRENCH_ONLY_STOP_WORDS = frozenset({
    "alors", "au", "aucuns", "aussi", "autre", "avant", "avec", "avoir",
    "bon", "car", "ce", "cela", "ces", "ceux", "chaque", "ci", "comme",
    "comment", "dans", "des", "du", "dedans", "dehors", "depuis", "devrait",
    "doit", "donc", "dos", "début", "elle", "elles", "en", "encore", "essai",
    "est", "et", "eu", "fait", "faites", "fois", "font", "hors", "ici", "il",
    "ils", "je", "juste", "la", "le", "les", "leur", "là", "ma",
    "maintenant", "mais", "mes", "mine", "moins", "mon", "mot", "même", "ni",
    "nommés", "notre", "nous", "nouveaux", "ou", "où", "par", "parce",
    "parole", "pas", "personnes", "peut", "peu", "pièce", "plupart", "pour",
    "pourquoi", "quand", "que", "quel", "quelle", "quelles", "quels", "qui",
    "sa", "sans", "ses", "seulement", "si", "sien", "son", "sont", "sous",
    "soyez", "sujet", "sur", "ta", "tandis", "tellement", "tels", "tes",
    "ton", "tous", "tout", "trop", "très", "tu", "valeur", "voie", "voient",
    "vont", "votre", "vous", "vu", "ça", "étaient", "état", "étions", "été",
    "être",
})

# Despite its name this set holds scikit-learn's English stop words AND the
# French ones above; it is what the TF-IDF vectorizer receives.
french_stopwords = text.ENGLISH_STOP_WORDS | _FRENCH_ONLY_STOP_WORDS
|
|
|
|
|
# --- Source column 1: harvest a collection from an Omeka S instance ---------
_omeka_source_col = dbc.Col(
    [
        html.H5("🔍 From Omeka S"),
        dcc.Input(
            id="api-url",
            value="https://your-omeka-instance.org",
            type="text",
            className="form-control",
        ),
        dbc.Button("Load Item Sets", id="load-sets", color="secondary", className="mt-2"),
        dcc.Dropdown(id="items-sets-dropdown", placeholder="Select a collection"),
        dcc.Input(
            id="table-name",
            value="my_table",
            type="text",
            className="form-control mt-2",
            placeholder="New table name",
        ),
        dbc.Button("Process Omeka Collection", id="load-data", color="primary", className="mt-2"),
    ],
    md=4,
)

# --- Source column 2: reload a table that already exists in LanceDB ---------
_lancedb_source_col = dbc.Col(
    [
        html.H5("📁 From LanceDB"),
        dbc.Button("Load Existing Tables", id="load-tables", color="info"),
        dcc.Dropdown(id="db-tables-dropdown", placeholder="Select an existing table"),
        dbc.Button("Display Table", id="load-data-db", color="success", className="mt-2"),
    ],
    md=4,
)

# --- Source column 3: disabled placeholder for a future search box ----------
_query_col = dbc.Col(
    [
        html.H5("🔎 Query Tool (coming soon)"),
        dbc.Input(placeholder="Type a search query...", type="text", disabled=True),
    ],
    md=4,
)

# --- Main area: scatter plot on the left, clicked-point details on the right
_graph_col = dbc.Col(
    dcc.Graph(id="umap-graph", style={"height": "700px"}),
    md=8,
)
_details_col = dbc.Col(
    html.Div(
        id="point-details",
        style={
            "padding": "15px",
            "borderLeft": "1px solid #ccc",
            "height": "700px",
            "overflowY": "auto",
        },
    ),
    md=4,
)

# Page layout: title, the three source columns, the plot/details row, a
# status line, and a session-scoped store holding the Omeka client settings.
app.layout = dbc.Container(
    [
        html.H2("🌍 Omeka S UMAP Explorer", className="text-center mt-4"),
        html.Hr(),
        dbc.Row(
            [_omeka_source_col, _lancedb_source_col, _query_col],
            className="mb-4",
        ),
        dbc.Row([_graph_col, _details_col]),
        html.Div(id="status", className="mt-3"),
        dcc.Store(id="omeka-client-config", storage_type="session"),
    ],
    fluid=True,
)
|
|
|
|
|
|
|
def _item_set_label(item_set):
    """Return the first ``dcterms:title`` value of an item set, or ``'N/A'``."""
    # ``or [{}]`` also covers an explicit empty list / None, which would make
    # a plain ``[0]`` lookup raise IndexError / TypeError.
    titles = item_set.get('dcterms:title') or [{}]
    return titles[0].get('@value') or 'N/A'


@app.callback(
    Output("items-sets-dropdown", "options"),
    Output("omeka-client-config", "data"),
    Input("load-sets", "n_clicks"),
    State("api-url", "value"),
    prevent_initial_call=True,
)
def load_item_sets(n, base_url):
    """Populate the collection dropdown with the item sets of an Omeka S site.

    Args:
        n: Click count of the "Load Item Sets" button (only used as trigger).
        base_url: Base URL of the Omeka S instance typed by the user.

    Returns:
        A ``(options, client_config)`` tuple: dropdown options
        (``{"label", "value"}`` dicts, value = ``o:id``) and the client
        settings stored for later callbacks. Both outputs are left untouched
        (``dash.no_update``) when the item sets cannot be fetched.

    Raises:
        PreventUpdate: If no URL was provided.
    """
    if not base_url or not base_url.strip():
        raise PreventUpdate
    base_url = base_url.strip()

    # NOTE(review): the API keys are "..." placeholders. This dict is written
    # to a session dcc.Store, so real credentials should not be placed here
    # without confirming that exposing them to the browser is acceptable.
    client_config = {
        "base_url": base_url,
        "key_identity": "...",
        "key_credential": "...",
        "default_per_page": 50,
    }

    try:
        client = OmekaSClient(
            base_url,
            client_config["key_identity"],
            client_config["key_credential"],
            client_config["default_per_page"],
        )
        item_sets = client.list_all_item_sets() or []
    except Exception as e:
        # Best effort: keep the UI unchanged, but do not hide the reason.
        print(f"Error loading item sets from {base_url}: {e}")
        return dash.no_update, dash.no_update

    # Entries without an id cannot be selected later, so they are skipped
    # instead of failing the whole list.
    options = [
        {"label": _item_set_label(item_set), "value": item_set["o:id"]}
        for item_set in item_sets
        if "o:id" in item_set
    ]
    return options, client_config
|
|
|
@app.callback(
    Output("db-tables-dropdown", "options"),
    Input("load-tables", "n_clicks"),
    prevent_initial_call=True,
)
def list_tables(n):
    """Return one dropdown option per table reported by the LanceDB manager.

    Args:
        n: Click count of the "Load Existing Tables" button (trigger only).

    Returns:
        List of ``{"label": name, "value": name}`` dicts.
    """
    options = []
    for table_name in manager.list_tables():
        # The table name is used both as the visible label and as the value.
        options.append({"label": table_name, "value": table_name})
    return options
|
|
|
def _name_clusters(documents, clusters):
    """Build a human-readable topic name for every cluster label.

    Args:
        documents: List of text documents, aligned row by row with ``clusters``.
        clusters: pandas Series of cluster labels; ``-1`` marks noise points.

    Returns:
        Dict mapping each label to its five highest mean-TF-IDF terms joined
        by ``", "``; the noise label ``-1`` maps to ``"Noise"``.
    """
    vectorizer = text.TfidfVectorizer(
        max_features=1000, stop_words=list(french_stopwords), lowercase=True
    )
    tfidf_matrix = vectorizer.fit_transform(documents)
    # Looked up once; it is invariant across clusters.
    feature_names = vectorizer.get_feature_names_out()

    names = {}
    for label in sorted(clusters.unique()):
        if label == -1:
            names[label] = "Noise"
            continue
        rows = (clusters == label).to_numpy().nonzero()[0]
        mean_tfidf = np.asarray(tfidf_matrix[rows].mean(axis=0)).flatten()
        top_indices = mean_tfidf.argsort()[::-1][:5]
        names[label] = ", ".join(feature_names[i] for i in top_indices)
    return names


@app.callback(
    Output("umap-graph", "figure"),
    Output("status", "children"),
    Input("load-data", "n_clicks"),
    Input("load-data-db", "n_clicks"),
    State("items-sets-dropdown", "value"),
    State("omeka-client-config", "data"),
    State("table-name", "value"),
    State("db-tables-dropdown", "value"),
    prevent_initial_call=True,
)
def handle_data_loading(n_clicks_omeka, n_clicks_db, item_set_id, client_config, table_name, db_table):
    """Load items from Omeka S or from LanceDB and plot them.

    When triggered by "load-data": harvests the selected item set, builds a
    text field, computes text + image embeddings, projects them with UMAP,
    clusters the projection with HDBSCAN, names the clusters with TF-IDF
    terms and stores the result in a new LanceDB table.
    When triggered by "load-data-db": reads an existing table instead.

    Args:
        n_clicks_omeka: Click count of "Process Omeka Collection" (trigger only).
        n_clicks_db: Click count of "Display Table" (trigger only).
        item_set_id: Id of the selected Omeka S item set (may be None).
        client_config: Dict with ``base_url``, ``key_identity`` and
            ``key_credential`` saved by ``load_item_sets``.
        table_name: Name of the LanceDB table to create for harvested data.
        db_table: Name of the existing LanceDB table to display.

    Returns:
        ``(figure, status_message)``. When nothing can be plotted the figure
        is left unchanged (``dash.no_update``) and the message explains why.

    Raises:
        PreventUpdate: If the required selection for the triggering button is
            missing, or the trigger is not one of the two buttons.
    """
    triggered_id = ctx.triggered_id

    if triggered_id == "load-data":
        if not client_config:
            raise PreventUpdate
        # Validate before the expensive harvest/embedding work, not after it.
        if not (table_name and table_name.strip()):
            return dash.no_update, "Please provide a name for the new table."

        client = OmekaSClient(
            base_url=client_config["base_url"],
            key_identity=client_config["key_identity"],
            key_credential=client_config["key_credential"],
        )

        df_omeka = harvest_omeka_items(client, item_set_id=item_set_id)
        # The harvester may return None (on error) or an empty frame.
        if df_omeka is None or df_omeka.empty:
            return dash.no_update, "No items with images could be harvested from Omeka S."

        items = df_omeka.to_dict(orient="records")
        records_with_text = [
            helpers.add_concatenated_text_field_exclude_keys(
                item,
                keys_to_exclude=['id', 'images_urls'],
                text_field_key='text',
                pair_separator=' - ',
            )
            for item in items
        ]
        df = helpers.prepare_df_atlas(
            pd.DataFrame(records_with_text), id_col='id', images_col='images_urls'
        )

        # Text and image embeddings are concatenated feature-wise per item.
        text_embed = helpers.generate_text_embed(df['text'].tolist())
        img_embed = helpers.generate_img_embed(df['images_urls'].tolist())
        embeddings = np.concatenate([text_embed, img_embed], axis=1)
        df["embeddings"] = embeddings.tolist()

        reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, metric="cosine")
        umap_embeddings = reducer.fit_transform(embeddings)
        df["umap_embeddings"] = umap_embeddings.tolist()

        # Clustering runs on the UMAP projection, not on the raw embeddings.
        clusterer = hdbscan.HDBSCAN(min_cluster_size=10)
        df["Cluster"] = clusterer.fit_predict(umap_embeddings)

        cluster_name_map = _name_clusters(df["text"].astype(str).tolist(), df["Cluster"])
        df["Topic"] = df["Cluster"].map(cluster_name_map)

        manager.initialize_table(table_name)
        manager.add_entry(table_name, df.to_dict(orient="records"))

    elif triggered_id == "load-data-db":
        if not db_table:
            raise PreventUpdate
        df = pd.DataFrame(manager.get_content_table(db_table))
        if df.empty:
            return dash.no_update, f"Table '{db_table}' contains no items."
        df = df.dropna(axis=1, how='all')
        df = df.fillna('')

    else:
        raise PreventUpdate

    return create_umap_plot(df)
|
|
|
|
|
@app.callback(
    Output("point-details", "children"),
    Input("umap-graph", "clickData"),
)
def show_point_details(clickData):
    """Render the title, image and description of the clicked scatter point.

    Args:
        clickData: Plotly click payload of the UMAP graph, or None when
            nothing has been clicked yet. The point's ``customdata`` is
            expected to be ``[image_url(s), title, description]``.

    Returns:
        An ``html.Div`` with the point details, or a hint asking the user to
        click a point when no usable click data is available.
    """
    hint = html.Div("🖱️ Click a point to see more details.", style={"color": "#888"})
    if not clickData:
        return hint

    points = clickData.get("points") or [{}]
    customdata = points[0].get("customdata")
    if not customdata or len(customdata) < 3:
        # Click on a trace that carries no (or incomplete) custom data.
        return hint
    img_url, title, desc = customdata[:3]

    # ``images_urls`` is built as a list of URLs by the harvester, while
    # ``html.Img`` needs a single string: show the first non-empty URL.
    if isinstance(img_url, (list, tuple)):
        img_url = next((url for url in img_url if url), None)

    children = [html.H4(title)]
    if img_url:
        # Skip the <img> entirely rather than rendering a broken image.
        children.append(
            html.Img(src=img_url, style={"maxWidth": "100%", "marginBottom": "10px"})
        )
    children.append(html.P(desc or "No description available."))
    return html.Div(children)
|
|
|
|
|
|
|
def harvest_omeka_items(client, item_set_id=None, per_page=50):
    """
    Fetch and parse items from Omeka S, keeping only items that have images.

    Each item that carries an ``o:media`` entry is parsed with
    ``client.digest_item_data``; its media are then fetched one by one and the
    original URLs of those whose media type contains "image" are stored under
    ``images_urls``. Items without any image URL are skipped.

    Args:
        client: OmekaSClient instance
        item_set_id: ID of the item set to fetch items from (optional)
        per_page: Number of items to fetch per page (default: 50)
    Returns:
        DataFrame containing parsed item data. An empty DataFrame is returned
        when fetching or parsing fails (the error is printed), so callers
        always receive a DataFrame.
    """
    print("\n--- Fetching and Parsing Multiple Items by colection---")
    try:
        items_list = client.list_all_items(item_set_id=item_set_id, per_page=per_page) or []
        print(f"Fetched {len(items_list)} items.")

        parsed_items_list = []
        for item_raw in items_list:
            if 'o:media' not in item_raw:
                continue
            parsed = client.digest_item_data(item_raw, prefixes=_DEFAULT_PARSE_METADATA)
            if not parsed:
                continue

            images_urls = []
            for media_ref in item_raw["o:media"]:
                media = client.get_media(media_ref["o:id"]) or {}
                # A missing / null media type or URL must not abort the harvest.
                media_type = media.get("o:media_type") or ""
                original_url = media.get('o:original_url')
                if "image" in media_type and original_url:
                    images_urls.append(original_url)

            if images_urls:
                parsed["images_urls"] = images_urls
                parsed_items_list.append(parsed)

        print(f"Successfully parsed {len(parsed_items_list)} items.")
        return pd.DataFrame(parsed_items_list)
    except OmekaSClientError as e:
        print(f"Error fetching/parsing multiple items: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during multi-item parsing: {e}")
    # Reached only after an error was reported above.
    return pd.DataFrame()
|
|
|
def create_umap_plot(df):
    """Build the 2D scatter plot of the UMAP projection.

    Args:
        df: DataFrame with a ``umap_embeddings`` column holding one
            coordinate pair per row. ``images_urls``, ``Title`` and
            ``Description`` feed the per-point custom data and ``Topic``
            colours the points; any of these that is missing is tolerated.

    Returns:
        ``(figure, status_message)`` where the message reports how many items
        were loaded.
    """
    title = "UMAP Projection with HDBSCAN Topics"
    status = f"Loaded {len(df)} items and projected into 2D."

    if df.empty:
        # Nothing to index into: return an empty plot instead of failing on
        # ``coords[:, 0]``.
        fig = px.scatter(x=[], y=[], title=title)
        fig.update_layout(height=700, margin=dict(t=30, b=30, l=30, r=30))
        return fig, status

    # The click callback unpacks exactly these three custom-data entries, so
    # absent columns (e.g. dropped upstream for being all-null) are filled
    # with empty strings on a copy; the caller's frame is not modified.
    custom_columns = ["images_urls", "Title", "Description"]
    missing = {col: "" for col in custom_columns if col not in df.columns}
    plot_df = df.assign(**missing) if missing else df

    coords = np.array(plot_df["umap_embeddings"].tolist())
    fig = px.scatter(
        plot_df,
        x=coords[:, 0],
        y=coords[:, 1],
        color="Topic" if "Topic" in plot_df.columns else None,
        custom_data=custom_columns,
        hover_data=None,
        title=title,
    )
    # NOTE(review): Plotly hover labels appear to support only a small set of
    # HTML tags; confirm that the <img> in this template is actually rendered.
    fig.update_traces(
        marker=dict(size=8, line=dict(width=1, color="DarkSlateGrey")),
        hovertemplate="<b>%{customdata[1]}</b><br><img src='%{customdata[0]}' height='150'><extra></extra>",
    )
    fig.update_layout(height=700, margin=dict(t=30, b=30, l=30, r=30))
    return fig, status
|
|
|
if __name__ == "__main__":
    # Start the development server only when the file is executed directly;
    # importing the module just exposes ``app`` / ``server``.
    run_options = {"debug": True, "port": 7860}
    app.run(**run_options)
|
|