import os import sqlite3 import json import pandas as pd import re from agno.agent import Agent from agno.models.groq import Groq import streamlit as st from dotenv import load_dotenv import graphviz from clean_database import clean_database # Charger les variables d'environnement depuis le fichier .env load_dotenv() # Fonctions de normalisation de la base de données def detecter_format(fichier): extension = os.path.splitext(fichier)[1].lower() if extension in [".db", ".sqlite"]: return "sqlite" elif extension == ".json": return "json" elif extension == ".csv": return "csv" elif extension in [".xls", ".xlsx"]: return "excel" else: return "inconnu" def json_to_sqlite(fichier_json): with open(fichier_json, 'r', encoding='utf-8') as file: data = json.load(file) db_name = 'temp_json.db' conn = sqlite3.connect(db_name) cursor = conn.cursor() table_name = "data" columns = ', '.join(data[0].keys()) placeholders = ', '.join('?' * len(data[0])) cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})") for row in data: cursor.execute(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})", tuple(row.values())) conn.commit() conn.close() return db_name def csv_to_sqlite(fichier_csv): df = pd.read_csv(fichier_csv, encoding='utf-8') db_name = 'temp_csv.db' conn = sqlite3.connect(db_name) df.to_sql('data', conn, if_exists='replace', index=False) conn.close() return db_name def excel_to_sqlite(fichier_excel): df = pd.read_excel(fichier_excel, encoding='utf-8') db_name = 'temp_excel.db' conn = sqlite3.connect(db_name) df.to_sql('data', conn, if_exists='replace', index=False) conn.close() return db_name def preparer_bdd(input_path): format_detecte = detecter_format(input_path) if format_detecte == "sqlite": return input_path elif format_detecte == "json": return json_to_sqlite(input_path) elif format_detecte == "csv": return csv_to_sqlite(input_path) elif format_detecte == "excel": return excel_to_sqlite(input_path) else: st.error("Format non supporté. Utilisez un fichier .db, .json, .csv, .xls ou .xlsx") return None def extraire_bdd(db_path): with sqlite3.connect(db_path) as conn: cursor = conn.cursor() tables = [table[0] for table in cursor.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';" ).fetchall()] if not tables: st.warning("Aucune table trouvée.") return "" output_bdd = "" for table in tables: output_bdd += f"\n📌 Table `{table}` :\n" cursor.execute(f"PRAGMA table_info({table});") columns = cursor.fetchall() col_names = [col[1] for col in columns] col_types = [col[2] for col in columns] output_bdd += f"📝 Colonnes: {', '.join(f'{name} ({ctype})' for name, ctype in zip(col_names, col_types))}\n" cursor.execute(f"SELECT * FROM {table}") rows = cursor.fetchall() if rows: output_bdd += "\n".join(f" -> {row}" for row in rows) + "\n" else: output_bdd += "⚠️ Aucune donnée trouvée.\n" return output_bdd # Création de l'agent Groq pour la vérification de la normalisation normalization_checker = Agent( model=Groq(id="deepseek-r1-distill-llama-70b", temperature=0.0, top_p=0), description="Vérifie si une base de données est normalisée.", instructions=[ "Analyse la structure de la base de données fournie en entrée.", "Détermine si elle respecte les formes normales (1NF, 2NF, 3NF, BCNF).", "Migre les données de l'ancienne base de données vers la nouvelle base de données normalisée sans générer de code SQL.", "Assurez-vous que toutes les données sont correctement transférées et que les relations sont maintenues.", "Si la base n'est pas normalisée, propose une version améliorée sans générer de schéma SQL." ], markdown=True ) def analyser_bdd(output_bdd: str): prompt = f""" Voici la structure et les données de la base SQLite : {output_bdd} Tu es un expert en base de données exécute l'algorithme suivant pour normaliser et migrer la base de données. Affiche **uniquement** le résultat final sans explication détaillée et Ne pas ajouter de nouvelles colonnes non mentionnées dans la base de données d'origine. --- ### **Algorithme de normalisation et migration** Début Initialiser une variable `resultat_final` pour accumuler les résultats. # Analyser la structure actuelle de la base de données Pour chaque table `T` dans la base de données faire Détecter la clé primaire existante ou en attribuer une si nécessaire. # Appliquer la 1NF : Atomisation des attributs Si `T` contient des attributs non atomiques alors Transformer les attributs en valeurs atomiques. Assigner les clés primaires et ajouter les clés étrangères si nécessaire. Fin Si # Appliquer la 2NF : Éliminer les dépendances fonctionnelles partielles Si `T` contient une clé primaire composite et des attributs qui dépendent d’une partie seulement de la clé alors Décomposer `T` en nouvelles tables en respectant la 2NF. Assigner les clés primaires et ajouter les clés étrangères si nécessaire. Fin Si # Appliquer la 3NF : Éliminer les dépendances transitives Si `T` contient des dépendances transitives (un attribut dépend d’un autre attribut non clé) alors Décomposer `T` en nouvelles tables pour isoler ces dépendances. Assigner les clés primaires et ajouter les clés étrangères si nécessaire. Fin Si # Appliquer la BCNF : Suppression des dépendances anormales Si `T` contient une dépendance fonctionnelle où un attribut non clé détermine une clé candidate alors Décomposer `T` en nouvelles relations respectant la BCNF. Assigner les clés primaires et ajouter les clés étrangères si nécessaire. Fin Si # Migrer les données Pour chaque nouvelle table normalisée faire Identifier les données à migrer depuis l'ancienne base de données. Transformer ou réorganiser les données selon le schéma normalisé proposé. Insérer les données transformées dans la nouvelle table normalisée. Fin Pour # Le résultat doit absolument respecter cette structure. Ajouter "📝 Nouvelles tables proposées :" à `resultat_final`. Pour chaque nouvelle table `N` créée faire Ajouter "📌 Table `nom_nouvelle_table` :" à `resultat_final`. Ajouter "📝 Colonnes: colonne1 (type1), colonne2 (type2), ..." à `resultat_final`. Ajouter "🔑 Clé primaire: `colonne_PK`" si définie. Ajouter "🔗 Clé étrangère: `colonne_FK` → `table_referencée` (`colonne_referencée`)" si applicable. Ajouter "📋 Données :" à `resultat_final`. Pour chaque enregistrement migré dans la table faire Ajouter " - `valeur1`, `valeur2`, ..." à `resultat_final`. Fin Pour Fin Pour # Vérification finale avant affichage Afficher `resultat_final` Afficher "✅ Normalisation complète et migration réussie." Fin --- """ response = normalization_checker.run(prompt) resultat = response.content.strip() resultat_sans_think = re.sub(r".*?", "", resultat, flags=re.DOTALL).strip() return resultat_sans_think # Création de l'agent 2 (vérifie la proposition de normalisation) normalization_validator = Agent( model=Groq(id="qwen-qwq-32b", temperature=0.0, top_p=0), description="Vérifie si la base de données normalisée proposée est correcte.", instructions=[ "Analyse la normalisation proposée et vérifie si elle respecte les formes normales.", "Compare la proposition avec la base de données générée pour s'assurer de leur correspondance.", "Donne une proposition améliorée si nécessaire." ], markdown=True ) def verifier_normalisation(proposition_normalisee: str, output_bdd: str): prompt = f""" Voici la base de données générée après application de la normalisation : Vérifie si la normalisation proposée correspond bien à {output_bdd}. Voici une proposition de base de données normalisée : {proposition_normalisee} - Tu es un expert en base de données exécute l'algorithme suivant pour vérifier si la base obtenue correspond bien à la normalisation attendue. Affiche **uniquement** le résultat final sans explication détaillée. --- ### **Algorithme de vérification et correction des formes normales** Début Initialiser une variable `corrections_appliquees` = False Pour chaque table dans la base de données faire Si tous les attributs sont atomiques alors printf("📌 Vérification de la table `nom_table`") printf("✅ La table `nom_table` est en 1NF") Si la table ne contient pas de dépendances fonctionnelles partielles alors printf("✅ La table `nom_table` est en 2NF") Si la table ne contient pas de dépendances transitives alors printf("✅ La table `nom_table` est en 3NF") Si chaque dépendance fonctionnelle est basée sur une clé candidate alors printf("✅ La table `nom_table` est en BCNF") Sinon printf("❌ La table `nom_table` ne respecte pas la BCNF → Correction appliquée.") Appliquer la correction en décomposant la table en relations BCNF. corrections_appliquees = True Fin Si Sinon printf("❌ La table `nom_table` ne respecte pas la 3NF → Correction appliquée.") Appliquer la correction en éliminant les dépendances transitives. corrections_appliquees = True Fin Si Sinon printf("❌ La table `nom_table` ne respecte pas la 2NF → Correction appliquée.") Appliquer la correction en éliminant les dépendances partielles. corrections_appliquees = True Fin Si Sinon printf("❌ La table `nom_table` ne respecte pas la 1NF → Correction appliquée.") Appliquer la correction en atomisant les attributs. corrections_appliquees = True Fin Si Fin Pour Si corrections_appliquees == True alors printf("⚠️ Des corrections ont été appliquées durant la vérification.") Fin Si printf("🔍 Normalisation terminée.") Fin --- """ response = normalization_validator.run(prompt) resultat = response.content.strip() resultat_sans_think = re.sub(r".*?", "", resultat, flags=re.DOTALL).strip() return resultat_sans_think # Création de l'agent 3 (crée la nouvelle bdd normalisée) sql_generator = Agent( model=Groq(id="whisper-large-v3", temperature=0.0, top_p=0), description="Crée la nouvelle bdd normalisée.", instructions=[ "Analyse le résultat de normalisation et génère les requêtes SQL pour créer la nouvelle bdd." ], markdown=True ) def génération_des_requêtes(proposition_normalisee: str): prompt = f""" Voici une proposition de base de données normalisée : {proposition_normalisee} Génère uniquement les requêtes SQL pour créer et insérer des données dans la nouvelle base de données normalisée (crée les nouvelle tables et insérée tous les donnez et n'ajoutes rien). """ response = sql_generator.run(prompt) resultat = response.content.strip() resultat_sans_think = re.sub(r".*?", "", resultat, flags=re.DOTALL).strip() return resultat_sans_think def nettoyer_requetes_sql(texte): # Expression régulière pour capturer les requêtes CREATE TABLE et INSERT INTO pattern = r"(CREATE TABLE.*?;|INSERT INTO.*?;)" requetes_sql = re.findall(pattern, texte, re.DOTALL | re.IGNORECASE) return "\n\n".join(req.strip() for req in requetes_sql) # --- Fonctions utilitaires --- def create_sqlite_db_from_file(sql_file, db_name): if os.path.exists(db_name): os.remove(db_name) conn = sqlite3.connect(db_name) cursor = conn.cursor() with open(sql_file, 'r') as file: sql_script = file.read() cursor.executescript(sql_script) conn.commit() conn.close() def get_database_structure(db_name): conn = sqlite3.connect(db_name) cursor = conn.cursor() # Récupérer les tables cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = cursor.fetchall() structure = "" for table in tables: table_name = table[0] structure += f"Table: {table_name}\n" # Récupérer les colonnes cursor.execute(f"PRAGMA table_info({table_name});") columns = cursor.fetchall() for column in columns: structure += f" Colonne: {column[1]} (Type: {column[2]})\n" conn.close() return structure def get_sample_data(db_name, limit=3): conn = sqlite3.connect(db_name) cursor = conn.cursor() sample_text = "" cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = cursor.fetchall() for table in tables: table_name = table[0] sample_text += f"\nExtrait de la table {table_name} :\n" df = pd.read_sql_query(f"SELECT * FROM {table_name} LIMIT {limit}", conn) sample_text += df.to_string(index=False) sample_text += "\n" conn.close() return sample_text def transform_prompt_to_sql(prompt, db_structure): instructions = f""" Voici la structure de la base de données : {db_structure} Voici la requête de l'utilisateur : {prompt} Transforme ce prompt en une requête SQL en utilisant la structure de la base de données fournie. """ response = sql_transformer.run(instructions) return response.content.strip() def clean_sql_query(response): # Utiliser une expression régulière pour extraire uniquement la requête SQL match = re.search(r'```sql\s*(.*?)\s*```', response, re.DOTALL) if match: return match.group(1).strip() else: raise ValueError("Impossible d'extraire la requête SQL de la réponse.") def execute_sql_query(db_name, sql_query): conn = sqlite3.connect(db_name) cursor = conn.cursor() cursor.execute(sql_query) results = cursor.fetchall() conn.close() return results def visualize_database_schema_from_sql(sql_file): with open(sql_file, 'r') as file: sql_script = file.read() # Extraction des tables et colonnes tables = {} pks = {} fks = [] create_table_pattern = re.compile(r'CREATE TABLE\s+(\w+)\s*\((.*?)\);', re.DOTALL | re.IGNORECASE) fk_pattern = re.compile(r'FOREIGN KEY\s*\((\w+)\)\s*REFERENCES\s+(\w+)\s*\((\w+)\)', re.IGNORECASE) pk_inline_pattern = re.compile(r'^(\w+)\s+\w+.*PRIMARY KEY', re.IGNORECASE) pk_constraint_pattern = re.compile(r'PRIMARY KEY\s*\((.*?)\)', re.IGNORECASE) for match in create_table_pattern.finditer(sql_script): table_name = match.group(1) body = match.group(2) lines = [line.strip() for line in body.split(',') if line.strip()] columns = [] primary_keys = [] for line in lines: if line.upper().startswith("FOREIGN KEY"): fk_match = fk_pattern.search(line) if fk_match: source_col = fk_match.group(1) ref_table = fk_match.group(2) ref_col = fk_match.group(3) fks.append((table_name, source_col, ref_table, ref_col)) elif line.upper().startswith("PRIMARY KEY"): pk_match = pk_constraint_pattern.search(line) if pk_match: pk_cols = [pk.strip() for pk in pk_match.group(1).split(',')] primary_keys.extend(pk_cols) else: parts = line.split() if len(parts) == 0: continue # ligne vide ou espace col_name = parts[0].strip("()") # on enlève les parenthèses résiduelles if col_name.isidentifier(): # vérifie que c’est un nom valide if col_name not in columns: # Vérifie si la colonne n'est pas déjà présente columns.append(col_name) inline_pk = pk_inline_pattern.match(line) if inline_pk: if inline_pk.group(1) not in primary_keys: # Vérifie si la clé primaire n'est pas déjà présente primary_keys.append(inline_pk.group(1)) tables[table_name] = columns pks[table_name] = primary_keys # Construction du graphe avec style MERISE dot = graphviz.Digraph(format='png') dot.attr('node', shape='plaintext') for table, columns in tables.items(): rows = [] for col in columns: if col in pks.get(table, []): rows.append(f'{col}') else: rows.append(f'{col}') label = f"""< {''.join(rows)}
{table}
>""" dot.node(table, label=label) for src_table, src_col, tgt_table, tgt_col in fks: dot.edge(src_table, tgt_table, label=f"{src_col} → {tgt_col}") st.graphviz_chart(dot) # --- Config Agent --- sql_transformer = Agent( model=Groq(id="llama3-70b-8192", temperature=0.0, top_p=0), description="Transforme une requête en langage naturel en requête SQL.", instructions=[ "Analyse la requête fournie en entrée.", "Transforme la requête en langage naturel en une requête SQL.", ], markdown=True ) # --- Interface Streamlit --- st.set_page_config(page_title="Assistant SQL avec LLM", layout="wide") st.title("🧠📊 Assistant SQL intelligent (avec LLM)") db_name = "database.db" sql_file = "output.sql" # Champ de saisie pour le fichier d'entrée input_file = st.file_uploader("Choisissez un fichier (.db, .json, .csv, .xls ou .xlsx)", type=["db", "sqlite", "json", "csv", "xls", "xlsx"]) if input_file: # Enregistrer le fichier téléchargé temporairement temp_file_path = f"temp_{input_file.name}" with open(temp_file_path, "wb") as f: f.write(input_file.getbuffer()) prepared_db_path = preparer_bdd(temp_file_path) if prepared_db_path: st.success(f"Base de données prête : {prepared_db_path}") clean_database(prepared_db_path) st.success("Base de données nettoyée.") output_bdd = extraire_bdd(prepared_db_path) if output_bdd: proposition_normalisee = output_bdd while True: resultat = analyser_bdd(proposition_normalisee) verification = verifier_normalisation(resultat, output_bdd) if "⚠️ Des corrections ont été appliquées durant la vérification." in verification: proposition_normalisee = verification else: st.text("\n🔍 Résultat final de l'analyse :\n" + resultat) st.text("\n✅ Vérification de la normalisation :\n" + verification) break # Générer les requêtes SQL pour la nouvelle base de données normalisée sql_content = génération_des_requêtes(resultat) requetes_nettoyees = nettoyer_requetes_sql(sql_content) # Écrire les requêtes SQL dans le fichier output.sql with open(sql_file, "w", encoding='utf-8') as fichier_sql: fichier_sql.write(requetes_nettoyees) st.success(f"Fichier SQL généré: {sql_file}") create_sqlite_db_from_file(sql_file, db_name) # Visualiser le schéma de la base de données st.subheader("Schéma des relations entre les tables") visualize_database_schema_from_sql(sql_file) # Champ de saisie pour les requêtes en langage naturel prompt = st.text_input("💬 Pose ta question :", placeholder="Ex: Liste les patients atteints de diabète") if prompt: st.write("🛠️ Génération SQL en cours…") structure = get_database_structure(db_name) response = transform_prompt_to_sql(prompt, structure) sql_query = clean_sql_query(response) st.code(sql_query, language="sql") try: df = execute_sql_query(db_name, sql_query) st.success("✅ Requête exécutée avec succès !") st.dataframe(df) except Exception as e: st.error(f"❌ Erreur lors de l'exécution de la requête : {e}")