DB_normlizer / normalisation_script.py
Cherryblade29's picture
Upload normalisation_script.py
8e0449f verified
import os
import sqlite3
import json
import pandas as pd
import argparse
import re
from agno.agent import Agent
from agno.models.groq import Groq
from dotenv import load_dotenv
import sys
from clean_database import clean_database
sys.stdout.reconfigure(encoding='utf-8')
load_dotenv()
def detecter_format(fichier):
"""Retourne l'extension du fichier sous forme explicite."""
extension = os.path.splitext(fichier)[1].lower()
if extension in [".db", ".sqlite"]:
return "sqlite"
elif extension == ".json":
return "json"
elif extension == ".csv":
return "csv"
elif extension in [".xls", ".xlsx"]:
return "excel"
else:
return "inconnu"
def json_to_sqlite(fichier_json):
"""Convertit un fichier JSON en base SQLite."""
try:
with open(fichier_json, 'r') as file:
data = json.load(file)
except json.JSONDecodeError as e:
print(f"Erreur lors de la lecture du fichier JSON : {e}")
return None
db_name = 'temp_json.db'
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# Supposons que les données JSON sont une liste de dictionnaires (une table)
table_name = "data"
columns = ', '.join(data[0].keys())
placeholders = ', '.join('?' * len(data[0]))
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})")
for row in data:
cursor.execute(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})", tuple(row.values()))
conn.commit()
conn.close()
print(f"✅ Fichier JSON converti avec succès en {db_name}")
return db_name
def csv_to_sqlite(fichier_csv):
"""Convertit un fichier CSV en base SQLite."""
try:
df = pd.read_csv(fichier_csv)
except pd.errors.ParserError as e:
print(f"Erreur lors de la lecture du fichier CSV : {e}")
return None
db_name = 'temp_csv.db'
conn = sqlite3.connect(db_name)
df.to_sql('data', conn, if_exists='replace', index=False)
conn.close()
print(f"✅ Fichier CSV converti avec succès en {db_name}")
return db_name
def excel_to_sqlite(fichier_excel):
"""Convertit un fichier Excel en base SQLite."""
try:
df = pd.read_excel(fichier_excel)
except Exception as e:
print(f"Erreur lors de la lecture du fichier Excel : {e}")
return None
db_name = 'temp_excel.db'
conn = sqlite3.connect(db_name)
df.to_sql('data', conn, if_exists='replace', index=False)
conn.close()
print(f"✅ Fichier Excel converti avec succès en {db_name}")
return db_name
def preparer_bdd(input_path):
"""Détecte le type du fichier et le convertit en SQLite si nécessaire."""
format_detecte = detecter_format(input_path)
print(f"📂 Format détecté : {format_detecte.upper()}")
if format_detecte == "sqlite":
print("➡️ Aucun traitement requis, base SQLite déjà prête.")
return input_path
elif format_detecte == "json":
return json_to_sqlite(input_path)
elif format_detecte == "csv":
return csv_to_sqlite(input_path)
elif format_detecte == "excel":
return excel_to_sqlite(input_path)
else:
print("❌ Format non supporté. Utilise un fichier .db, .json, .csv, .xls ou .xlsx")
return None
def extraire_bdd(db_path):
"""Récupère la structure et les données d'une base SQLite."""
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# Récupérer les tables
tables = [table[0] for table in cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';"
).fetchall()]
if not tables:
print("⚠️ Aucune table trouvée.")
return ""
output_bdd = ""
for table in tables:
output_bdd += f"\n📌 Table `{table}` :\n"
# Récupérer les colonnes
cursor.execute(f"PRAGMA table_info({table});")
columns = cursor.fetchall()
col_names = [col[1] for col in columns]
col_types = [col[2] for col in columns]
output_bdd += f"📝 Colonnes: {', '.join(f'{name} ({ctype})' for name, ctype in zip(col_names, col_types))}\n"
# Récupérer les données de la table
cursor.execute(f"SELECT * FROM {table}")
rows = cursor.fetchall()
if rows:
output_bdd += "\n".join(f" -> {row}" for row in rows) + "\n"
else:
output_bdd += "⚠️ Aucune donnée trouvée.\n"
return output_bdd
except sqlite3.Error as e:
print(f"❌ Erreur SQLite : {e}")
return ""
# Création de l'agent Groq pour la vérification de la normalisation
normalization_checker = Agent(
model=Groq(id="deepseek-r1-distill-llama-70b", temperature=0.0, top_p=0),
description="Vérifie si une base de données est normalisée.",
instructions=[
"Analyse la structure de la base de données fournie en entrée.",
"Détermine si elle respecte les formes normales (1NF, 2NF, 3NF, BCNF).",
"Migre les données de l'ancienne base de données vers la nouvelle base de données normalisée sans générer de code SQL.",
"Assurez-vous que toutes les données sont correctement transférées et que les relations sont maintenues.",
"Si la base n'est pas normalisée, propose une version améliorée sans générer de schéma SQL."
],
markdown=True
)
def analyser_bdd(output_bdd: str):
"""Utilise le premier agent pour analyser et normaliser la base de données."""
prompt = f"""
Voici la structure et les données de la base SQLite :
{output_bdd}
Tu es un expert en base de données exécute l'algorithme suivant pour normaliser et migrer la base de données. Affiche **uniquement** le résultat final sans explication détaillée et Ne pas ajouter de nouvelles colonnes non mentionnées dans la base de données d'origine .
---
### **Algorithme de normalisation et migration**
Début
Initialiser une variable `resultat_final` pour accumuler les résultats.
# Analyser la structure actuelle de la base de données
Pour chaque table `T` dans la base de données faire
Détecter la clé primaire existante ou en attribuer une si nécessaire.
# Appliquer la 1NF : Atomisation des attributs
Si `T` contient des attributs non atomiques alors
Transformer les attributs en valeurs atomiques.
Assigner les clés primaires et ajouter les clés étrangères si nécessaire.
Fin Si
# Appliquer la 2NF : Éliminer les dépendances fonctionnelles partielles
Si `T` contient une clé primaire composite et des attributs qui dépendent d’une partie seulement de la clé alors
Décomposer `T` en nouvelles tables en respectant la 2NF.
Assigner les clés primaires et ajouter les clés étrangères si nécessaire.
Fin Si
# Appliquer la 3NF : Éliminer les dépendances transitives
Si `T` contient des dépendances transitives (un attribut dépend d’un autre attribut non clé) alors
Décomposer `T` en nouvelles tables pour isoler ces dépendances.
Assigner les clés primaires et ajouter les clés étrangères si nécessaire.
Fin Si
# Appliquer la BCNF : Suppression des dépendances anormales
Si `T` contient une dépendance fonctionnelle où un attribut non clé détermine une clé candidate alors
Décomposer `T` en nouvelles relations respectant la BCNF.
Assigner les clés primaires et ajouter les clés étrangères si nécessaire.
Fin Si
# Migrer les données
Pour chaque nouvelle table normalisée faire
Identifier les données à migrer depuis l'ancienne base de données.
Transformer ou réorganiser les données selon le schéma normalisé proposé.
Insérer les données transformées dans la nouvelle table normalisée.
Fin Pour
# Ajouter les nouvelles tables normalisées au résultat final
Ajouter "📝 Nouvelles tables proposées :" à `resultat_final`.
Pour chaque nouvelle table `N` créée faire
Ajouter "📌 Table `nom_nouvelle_table` :" à `resultat_final`.
Ajouter "📝 Colonnes: colonne1 (type1), colonne2 (type2), ..." à `resultat_final`.
Ajouter "🔑 Clé primaire: `colonne_PK`" si définie.
Ajouter "🔗 Clé étrangère: `colonne_FK` → `table_referencée` (`colonne_referencée`)" si applicable.
Ajouter "📋 Données :" à `resultat_final`.
Pour chaque enregistrement migré dans la table faire
Ajouter " - `valeur1`, `valeur2`, ..." à `resultat_final`.
Fin Pour
Fin Pour
# Vérification finale avant affichage
Afficher `resultat_final`
Afficher "✅ Normalisation complète et migration réussie."
Fin
---
"""
response = normalization_checker.run(prompt)
resultat = response.content.strip()
# Supprimer le texte entre <think>...</think>
resultat_sans_think = re.sub(r"<think>.*?</think>", "", resultat, flags=re.DOTALL).strip()
return resultat_sans_think
# Création de l'agent 2 (vérifie la proposition de normalisation)
normalization_validator = Agent(
model=Groq(id="qwen-2.5-32b", temperature=0.0, top_p=0),
description="Vérifie si la base de données normalisée proposée est correcte.",
instructions=[
"Analyse la normalisation proposée et vérifie si elle respecte les formes normales.",
"Compare la proposition avec la base de données générée pour s'assurer de leur correspondance.",
"Donne une proposition améliorée si nécessaire."
],
markdown=True
)
def verifier_normalisation(proposition_normalisee: str, output_bdd: str):
"""Utilise le deuxième agent pour valider la normalisation proposée et sa correspondance avec la base de données générée."""
prompt = f"""
Voici la base de données générée après application de la normalisation :
Vérifie si la normalisation proposée correspond bien à {output_bdd}.
Voici une proposition de base de données normalisée :
{proposition_normalisee}
- Tu es un expert en base de données exécute l'algorithme suivant pour vérifier si la base obtenue correspond bien à la normalisation attendue. Affiche **uniquement** le résultat final sans explication détaillée.
---
### **Algorithme de vérification et correction des formes normales**
Début
Initialiser une variable `corrections_appliquees` = False
Pour chaque table dans la base de données faire
Si tous les attributs sont atomiques alors
printf("📌 Vérification de la table `nom_table`")
printf("✅ La table `nom_table` est en 1NF")
Si la table ne contient pas de dépendances fonctionnelles partielles alors
printf("✅ La table `nom_table` est en 2NF")
Si la table ne contient pas de dépendances transitives alors
printf("✅ La table `nom_table` est en 3NF")
Si chaque dépendance fonctionnelle est basée sur une clé candidate alors
printf("✅ La table `nom_table` est en BCNF")
Sinon
printf("❌ La table `nom_table` ne respecte pas la BCNF → Correction appliquée.")
Appliquer la correction en décomposant la table en relations BCNF.
corrections_appliquees = True
Fin Si
Sinon
printf("❌ La table `nom_table` ne respecte pas la 3NF → Correction appliquée.")
Appliquer la correction en éliminant les dépendances transitives.
corrections_appliquees = True
Fin Si
Sinon
printf("❌ La table `nom_table` ne respecte pas la 2NF → Correction appliquée.")
Appliquer la correction en éliminant les dépendances partielles.
corrections_appliquees = True
Fin Si
Sinon
printf("❌ La table `nom_table` ne respecte pas la 1NF → Correction appliquée.")
Appliquer la correction en atomisant les attributs.
corrections_appliquees = True
Fin Si
Fin Pour
Si corrections_appliquees == True alors
printf("⚠️ Des corrections ont été appliquées durant la vérification.")
Fin Si
printf("🔍 Normalisation terminée.")
Fin
---
"""
response = normalization_validator.run(prompt)
return response.content.strip()
def generate_sql_from_normalized_schema(normalized_schema, output_file):
"""Génère les requêtes SQL pour créer et insérer des données dans une base de données normalisée."""
create_table_statements = []
insert_data_statements = []
# Analyser le schéma normalisé
tables = normalized_schema.split('📌 Table')
for table in tables:
if not table.strip():
continue
# Extraire le nom de la table
table_name = ""
table_parts = re.split(r'`| :', table, maxsplit=2)
if len(table_parts) >= 2:
table_name = table_parts[1].strip()
# Extraire les colonnes et leurs types
columns = []
col_types = {}
if '📝 Colonnes: ' in table:
columns_line = table.split('📝 Colonnes: ')[1].split('\n')[0]
columns = [col.strip().split(' (')[0] for col in columns_line.split(', ')]
col_types = {col.split(' (')[0]: col.split(' (')[1].rstrip(')') for col in columns_line.split(', ')}
# Extraire la clé primaire (peut être composée)
primary_keys = []
pk_match = re.search(r'🔑 Clé primaire: ([\w, ]+)', table)
if pk_match:
primary_keys = [key.strip() for key in pk_match.group(1).split(',')]
# Extraire les clés étrangères
foreign_keys = []
fk_matches = re.findall(r'🔗 Clé étrangère: (\w+) → (\w+) \((\w+)\)', table)
for fk_col, ref_table, ref_col in fk_matches:
foreign_keys.append((fk_col, ref_table, ref_col))
# Générer CREATE TABLE
if table_name and columns:
create_sql = f"CREATE TABLE {table_name} (\n"
# Ajouter les colonnes avec leurs types
for col in columns:
col_type = col_types.get(col, "TEXT").upper()
create_sql += f" {col} {col_type},\n"
# Ajouter la clé primaire (peut être composée)
if primary_keys:
create_sql += f" PRIMARY KEY ({', '.join(primary_keys)}),\n"
# Ajouter les clés étrangères
for fk in foreign_keys:
create_sql += f" FOREIGN KEY ({fk[0]}) REFERENCES {fk[1]}({fk[2]}),\n"
# Nettoyer les virgules finales
create_sql = re.sub(r',\n$', '\n', create_sql) + ");"
create_table_statements.append(create_sql)
# Extraire les données et les insérer en une seule requête
if '📋 Données :' in table:
data_section = table.split('📋 Données :')[1].split('\n\n')[0]
data_rows = re.findall(r' - (.*?)\n', data_section)
if data_rows:
insert_sql = f"INSERT INTO {table_name} ("
insert_sql += ', '.join(columns) + ") VALUES\n"
formatted_values_list = []
for row in data_rows:
# Nettoyer les guillemets simples et backticks
values = [v.strip().strip("'`") for v in row.split(', ')]
formatted_values = []
for i, value in enumerate(values):
col_type = col_types.get(columns[i], "TEXT").upper()
if col_type in ['TEXT', 'DATE']:
formatted_values.append(f"'{value}'")
else:
formatted_values.append(f"{value}")
formatted_values_list.append(f"({', '.join(formatted_values)})")
insert_sql += ",\n".join(formatted_values_list) + ";"
insert_data_statements.append(insert_sql)
# Ajouter une ligne vide après les insertions pour cette table
insert_data_statements.append("")
# Écrire dans le fichier
with open(output_file, 'w', encoding='utf-8') as f:
f.write("-- Structure de la base de données\n")
f.write("\n".join(create_table_statements))
f.write("\n\n-- Migrer les données de l'ancienne bdd\n")
f.write("\n".join(insert_data_statements))
# --- Exécuter le script ---
def main():
parser = argparse.ArgumentParser(description="Outil de transformation de fichier vers base SQLite et normalisation de BDD")
parser.add_argument('--input', type=str, required=True, help="Chemin vers le fichier d'entrée (peut être .db, .json, .csv, .xls ou .xlsx)")
parser.add_argument('output_file', type=str, help='Chemin vers le fichier SQL de sortie')
args = parser.parse_args()
# Préparer la base de données selon le format
prepared_db_path = preparer_bdd(args.input)
if not prepared_db_path:
sys.exit("❌ Erreur lors de la préparation de la base de données.")
print(f"✅ Base de données prête : {prepared_db_path}")
# Nettoyer la base de données
clean_database(prepared_db_path)
print(f"✅ Base de données nettoyée : {prepared_db_path}")
# Extraire la structure et les données de la base SQLite préparée
output_bdd = extraire_bdd(prepared_db_path)
if not output_bdd:
sys.exit("❌ Erreur dans l'extraction de la base de données.")
# Mise en place de l'auto-correction et auto-vérification (self-consistency)
proposition_normalisee = output_bdd
while True:
resultat = analyser_bdd(proposition_normalisee)
verification = verifier_normalisation(resultat, output_bdd)
# Si des corrections sont détectées, mettre à jour la proposition sans afficher le résultat
if "⚠️ Des corrections ont été appliquées durant la vérification." in verification:
proposition_normalisee = verification
else:
# Afficher uniquement le résultat final correct et sortir de la boucle
print("\n🔍 Résultat final de l'analyse :\n", resultat)
print("\n✅ Vérification de la normalisation :\n", verification)
break
# Générer le fichier SQL final
generate_sql_from_normalized_schema(resultat, args.output_file)
print(f"✅ Fichier SQL généré: {args.output_file}")
if __name__ == "__main__":
main()