Spaces:
Sleeping
Sleeping
import os | |
import sqlite3 | |
import json | |
import pandas as pd | |
import argparse | |
import re | |
from agno.agent import Agent | |
from agno.models.groq import Groq | |
from dotenv import load_dotenv | |
import sys | |
from clean_database import clean_database | |
sys.stdout.reconfigure(encoding='utf-8') | |
load_dotenv() | |
def detecter_format(fichier): | |
"""Retourne l'extension du fichier sous forme explicite.""" | |
extension = os.path.splitext(fichier)[1].lower() | |
if extension in [".db", ".sqlite"]: | |
return "sqlite" | |
elif extension == ".json": | |
return "json" | |
elif extension == ".csv": | |
return "csv" | |
elif extension in [".xls", ".xlsx"]: | |
return "excel" | |
else: | |
return "inconnu" | |
def json_to_sqlite(fichier_json): | |
"""Convertit un fichier JSON en base SQLite.""" | |
try: | |
with open(fichier_json, 'r') as file: | |
data = json.load(file) | |
except json.JSONDecodeError as e: | |
print(f"Erreur lors de la lecture du fichier JSON : {e}") | |
return None | |
db_name = 'temp_json.db' | |
conn = sqlite3.connect(db_name) | |
cursor = conn.cursor() | |
# Supposons que les données JSON sont une liste de dictionnaires (une table) | |
table_name = "data" | |
columns = ', '.join(data[0].keys()) | |
placeholders = ', '.join('?' * len(data[0])) | |
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})") | |
for row in data: | |
cursor.execute(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})", tuple(row.values())) | |
conn.commit() | |
conn.close() | |
print(f"✅ Fichier JSON converti avec succès en {db_name}") | |
return db_name | |
def csv_to_sqlite(fichier_csv): | |
"""Convertit un fichier CSV en base SQLite.""" | |
try: | |
df = pd.read_csv(fichier_csv) | |
except pd.errors.ParserError as e: | |
print(f"Erreur lors de la lecture du fichier CSV : {e}") | |
return None | |
db_name = 'temp_csv.db' | |
conn = sqlite3.connect(db_name) | |
df.to_sql('data', conn, if_exists='replace', index=False) | |
conn.close() | |
print(f"✅ Fichier CSV converti avec succès en {db_name}") | |
return db_name | |
def excel_to_sqlite(fichier_excel): | |
"""Convertit un fichier Excel en base SQLite.""" | |
try: | |
df = pd.read_excel(fichier_excel) | |
except Exception as e: | |
print(f"Erreur lors de la lecture du fichier Excel : {e}") | |
return None | |
db_name = 'temp_excel.db' | |
conn = sqlite3.connect(db_name) | |
df.to_sql('data', conn, if_exists='replace', index=False) | |
conn.close() | |
print(f"✅ Fichier Excel converti avec succès en {db_name}") | |
return db_name | |
def preparer_bdd(input_path): | |
"""Détecte le type du fichier et le convertit en SQLite si nécessaire.""" | |
format_detecte = detecter_format(input_path) | |
print(f"📂 Format détecté : {format_detecte.upper()}") | |
if format_detecte == "sqlite": | |
print("➡️ Aucun traitement requis, base SQLite déjà prête.") | |
return input_path | |
elif format_detecte == "json": | |
return json_to_sqlite(input_path) | |
elif format_detecte == "csv": | |
return csv_to_sqlite(input_path) | |
elif format_detecte == "excel": | |
return excel_to_sqlite(input_path) | |
else: | |
print("❌ Format non supporté. Utilise un fichier .db, .json, .csv, .xls ou .xlsx") | |
return None | |
def extraire_bdd(db_path): | |
"""Récupère la structure et les données d'une base SQLite.""" | |
try: | |
with sqlite3.connect(db_path) as conn: | |
cursor = conn.cursor() | |
# Récupérer les tables | |
tables = [table[0] for table in cursor.execute( | |
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';" | |
).fetchall()] | |
if not tables: | |
print("⚠️ Aucune table trouvée.") | |
return "" | |
output_bdd = "" | |
for table in tables: | |
output_bdd += f"\n📌 Table `{table}` :\n" | |
# Récupérer les colonnes | |
cursor.execute(f"PRAGMA table_info({table});") | |
columns = cursor.fetchall() | |
col_names = [col[1] for col in columns] | |
col_types = [col[2] for col in columns] | |
output_bdd += f"📝 Colonnes: {', '.join(f'{name} ({ctype})' for name, ctype in zip(col_names, col_types))}\n" | |
# Récupérer les données de la table | |
cursor.execute(f"SELECT * FROM {table}") | |
rows = cursor.fetchall() | |
if rows: | |
output_bdd += "\n".join(f" -> {row}" for row in rows) + "\n" | |
else: | |
output_bdd += "⚠️ Aucune donnée trouvée.\n" | |
return output_bdd | |
except sqlite3.Error as e: | |
print(f"❌ Erreur SQLite : {e}") | |
return "" | |
# Création de l'agent Groq pour la vérification de la normalisation | |
normalization_checker = Agent( | |
model=Groq(id="deepseek-r1-distill-llama-70b", temperature=0.0, top_p=0), | |
description="Vérifie si une base de données est normalisée.", | |
instructions=[ | |
"Analyse la structure de la base de données fournie en entrée.", | |
"Détermine si elle respecte les formes normales (1NF, 2NF, 3NF, BCNF).", | |
"Migre les données de l'ancienne base de données vers la nouvelle base de données normalisée sans générer de code SQL.", | |
"Assurez-vous que toutes les données sont correctement transférées et que les relations sont maintenues.", | |
"Si la base n'est pas normalisée, propose une version améliorée sans générer de schéma SQL." | |
], | |
markdown=True | |
) | |
def analyser_bdd(output_bdd: str): | |
"""Utilise le premier agent pour analyser et normaliser la base de données.""" | |
prompt = f""" | |
Voici la structure et les données de la base SQLite : | |
{output_bdd} | |
Tu es un expert en base de données exécute l'algorithme suivant pour normaliser et migrer la base de données. Affiche **uniquement** le résultat final sans explication détaillée et Ne pas ajouter de nouvelles colonnes non mentionnées dans la base de données d'origine . | |
--- | |
### **Algorithme de normalisation et migration** | |
Début | |
Initialiser une variable `resultat_final` pour accumuler les résultats. | |
# Analyser la structure actuelle de la base de données | |
Pour chaque table `T` dans la base de données faire | |
Détecter la clé primaire existante ou en attribuer une si nécessaire. | |
# Appliquer la 1NF : Atomisation des attributs | |
Si `T` contient des attributs non atomiques alors | |
Transformer les attributs en valeurs atomiques. | |
Assigner les clés primaires et ajouter les clés étrangères si nécessaire. | |
Fin Si | |
# Appliquer la 2NF : Éliminer les dépendances fonctionnelles partielles | |
Si `T` contient une clé primaire composite et des attributs qui dépendent d’une partie seulement de la clé alors | |
Décomposer `T` en nouvelles tables en respectant la 2NF. | |
Assigner les clés primaires et ajouter les clés étrangères si nécessaire. | |
Fin Si | |
# Appliquer la 3NF : Éliminer les dépendances transitives | |
Si `T` contient des dépendances transitives (un attribut dépend d’un autre attribut non clé) alors | |
Décomposer `T` en nouvelles tables pour isoler ces dépendances. | |
Assigner les clés primaires et ajouter les clés étrangères si nécessaire. | |
Fin Si | |
# Appliquer la BCNF : Suppression des dépendances anormales | |
Si `T` contient une dépendance fonctionnelle où un attribut non clé détermine une clé candidate alors | |
Décomposer `T` en nouvelles relations respectant la BCNF. | |
Assigner les clés primaires et ajouter les clés étrangères si nécessaire. | |
Fin Si | |
# Migrer les données | |
Pour chaque nouvelle table normalisée faire | |
Identifier les données à migrer depuis l'ancienne base de données. | |
Transformer ou réorganiser les données selon le schéma normalisé proposé. | |
Insérer les données transformées dans la nouvelle table normalisée. | |
Fin Pour | |
# Ajouter les nouvelles tables normalisées au résultat final | |
Ajouter "📝 Nouvelles tables proposées :" à `resultat_final`. | |
Pour chaque nouvelle table `N` créée faire | |
Ajouter "📌 Table `nom_nouvelle_table` :" à `resultat_final`. | |
Ajouter "📝 Colonnes: colonne1 (type1), colonne2 (type2), ..." à `resultat_final`. | |
Ajouter "🔑 Clé primaire: `colonne_PK`" si définie. | |
Ajouter "🔗 Clé étrangère: `colonne_FK` → `table_referencée` (`colonne_referencée`)" si applicable. | |
Ajouter "📋 Données :" à `resultat_final`. | |
Pour chaque enregistrement migré dans la table faire | |
Ajouter " - `valeur1`, `valeur2`, ..." à `resultat_final`. | |
Fin Pour | |
Fin Pour | |
# Vérification finale avant affichage | |
Afficher `resultat_final` | |
Afficher "✅ Normalisation complète et migration réussie." | |
Fin | |
--- | |
""" | |
response = normalization_checker.run(prompt) | |
resultat = response.content.strip() | |
# Supprimer le texte entre <think>...</think> | |
resultat_sans_think = re.sub(r"<think>.*?</think>", "", resultat, flags=re.DOTALL).strip() | |
return resultat_sans_think | |
# Création de l'agent 2 (vérifie la proposition de normalisation) | |
normalization_validator = Agent( | |
model=Groq(id="qwen-2.5-32b", temperature=0.0, top_p=0), | |
description="Vérifie si la base de données normalisée proposée est correcte.", | |
instructions=[ | |
"Analyse la normalisation proposée et vérifie si elle respecte les formes normales.", | |
"Compare la proposition avec la base de données générée pour s'assurer de leur correspondance.", | |
"Donne une proposition améliorée si nécessaire." | |
], | |
markdown=True | |
) | |
def verifier_normalisation(proposition_normalisee: str, output_bdd: str): | |
"""Utilise le deuxième agent pour valider la normalisation proposée et sa correspondance avec la base de données générée.""" | |
prompt = f""" | |
Voici la base de données générée après application de la normalisation : | |
Vérifie si la normalisation proposée correspond bien à {output_bdd}. | |
Voici une proposition de base de données normalisée : | |
{proposition_normalisee} | |
- Tu es un expert en base de données exécute l'algorithme suivant pour vérifier si la base obtenue correspond bien à la normalisation attendue. Affiche **uniquement** le résultat final sans explication détaillée. | |
--- | |
### **Algorithme de vérification et correction des formes normales** | |
Début | |
Initialiser une variable `corrections_appliquees` = False | |
Pour chaque table dans la base de données faire | |
Si tous les attributs sont atomiques alors | |
printf("📌 Vérification de la table `nom_table`") | |
printf("✅ La table `nom_table` est en 1NF") | |
Si la table ne contient pas de dépendances fonctionnelles partielles alors | |
printf("✅ La table `nom_table` est en 2NF") | |
Si la table ne contient pas de dépendances transitives alors | |
printf("✅ La table `nom_table` est en 3NF") | |
Si chaque dépendance fonctionnelle est basée sur une clé candidate alors | |
printf("✅ La table `nom_table` est en BCNF") | |
Sinon | |
printf("❌ La table `nom_table` ne respecte pas la BCNF → Correction appliquée.") | |
Appliquer la correction en décomposant la table en relations BCNF. | |
corrections_appliquees = True | |
Fin Si | |
Sinon | |
printf("❌ La table `nom_table` ne respecte pas la 3NF → Correction appliquée.") | |
Appliquer la correction en éliminant les dépendances transitives. | |
corrections_appliquees = True | |
Fin Si | |
Sinon | |
printf("❌ La table `nom_table` ne respecte pas la 2NF → Correction appliquée.") | |
Appliquer la correction en éliminant les dépendances partielles. | |
corrections_appliquees = True | |
Fin Si | |
Sinon | |
printf("❌ La table `nom_table` ne respecte pas la 1NF → Correction appliquée.") | |
Appliquer la correction en atomisant les attributs. | |
corrections_appliquees = True | |
Fin Si | |
Fin Pour | |
Si corrections_appliquees == True alors | |
printf("⚠️ Des corrections ont été appliquées durant la vérification.") | |
Fin Si | |
printf("🔍 Normalisation terminée.") | |
Fin | |
--- | |
""" | |
response = normalization_validator.run(prompt) | |
return response.content.strip() | |
def generate_sql_from_normalized_schema(normalized_schema, output_file): | |
"""Génère les requêtes SQL pour créer et insérer des données dans une base de données normalisée.""" | |
create_table_statements = [] | |
insert_data_statements = [] | |
# Analyser le schéma normalisé | |
tables = normalized_schema.split('📌 Table') | |
for table in tables: | |
if not table.strip(): | |
continue | |
# Extraire le nom de la table | |
table_name = "" | |
table_parts = re.split(r'`| :', table, maxsplit=2) | |
if len(table_parts) >= 2: | |
table_name = table_parts[1].strip() | |
# Extraire les colonnes et leurs types | |
columns = [] | |
col_types = {} | |
if '📝 Colonnes: ' in table: | |
columns_line = table.split('📝 Colonnes: ')[1].split('\n')[0] | |
columns = [col.strip().split(' (')[0] for col in columns_line.split(', ')] | |
col_types = {col.split(' (')[0]: col.split(' (')[1].rstrip(')') for col in columns_line.split(', ')} | |
# Extraire la clé primaire (peut être composée) | |
primary_keys = [] | |
pk_match = re.search(r'🔑 Clé primaire: ([\w, ]+)', table) | |
if pk_match: | |
primary_keys = [key.strip() for key in pk_match.group(1).split(',')] | |
# Extraire les clés étrangères | |
foreign_keys = [] | |
fk_matches = re.findall(r'🔗 Clé étrangère: (\w+) → (\w+) \((\w+)\)', table) | |
for fk_col, ref_table, ref_col in fk_matches: | |
foreign_keys.append((fk_col, ref_table, ref_col)) | |
# Générer CREATE TABLE | |
if table_name and columns: | |
create_sql = f"CREATE TABLE {table_name} (\n" | |
# Ajouter les colonnes avec leurs types | |
for col in columns: | |
col_type = col_types.get(col, "TEXT").upper() | |
create_sql += f" {col} {col_type},\n" | |
# Ajouter la clé primaire (peut être composée) | |
if primary_keys: | |
create_sql += f" PRIMARY KEY ({', '.join(primary_keys)}),\n" | |
# Ajouter les clés étrangères | |
for fk in foreign_keys: | |
create_sql += f" FOREIGN KEY ({fk[0]}) REFERENCES {fk[1]}({fk[2]}),\n" | |
# Nettoyer les virgules finales | |
create_sql = re.sub(r',\n$', '\n', create_sql) + ");" | |
create_table_statements.append(create_sql) | |
# Extraire les données et les insérer en une seule requête | |
if '📋 Données :' in table: | |
data_section = table.split('📋 Données :')[1].split('\n\n')[0] | |
data_rows = re.findall(r' - (.*?)\n', data_section) | |
if data_rows: | |
insert_sql = f"INSERT INTO {table_name} (" | |
insert_sql += ', '.join(columns) + ") VALUES\n" | |
formatted_values_list = [] | |
for row in data_rows: | |
# Nettoyer les guillemets simples et backticks | |
values = [v.strip().strip("'`") for v in row.split(', ')] | |
formatted_values = [] | |
for i, value in enumerate(values): | |
col_type = col_types.get(columns[i], "TEXT").upper() | |
if col_type in ['TEXT', 'DATE']: | |
formatted_values.append(f"'{value}'") | |
else: | |
formatted_values.append(f"{value}") | |
formatted_values_list.append(f"({', '.join(formatted_values)})") | |
insert_sql += ",\n".join(formatted_values_list) + ";" | |
insert_data_statements.append(insert_sql) | |
# Ajouter une ligne vide après les insertions pour cette table | |
insert_data_statements.append("") | |
# Écrire dans le fichier | |
with open(output_file, 'w', encoding='utf-8') as f: | |
f.write("-- Structure de la base de données\n") | |
f.write("\n".join(create_table_statements)) | |
f.write("\n\n-- Migrer les données de l'ancienne bdd\n") | |
f.write("\n".join(insert_data_statements)) | |
# --- Exécuter le script --- | |
def main(): | |
parser = argparse.ArgumentParser(description="Outil de transformation de fichier vers base SQLite et normalisation de BDD") | |
parser.add_argument('--input', type=str, required=True, help="Chemin vers le fichier d'entrée (peut être .db, .json, .csv, .xls ou .xlsx)") | |
parser.add_argument('output_file', type=str, help='Chemin vers le fichier SQL de sortie') | |
args = parser.parse_args() | |
# Préparer la base de données selon le format | |
prepared_db_path = preparer_bdd(args.input) | |
if not prepared_db_path: | |
sys.exit("❌ Erreur lors de la préparation de la base de données.") | |
print(f"✅ Base de données prête : {prepared_db_path}") | |
# Nettoyer la base de données | |
clean_database(prepared_db_path) | |
print(f"✅ Base de données nettoyée : {prepared_db_path}") | |
# Extraire la structure et les données de la base SQLite préparée | |
output_bdd = extraire_bdd(prepared_db_path) | |
if not output_bdd: | |
sys.exit("❌ Erreur dans l'extraction de la base de données.") | |
# Mise en place de l'auto-correction et auto-vérification (self-consistency) | |
proposition_normalisee = output_bdd | |
while True: | |
resultat = analyser_bdd(proposition_normalisee) | |
verification = verifier_normalisation(resultat, output_bdd) | |
# Si des corrections sont détectées, mettre à jour la proposition sans afficher le résultat | |
if "⚠️ Des corrections ont été appliquées durant la vérification." in verification: | |
proposition_normalisee = verification | |
else: | |
# Afficher uniquement le résultat final correct et sortir de la boucle | |
print("\n🔍 Résultat final de l'analyse :\n", resultat) | |
print("\n✅ Vérification de la normalisation :\n", verification) | |
break | |
# Générer le fichier SQL final | |
generate_sql_from_normalized_schema(resultat, args.output_file) | |
print(f"✅ Fichier SQL généré: {args.output_file}") | |
if __name__ == "__main__": | |
main() | |