import os import sqlite3 import json import pandas as pd import argparse import re from agno.agent import Agent from agno.models.groq import Groq from dotenv import load_dotenv import sys from clean_database import clean_database sys.stdout.reconfigure(encoding='utf-8') load_dotenv() def detecter_format(fichier): """Retourne l'extension du fichier sous forme explicite.""" extension = os.path.splitext(fichier)[1].lower() if extension in [".db", ".sqlite"]: return "sqlite" elif extension == ".json": return "json" elif extension == ".csv": return "csv" elif extension in [".xls", ".xlsx"]: return "excel" else: return "inconnu" def json_to_sqlite(fichier_json): """Convertit un fichier JSON en base SQLite.""" try: with open(fichier_json, 'r') as file: data = json.load(file) except json.JSONDecodeError as e: print(f"Erreur lors de la lecture du fichier JSON : {e}") return None db_name = 'temp_json.db' conn = sqlite3.connect(db_name) cursor = conn.cursor() # Supposons que les données JSON sont une liste de dictionnaires (une table) table_name = "data" columns = ', '.join(data[0].keys()) placeholders = ', '.join('?' * len(data[0])) cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})") for row in data: cursor.execute(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})", tuple(row.values())) conn.commit() conn.close() print(f"✅ Fichier JSON converti avec succès en {db_name}") return db_name def csv_to_sqlite(fichier_csv): """Convertit un fichier CSV en base SQLite.""" try: df = pd.read_csv(fichier_csv) except pd.errors.ParserError as e: print(f"Erreur lors de la lecture du fichier CSV : {e}") return None db_name = 'temp_csv.db' conn = sqlite3.connect(db_name) df.to_sql('data', conn, if_exists='replace', index=False) conn.close() print(f"✅ Fichier CSV converti avec succès en {db_name}") return db_name def excel_to_sqlite(fichier_excel): """Convertit un fichier Excel en base SQLite.""" try: df = pd.read_excel(fichier_excel) except Exception as e: print(f"Erreur lors de la lecture du fichier Excel : {e}") return None db_name = 'temp_excel.db' conn = sqlite3.connect(db_name) df.to_sql('data', conn, if_exists='replace', index=False) conn.close() print(f"✅ Fichier Excel converti avec succès en {db_name}") return db_name def preparer_bdd(input_path): """Détecte le type du fichier et le convertit en SQLite si nécessaire.""" format_detecte = detecter_format(input_path) print(f"📂 Format détecté : {format_detecte.upper()}") if format_detecte == "sqlite": print("➡️ Aucun traitement requis, base SQLite déjà prête.") return input_path elif format_detecte == "json": return json_to_sqlite(input_path) elif format_detecte == "csv": return csv_to_sqlite(input_path) elif format_detecte == "excel": return excel_to_sqlite(input_path) else: print("❌ Format non supporté. Utilise un fichier .db, .json, .csv, .xls ou .xlsx") return None def extraire_bdd(db_path): """Récupère la structure et les données d'une base SQLite.""" try: with sqlite3.connect(db_path) as conn: cursor = conn.cursor() # Récupérer les tables tables = [table[0] for table in cursor.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';" ).fetchall()] if not tables: print("⚠️ Aucune table trouvée.") return "" output_bdd = "" for table in tables: output_bdd += f"\n📌 Table `{table}` :\n" # Récupérer les colonnes cursor.execute(f"PRAGMA table_info({table});") columns = cursor.fetchall() col_names = [col[1] for col in columns] col_types = [col[2] for col in columns] output_bdd += f"📝 Colonnes: {', '.join(f'{name} ({ctype})' for name, ctype in zip(col_names, col_types))}\n" # Récupérer les données de la table cursor.execute(f"SELECT * FROM {table}") rows = cursor.fetchall() if rows: output_bdd += "\n".join(f" -> {row}" for row in rows) + "\n" else: output_bdd += "⚠️ Aucune donnée trouvée.\n" return output_bdd except sqlite3.Error as e: print(f"❌ Erreur SQLite : {e}") return "" # Création de l'agent Groq pour la vérification de la normalisation normalization_checker = Agent( model=Groq(id="deepseek-r1-distill-llama-70b", temperature=0.0, top_p=0), description="Vérifie si une base de données est normalisée.", instructions=[ "Analyse la structure de la base de données fournie en entrée.", "Détermine si elle respecte les formes normales (1NF, 2NF, 3NF, BCNF).", "Migre les données de l'ancienne base de données vers la nouvelle base de données normalisée sans générer de code SQL.", "Assurez-vous que toutes les données sont correctement transférées et que les relations sont maintenues.", "Si la base n'est pas normalisée, propose une version améliorée sans générer de schéma SQL." ], markdown=True ) def analyser_bdd(output_bdd: str): """Utilise le premier agent pour analyser et normaliser la base de données.""" prompt = f""" Voici la structure et les données de la base SQLite : {output_bdd} Tu es un expert en base de données exécute l'algorithme suivant pour normaliser et migrer la base de données. Affiche **uniquement** le résultat final sans explication détaillée et Ne pas ajouter de nouvelles colonnes non mentionnées dans la base de données d'origine . --- ### **Algorithme de normalisation et migration** Début Initialiser une variable `resultat_final` pour accumuler les résultats. # Analyser la structure actuelle de la base de données Pour chaque table `T` dans la base de données faire Détecter la clé primaire existante ou en attribuer une si nécessaire. # Appliquer la 1NF : Atomisation des attributs Si `T` contient des attributs non atomiques alors Transformer les attributs en valeurs atomiques. Assigner les clés primaires et ajouter les clés étrangères si nécessaire. Fin Si # Appliquer la 2NF : Éliminer les dépendances fonctionnelles partielles Si `T` contient une clé primaire composite et des attributs qui dépendent d’une partie seulement de la clé alors Décomposer `T` en nouvelles tables en respectant la 2NF. Assigner les clés primaires et ajouter les clés étrangères si nécessaire. Fin Si # Appliquer la 3NF : Éliminer les dépendances transitives Si `T` contient des dépendances transitives (un attribut dépend d’un autre attribut non clé) alors Décomposer `T` en nouvelles tables pour isoler ces dépendances. Assigner les clés primaires et ajouter les clés étrangères si nécessaire. Fin Si # Appliquer la BCNF : Suppression des dépendances anormales Si `T` contient une dépendance fonctionnelle où un attribut non clé détermine une clé candidate alors Décomposer `T` en nouvelles relations respectant la BCNF. Assigner les clés primaires et ajouter les clés étrangères si nécessaire. Fin Si # Migrer les données Pour chaque nouvelle table normalisée faire Identifier les données à migrer depuis l'ancienne base de données. Transformer ou réorganiser les données selon le schéma normalisé proposé. Insérer les données transformées dans la nouvelle table normalisée. Fin Pour # Ajouter les nouvelles tables normalisées au résultat final Ajouter "📝 Nouvelles tables proposées :" à `resultat_final`. Pour chaque nouvelle table `N` créée faire Ajouter "📌 Table `nom_nouvelle_table` :" à `resultat_final`. Ajouter "📝 Colonnes: colonne1 (type1), colonne2 (type2), ..." à `resultat_final`. Ajouter "🔑 Clé primaire: `colonne_PK`" si définie. Ajouter "🔗 Clé étrangère: `colonne_FK` → `table_referencée` (`colonne_referencée`)" si applicable. Ajouter "📋 Données :" à `resultat_final`. Pour chaque enregistrement migré dans la table faire Ajouter " - `valeur1`, `valeur2`, ..." à `resultat_final`. Fin Pour Fin Pour # Vérification finale avant affichage Afficher `resultat_final` Afficher "✅ Normalisation complète et migration réussie." Fin --- """ response = normalization_checker.run(prompt) resultat = response.content.strip() # Supprimer le texte entre ... resultat_sans_think = re.sub(r".*?", "", resultat, flags=re.DOTALL).strip() return resultat_sans_think # Création de l'agent 2 (vérifie la proposition de normalisation) normalization_validator = Agent( model=Groq(id="qwen-2.5-32b", temperature=0.0, top_p=0), description="Vérifie si la base de données normalisée proposée est correcte.", instructions=[ "Analyse la normalisation proposée et vérifie si elle respecte les formes normales.", "Compare la proposition avec la base de données générée pour s'assurer de leur correspondance.", "Donne une proposition améliorée si nécessaire." ], markdown=True ) def verifier_normalisation(proposition_normalisee: str, output_bdd: str): """Utilise le deuxième agent pour valider la normalisation proposée et sa correspondance avec la base de données générée.""" prompt = f""" Voici la base de données générée après application de la normalisation : Vérifie si la normalisation proposée correspond bien à {output_bdd}. Voici une proposition de base de données normalisée : {proposition_normalisee} - Tu es un expert en base de données exécute l'algorithme suivant pour vérifier si la base obtenue correspond bien à la normalisation attendue. Affiche **uniquement** le résultat final sans explication détaillée. --- ### **Algorithme de vérification et correction des formes normales** Début Initialiser une variable `corrections_appliquees` = False Pour chaque table dans la base de données faire Si tous les attributs sont atomiques alors printf("📌 Vérification de la table `nom_table`") printf("✅ La table `nom_table` est en 1NF") Si la table ne contient pas de dépendances fonctionnelles partielles alors printf("✅ La table `nom_table` est en 2NF") Si la table ne contient pas de dépendances transitives alors printf("✅ La table `nom_table` est en 3NF") Si chaque dépendance fonctionnelle est basée sur une clé candidate alors printf("✅ La table `nom_table` est en BCNF") Sinon printf("❌ La table `nom_table` ne respecte pas la BCNF → Correction appliquée.") Appliquer la correction en décomposant la table en relations BCNF. corrections_appliquees = True Fin Si Sinon printf("❌ La table `nom_table` ne respecte pas la 3NF → Correction appliquée.") Appliquer la correction en éliminant les dépendances transitives. corrections_appliquees = True Fin Si Sinon printf("❌ La table `nom_table` ne respecte pas la 2NF → Correction appliquée.") Appliquer la correction en éliminant les dépendances partielles. corrections_appliquees = True Fin Si Sinon printf("❌ La table `nom_table` ne respecte pas la 1NF → Correction appliquée.") Appliquer la correction en atomisant les attributs. corrections_appliquees = True Fin Si Fin Pour Si corrections_appliquees == True alors printf("⚠️ Des corrections ont été appliquées durant la vérification.") Fin Si printf("🔍 Normalisation terminée.") Fin --- """ response = normalization_validator.run(prompt) return response.content.strip() def generate_sql_from_normalized_schema(normalized_schema, output_file): """Génère les requêtes SQL pour créer et insérer des données dans une base de données normalisée.""" create_table_statements = [] insert_data_statements = [] # Analyser le schéma normalisé tables = normalized_schema.split('📌 Table') for table in tables: if not table.strip(): continue # Extraire le nom de la table table_name = "" table_parts = re.split(r'`| :', table, maxsplit=2) if len(table_parts) >= 2: table_name = table_parts[1].strip() # Extraire les colonnes et leurs types columns = [] col_types = {} if '📝 Colonnes: ' in table: columns_line = table.split('📝 Colonnes: ')[1].split('\n')[0] columns = [col.strip().split(' (')[0] for col in columns_line.split(', ')] col_types = {col.split(' (')[0]: col.split(' (')[1].rstrip(')') for col in columns_line.split(', ')} # Extraire la clé primaire (peut être composée) primary_keys = [] pk_match = re.search(r'🔑 Clé primaire: ([\w, ]+)', table) if pk_match: primary_keys = [key.strip() for key in pk_match.group(1).split(',')] # Extraire les clés étrangères foreign_keys = [] fk_matches = re.findall(r'🔗 Clé étrangère: (\w+) → (\w+) \((\w+)\)', table) for fk_col, ref_table, ref_col in fk_matches: foreign_keys.append((fk_col, ref_table, ref_col)) # Générer CREATE TABLE if table_name and columns: create_sql = f"CREATE TABLE {table_name} (\n" # Ajouter les colonnes avec leurs types for col in columns: col_type = col_types.get(col, "TEXT").upper() create_sql += f" {col} {col_type},\n" # Ajouter la clé primaire (peut être composée) if primary_keys: create_sql += f" PRIMARY KEY ({', '.join(primary_keys)}),\n" # Ajouter les clés étrangères for fk in foreign_keys: create_sql += f" FOREIGN KEY ({fk[0]}) REFERENCES {fk[1]}({fk[2]}),\n" # Nettoyer les virgules finales create_sql = re.sub(r',\n$', '\n', create_sql) + ");" create_table_statements.append(create_sql) # Extraire les données et les insérer en une seule requête if '📋 Données :' in table: data_section = table.split('📋 Données :')[1].split('\n\n')[0] data_rows = re.findall(r' - (.*?)\n', data_section) if data_rows: insert_sql = f"INSERT INTO {table_name} (" insert_sql += ', '.join(columns) + ") VALUES\n" formatted_values_list = [] for row in data_rows: # Nettoyer les guillemets simples et backticks values = [v.strip().strip("'`") for v in row.split(', ')] formatted_values = [] for i, value in enumerate(values): col_type = col_types.get(columns[i], "TEXT").upper() if col_type in ['TEXT', 'DATE']: formatted_values.append(f"'{value}'") else: formatted_values.append(f"{value}") formatted_values_list.append(f"({', '.join(formatted_values)})") insert_sql += ",\n".join(formatted_values_list) + ";" insert_data_statements.append(insert_sql) # Ajouter une ligne vide après les insertions pour cette table insert_data_statements.append("") # Écrire dans le fichier with open(output_file, 'w', encoding='utf-8') as f: f.write("-- Structure de la base de données\n") f.write("\n".join(create_table_statements)) f.write("\n\n-- Migrer les données de l'ancienne bdd\n") f.write("\n".join(insert_data_statements)) # --- Exécuter le script --- def main(): parser = argparse.ArgumentParser(description="Outil de transformation de fichier vers base SQLite et normalisation de BDD") parser.add_argument('--input', type=str, required=True, help="Chemin vers le fichier d'entrée (peut être .db, .json, .csv, .xls ou .xlsx)") parser.add_argument('output_file', type=str, help='Chemin vers le fichier SQL de sortie') args = parser.parse_args() # Préparer la base de données selon le format prepared_db_path = preparer_bdd(args.input) if not prepared_db_path: sys.exit("❌ Erreur lors de la préparation de la base de données.") print(f"✅ Base de données prête : {prepared_db_path}") # Nettoyer la base de données clean_database(prepared_db_path) print(f"✅ Base de données nettoyée : {prepared_db_path}") # Extraire la structure et les données de la base SQLite préparée output_bdd = extraire_bdd(prepared_db_path) if not output_bdd: sys.exit("❌ Erreur dans l'extraction de la base de données.") # Mise en place de l'auto-correction et auto-vérification (self-consistency) proposition_normalisee = output_bdd while True: resultat = analyser_bdd(proposition_normalisee) verification = verifier_normalisation(resultat, output_bdd) # Si des corrections sont détectées, mettre à jour la proposition sans afficher le résultat if "⚠️ Des corrections ont été appliquées durant la vérification." in verification: proposition_normalisee = verification else: # Afficher uniquement le résultat final correct et sortir de la boucle print("\n🔍 Résultat final de l'analyse :\n", resultat) print("\n✅ Vérification de la normalisation :\n", verification) break # Générer le fichier SQL final generate_sql_from_normalized_schema(resultat, args.output_file) print(f"✅ Fichier SQL généré: {args.output_file}") if __name__ == "__main__": main()