DB_normlizer / prompt_to.py
Cherryblade29's picture
Upload prompt_to.py
1e187f1 verified
import os
import sqlite3
import json
import pandas as pd
import re
from agno.agent import Agent
from agno.models.groq import Groq
import streamlit as st
from dotenv import load_dotenv
import graphviz
from clean_database import clean_database
# Charger les variables d'environnement depuis le fichier .env
load_dotenv()
# Fonctions de normalisation de la base de données
def detecter_format(fichier):
extension = os.path.splitext(fichier)[1].lower()
if extension in [".db", ".sqlite"]:
return "sqlite"
elif extension == ".json":
return "json"
elif extension == ".csv":
return "csv"
elif extension in [".xls", ".xlsx"]:
return "excel"
else:
return "inconnu"
def json_to_sqlite(fichier_json):
with open(fichier_json, 'r', encoding='utf-8') as file:
data = json.load(file)
db_name = 'temp_json.db'
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
table_name = "data"
columns = ', '.join(data[0].keys())
placeholders = ', '.join('?' * len(data[0]))
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})")
for row in data:
cursor.execute(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})", tuple(row.values()))
conn.commit()
conn.close()
return db_name
def csv_to_sqlite(fichier_csv):
df = pd.read_csv(fichier_csv, encoding='utf-8')
db_name = 'temp_csv.db'
conn = sqlite3.connect(db_name)
df.to_sql('data', conn, if_exists='replace', index=False)
conn.close()
return db_name
def excel_to_sqlite(fichier_excel):
df = pd.read_excel(fichier_excel, encoding='utf-8')
db_name = 'temp_excel.db'
conn = sqlite3.connect(db_name)
df.to_sql('data', conn, if_exists='replace', index=False)
conn.close()
return db_name
def preparer_bdd(input_path):
format_detecte = detecter_format(input_path)
if format_detecte == "sqlite":
return input_path
elif format_detecte == "json":
return json_to_sqlite(input_path)
elif format_detecte == "csv":
return csv_to_sqlite(input_path)
elif format_detecte == "excel":
return excel_to_sqlite(input_path)
else:
st.error("Format non supporté. Utilisez un fichier .db, .json, .csv, .xls ou .xlsx")
return None
def extraire_bdd(db_path):
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
tables = [table[0] for table in cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';"
).fetchall()]
if not tables:
st.warning("Aucune table trouvée.")
return ""
output_bdd = ""
for table in tables:
output_bdd += f"\n📌 Table `{table}` :\n"
cursor.execute(f"PRAGMA table_info({table});")
columns = cursor.fetchall()
col_names = [col[1] for col in columns]
col_types = [col[2] for col in columns]
output_bdd += f"📝 Colonnes: {', '.join(f'{name} ({ctype})' for name, ctype in zip(col_names, col_types))}\n"
cursor.execute(f"SELECT * FROM {table}")
rows = cursor.fetchall()
if rows:
output_bdd += "\n".join(f" -> {row}" for row in rows) + "\n"
else:
output_bdd += "⚠️ Aucune donnée trouvée.\n"
return output_bdd
# Création de l'agent Groq pour la vérification de la normalisation
normalization_checker = Agent(
model=Groq(id="deepseek-r1-distill-llama-70b", temperature=0.0, top_p=0),
description="Vérifie si une base de données est normalisée.",
instructions=[
"Analyse la structure de la base de données fournie en entrée.",
"Détermine si elle respecte les formes normales (1NF, 2NF, 3NF, BCNF).",
"Migre les données de l'ancienne base de données vers la nouvelle base de données normalisée sans générer de code SQL.",
"Assurez-vous que toutes les données sont correctement transférées et que les relations sont maintenues.",
"Si la base n'est pas normalisée, propose une version améliorée sans générer de schéma SQL."
],
markdown=True
)
def analyser_bdd(output_bdd: str):
prompt = f"""
Voici la structure et les données de la base SQLite :
{output_bdd}
Tu es un expert en base de données exécute l'algorithme suivant pour normaliser et migrer la base de données. Affiche **uniquement** le résultat final sans explication détaillée et Ne pas ajouter de nouvelles colonnes non mentionnées dans la base de données d'origine.
---
### **Algorithme de normalisation et migration**
Début
Initialiser une variable `resultat_final` pour accumuler les résultats.
# Analyser la structure actuelle de la base de données
Pour chaque table `T` dans la base de données faire
Détecter la clé primaire existante ou en attribuer une si nécessaire.
# Appliquer la 1NF : Atomisation des attributs
Si `T` contient des attributs non atomiques alors
Transformer les attributs en valeurs atomiques.
Assigner les clés primaires et ajouter les clés étrangères si nécessaire.
Fin Si
# Appliquer la 2NF : Éliminer les dépendances fonctionnelles partielles
Si `T` contient une clé primaire composite et des attributs qui dépendent d’une partie seulement de la clé alors
Décomposer `T` en nouvelles tables en respectant la 2NF.
Assigner les clés primaires et ajouter les clés étrangères si nécessaire.
Fin Si
# Appliquer la 3NF : Éliminer les dépendances transitives
Si `T` contient des dépendances transitives (un attribut dépend d’un autre attribut non clé) alors
Décomposer `T` en nouvelles tables pour isoler ces dépendances.
Assigner les clés primaires et ajouter les clés étrangères si nécessaire.
Fin Si
# Appliquer la BCNF : Suppression des dépendances anormales
Si `T` contient une dépendance fonctionnelle où un attribut non clé détermine une clé candidate alors
Décomposer `T` en nouvelles relations respectant la BCNF.
Assigner les clés primaires et ajouter les clés étrangères si nécessaire.
Fin Si
# Migrer les données
Pour chaque nouvelle table normalisée faire
Identifier les données à migrer depuis l'ancienne base de données.
Transformer ou réorganiser les données selon le schéma normalisé proposé.
Insérer les données transformées dans la nouvelle table normalisée.
Fin Pour
# Le résultat doit absolument respecter cette structure.
Ajouter "📝 Nouvelles tables proposées :" à `resultat_final`.
Pour chaque nouvelle table `N` créée faire
Ajouter "📌 Table `nom_nouvelle_table` :" à `resultat_final`.
Ajouter "📝 Colonnes: colonne1 (type1), colonne2 (type2), ..." à `resultat_final`.
Ajouter "🔑 Clé primaire: `colonne_PK`" si définie.
Ajouter "🔗 Clé étrangère: `colonne_FK` → `table_referencée` (`colonne_referencée`)" si applicable.
Ajouter "📋 Données :" à `resultat_final`.
Pour chaque enregistrement migré dans la table faire
Ajouter " - `valeur1`, `valeur2`, ..." à `resultat_final`.
Fin Pour
Fin Pour
# Vérification finale avant affichage
Afficher `resultat_final`
Afficher "✅ Normalisation complète et migration réussie."
Fin
---
"""
response = normalization_checker.run(prompt)
resultat = response.content.strip()
resultat_sans_think = re.sub(r"<think>.*?</think>", "", resultat, flags=re.DOTALL).strip()
return resultat_sans_think
# Création de l'agent 2 (vérifie la proposition de normalisation)
normalization_validator = Agent(
model=Groq(id="qwen-qwq-32b", temperature=0.0, top_p=0),
description="Vérifie si la base de données normalisée proposée est correcte.",
instructions=[
"Analyse la normalisation proposée et vérifie si elle respecte les formes normales.",
"Compare la proposition avec la base de données générée pour s'assurer de leur correspondance.",
"Donne une proposition améliorée si nécessaire."
],
markdown=True
)
def verifier_normalisation(proposition_normalisee: str, output_bdd: str):
prompt = f"""
Voici la base de données générée après application de la normalisation :
Vérifie si la normalisation proposée correspond bien à {output_bdd}.
Voici une proposition de base de données normalisée :
{proposition_normalisee}
- Tu es un expert en base de données exécute l'algorithme suivant pour vérifier si la base obtenue correspond bien à la normalisation attendue. Affiche **uniquement** le résultat final sans explication détaillée.
---
### **Algorithme de vérification et correction des formes normales**
Début
Initialiser une variable `corrections_appliquees` = False
Pour chaque table dans la base de données faire
Si tous les attributs sont atomiques alors
printf("📌 Vérification de la table `nom_table`")
printf("✅ La table `nom_table` est en 1NF")
Si la table ne contient pas de dépendances fonctionnelles partielles alors
printf("✅ La table `nom_table` est en 2NF")
Si la table ne contient pas de dépendances transitives alors
printf("✅ La table `nom_table` est en 3NF")
Si chaque dépendance fonctionnelle est basée sur une clé candidate alors
printf("✅ La table `nom_table` est en BCNF")
Sinon
printf("❌ La table `nom_table` ne respecte pas la BCNF → Correction appliquée.")
Appliquer la correction en décomposant la table en relations BCNF.
corrections_appliquees = True
Fin Si
Sinon
printf("❌ La table `nom_table` ne respecte pas la 3NF → Correction appliquée.")
Appliquer la correction en éliminant les dépendances transitives.
corrections_appliquees = True
Fin Si
Sinon
printf("❌ La table `nom_table` ne respecte pas la 2NF → Correction appliquée.")
Appliquer la correction en éliminant les dépendances partielles.
corrections_appliquees = True
Fin Si
Sinon
printf("❌ La table `nom_table` ne respecte pas la 1NF → Correction appliquée.")
Appliquer la correction en atomisant les attributs.
corrections_appliquees = True
Fin Si
Fin Pour
Si corrections_appliquees == True alors
printf("⚠️ Des corrections ont été appliquées durant la vérification.")
Fin Si
printf("🔍 Normalisation terminée.")
Fin
---
"""
response = normalization_validator.run(prompt)
resultat = response.content.strip()
resultat_sans_think = re.sub(r"<think>.*?</think>", "", resultat, flags=re.DOTALL).strip()
return resultat_sans_think
# Création de l'agent 3 (crée la nouvelle bdd normalisée)
sql_generator = Agent(
model=Groq(id="whisper-large-v3", temperature=0.0, top_p=0),
description="Crée la nouvelle bdd normalisée.",
instructions=[
"Analyse le résultat de normalisation et génère les requêtes SQL pour créer la nouvelle bdd."
],
markdown=True
)
def génération_des_requêtes(proposition_normalisee: str):
prompt = f"""
Voici une proposition de base de données normalisée :
{proposition_normalisee}
Génère uniquement les requêtes SQL pour créer et insérer des données dans la nouvelle base de données normalisée (crée les nouvelle tables et insérée tous les donnez et n'ajoutes rien).
"""
response = sql_generator.run(prompt)
resultat = response.content.strip()
resultat_sans_think = re.sub(r"<think>.*?</think>", "", resultat, flags=re.DOTALL).strip()
return resultat_sans_think
def nettoyer_requetes_sql(texte):
# Expression régulière pour capturer les requêtes CREATE TABLE et INSERT INTO
pattern = r"(CREATE TABLE.*?;|INSERT INTO.*?;)"
requetes_sql = re.findall(pattern, texte, re.DOTALL | re.IGNORECASE)
return "\n\n".join(req.strip() for req in requetes_sql)
# --- Fonctions utilitaires ---
def create_sqlite_db_from_file(sql_file, db_name):
if os.path.exists(db_name):
os.remove(db_name)
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
with open(sql_file, 'r') as file:
sql_script = file.read()
cursor.executescript(sql_script)
conn.commit()
conn.close()
def get_database_structure(db_name):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# Récupérer les tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
structure = ""
for table in tables:
table_name = table[0]
structure += f"Table: {table_name}\n"
# Récupérer les colonnes
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
for column in columns:
structure += f" Colonne: {column[1]} (Type: {column[2]})\n"
conn.close()
return structure
def get_sample_data(db_name, limit=3):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
sample_text = ""
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
for table in tables:
table_name = table[0]
sample_text += f"\nExtrait de la table {table_name} :\n"
df = pd.read_sql_query(f"SELECT * FROM {table_name} LIMIT {limit}", conn)
sample_text += df.to_string(index=False)
sample_text += "\n"
conn.close()
return sample_text
def transform_prompt_to_sql(prompt, db_structure):
instructions = f"""
Voici la structure de la base de données :
{db_structure}
Voici la requête de l'utilisateur :
{prompt}
Transforme ce prompt en une requête SQL en utilisant la structure de la base de données fournie.
"""
response = sql_transformer.run(instructions)
return response.content.strip()
def clean_sql_query(response):
# Utiliser une expression régulière pour extraire uniquement la requête SQL
match = re.search(r'```sql\s*(.*?)\s*```', response, re.DOTALL)
if match:
return match.group(1).strip()
else:
raise ValueError("Impossible d'extraire la requête SQL de la réponse.")
def execute_sql_query(db_name, sql_query):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
cursor.execute(sql_query)
results = cursor.fetchall()
conn.close()
return results
def visualize_database_schema_from_sql(sql_file):
with open(sql_file, 'r') as file:
sql_script = file.read()
# Extraction des tables et colonnes
tables = {}
pks = {}
fks = []
create_table_pattern = re.compile(r'CREATE TABLE\s+(\w+)\s*\((.*?)\);', re.DOTALL | re.IGNORECASE)
fk_pattern = re.compile(r'FOREIGN KEY\s*\((\w+)\)\s*REFERENCES\s+(\w+)\s*\((\w+)\)', re.IGNORECASE)
pk_inline_pattern = re.compile(r'^(\w+)\s+\w+.*PRIMARY KEY', re.IGNORECASE)
pk_constraint_pattern = re.compile(r'PRIMARY KEY\s*\((.*?)\)', re.IGNORECASE)
for match in create_table_pattern.finditer(sql_script):
table_name = match.group(1)
body = match.group(2)
lines = [line.strip() for line in body.split(',') if line.strip()]
columns = []
primary_keys = []
for line in lines:
if line.upper().startswith("FOREIGN KEY"):
fk_match = fk_pattern.search(line)
if fk_match:
source_col = fk_match.group(1)
ref_table = fk_match.group(2)
ref_col = fk_match.group(3)
fks.append((table_name, source_col, ref_table, ref_col))
elif line.upper().startswith("PRIMARY KEY"):
pk_match = pk_constraint_pattern.search(line)
if pk_match:
pk_cols = [pk.strip() for pk in pk_match.group(1).split(',')]
primary_keys.extend(pk_cols)
else:
parts = line.split()
if len(parts) == 0:
continue # ligne vide ou espace
col_name = parts[0].strip("()") # on enlève les parenthèses résiduelles
if col_name.isidentifier(): # vérifie que c’est un nom valide
if col_name not in columns: # Vérifie si la colonne n'est pas déjà présente
columns.append(col_name)
inline_pk = pk_inline_pattern.match(line)
if inline_pk:
if inline_pk.group(1) not in primary_keys: # Vérifie si la clé primaire n'est pas déjà présente
primary_keys.append(inline_pk.group(1))
tables[table_name] = columns
pks[table_name] = primary_keys
# Construction du graphe avec style MERISE
dot = graphviz.Digraph(format='png')
dot.attr('node', shape='plaintext')
for table, columns in tables.items():
rows = []
for col in columns:
if col in pks.get(table, []):
rows.append(f'<TR><TD ALIGN="LEFT"><U>{col}</U></TD></TR>')
else:
rows.append(f'<TR><TD ALIGN="LEFT">{col}</TD></TR>')
label = f"""<<TABLE BORDER="1" CELLBORDER="0" CELLSPACING="0">
<TR><TD BGCOLOR="lightgray"><B>{table}</B></TD></TR>
{''.join(rows)}
</TABLE>>"""
dot.node(table, label=label)
for src_table, src_col, tgt_table, tgt_col in fks:
dot.edge(src_table, tgt_table, label=f"{src_col}{tgt_col}")
st.graphviz_chart(dot)
# --- Config Agent ---
sql_transformer = Agent(
model=Groq(id="llama3-70b-8192", temperature=0.0, top_p=0),
description="Transforme une requête en langage naturel en requête SQL.",
instructions=[
"Analyse la requête fournie en entrée.",
"Transforme la requête en langage naturel en une requête SQL.",
],
markdown=True
)
# --- Interface Streamlit ---
st.set_page_config(page_title="Assistant SQL avec LLM", layout="wide")
st.title("🧠📊 Assistant SQL intelligent (avec LLM)")
db_name = "database.db"
sql_file = "output.sql"
# Champ de saisie pour le fichier d'entrée
input_file = st.file_uploader("Choisissez un fichier (.db, .json, .csv, .xls ou .xlsx)", type=["db", "sqlite", "json", "csv", "xls", "xlsx"])
if input_file:
# Enregistrer le fichier téléchargé temporairement
temp_file_path = f"temp_{input_file.name}"
with open(temp_file_path, "wb") as f:
f.write(input_file.getbuffer())
prepared_db_path = preparer_bdd(temp_file_path)
if prepared_db_path:
st.success(f"Base de données prête : {prepared_db_path}")
clean_database(prepared_db_path)
st.success("Base de données nettoyée.")
output_bdd = extraire_bdd(prepared_db_path)
if output_bdd:
proposition_normalisee = output_bdd
while True:
resultat = analyser_bdd(proposition_normalisee)
verification = verifier_normalisation(resultat, output_bdd)
if "⚠️ Des corrections ont été appliquées durant la vérification." in verification:
proposition_normalisee = verification
else:
st.text("\n🔍 Résultat final de l'analyse :\n" + resultat)
st.text("\n✅ Vérification de la normalisation :\n" + verification)
break
# Générer les requêtes SQL pour la nouvelle base de données normalisée
sql_content = génération_des_requêtes(resultat)
requetes_nettoyees = nettoyer_requetes_sql(sql_content)
# Écrire les requêtes SQL dans le fichier output.sql
with open(sql_file, "w", encoding='utf-8') as fichier_sql:
fichier_sql.write(requetes_nettoyees)
st.success(f"Fichier SQL généré: {sql_file}")
create_sqlite_db_from_file(sql_file, db_name)
# Visualiser le schéma de la base de données
st.subheader("Schéma des relations entre les tables")
visualize_database_schema_from_sql(sql_file)
# Champ de saisie pour les requêtes en langage naturel
prompt = st.text_input("💬 Pose ta question :", placeholder="Ex: Liste les patients atteints de diabète")
if prompt:
st.write("🛠️ Génération SQL en cours…")
structure = get_database_structure(db_name)
response = transform_prompt_to_sql(prompt, structure)
sql_query = clean_sql_query(response)
st.code(sql_query, language="sql")
try:
df = execute_sql_query(db_name, sql_query)
st.success("✅ Requête exécutée avec succès !")
st.dataframe(df)
except Exception as e:
st.error(f"❌ Erreur lors de l'exécution de la requête : {e}")