File size: 19,528 Bytes
8e0449f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
import os
import sqlite3
import json
import pandas as pd
import argparse
import re
from agno.agent import Agent
from agno.models.groq import Groq
from dotenv import load_dotenv
import sys
from clean_database import clean_database





sys.stdout.reconfigure(encoding='utf-8')

load_dotenv()

def detecter_format(fichier):
    """Retourne l'extension du fichier sous forme explicite."""
    extension = os.path.splitext(fichier)[1].lower()

    if extension in [".db", ".sqlite"]:
        return "sqlite"
    elif extension == ".json":
        return "json"
    elif extension == ".csv":
        return "csv"
    elif extension in [".xls", ".xlsx"]:
        return "excel"
    else:
        return "inconnu"

def json_to_sqlite(fichier_json):
    """Convertit un fichier JSON en base SQLite."""
    try:
        with open(fichier_json, 'r') as file:
            data = json.load(file)
    except json.JSONDecodeError as e:
        print(f"Erreur lors de la lecture du fichier JSON : {e}")
        return None

    db_name = 'temp_json.db'
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Supposons que les données JSON sont une liste de dictionnaires (une table)
    table_name = "data"
    columns = ', '.join(data[0].keys())
    placeholders = ', '.join('?' * len(data[0]))

    cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})")

    for row in data:
        cursor.execute(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})", tuple(row.values()))

    conn.commit()
    conn.close()

    print(f"✅ Fichier JSON converti avec succès en {db_name}")
    return db_name

def csv_to_sqlite(fichier_csv):
    """Convertit un fichier CSV en base SQLite."""
    try:
        df = pd.read_csv(fichier_csv)
    except pd.errors.ParserError as e:
        print(f"Erreur lors de la lecture du fichier CSV : {e}")
        return None

    db_name = 'temp_csv.db'
    conn = sqlite3.connect(db_name)
    df.to_sql('data', conn, if_exists='replace', index=False)
    conn.close()

    print(f"✅ Fichier CSV converti avec succès en {db_name}")
    return db_name

def excel_to_sqlite(fichier_excel):
    """Convertit un fichier Excel en base SQLite."""
    try:
        df = pd.read_excel(fichier_excel)
    except Exception as e:
        print(f"Erreur lors de la lecture du fichier Excel : {e}")
        return None

    db_name = 'temp_excel.db'
    conn = sqlite3.connect(db_name)
    df.to_sql('data', conn, if_exists='replace', index=False)
    conn.close()

    print(f"✅ Fichier Excel converti avec succès en {db_name}")
    return db_name

def preparer_bdd(input_path):
    """Détecte le type du fichier et le convertit en SQLite si nécessaire."""
    format_detecte = detecter_format(input_path)
    print(f"📂 Format détecté : {format_detecte.upper()}")

    if format_detecte == "sqlite":
        print("➡️  Aucun traitement requis, base SQLite déjà prête.")
        return input_path
    elif format_detecte == "json":
        return json_to_sqlite(input_path)
    elif format_detecte == "csv":
        return csv_to_sqlite(input_path)
    elif format_detecte == "excel":
        return excel_to_sqlite(input_path)
    else:
        print("❌ Format non supporté. Utilise un fichier .db, .json, .csv, .xls ou .xlsx")
        return None
    
    

def extraire_bdd(db_path):
    """Récupère la structure et les données d'une base SQLite."""
    try:
        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()

            # Récupérer les tables
            tables = [table[0] for table in cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';"
            ).fetchall()]

            if not tables:
                print("⚠️ Aucune table trouvée.")
                return ""

            output_bdd = ""

            for table in tables:
                output_bdd += f"\n📌 Table `{table}` :\n"

                # Récupérer les colonnes
                cursor.execute(f"PRAGMA table_info({table});")
                columns = cursor.fetchall()
                col_names = [col[1] for col in columns]
                col_types = [col[2] for col in columns]

                output_bdd += f"📝 Colonnes: {', '.join(f'{name} ({ctype})' for name, ctype in zip(col_names, col_types))}\n"

                # Récupérer les données de la table
                cursor.execute(f"SELECT * FROM {table}")
                rows = cursor.fetchall()
                if rows:
                    output_bdd += "\n".join(f" -> {row}" for row in rows) + "\n"
                else:
                    output_bdd += "⚠️ Aucune donnée trouvée.\n"

            return output_bdd

    except sqlite3.Error as e:
        print(f"❌ Erreur SQLite : {e}")
        return ""



# Création de l'agent Groq pour la vérification de la normalisation
normalization_checker = Agent(
    model=Groq(id="deepseek-r1-distill-llama-70b", temperature=0.0, top_p=0),
    description="Vérifie si une base de données est normalisée.",
    instructions=[
        "Analyse la structure de la base de données fournie en entrée.",
        "Détermine si elle respecte les formes normales (1NF, 2NF, 3NF, BCNF).",
        "Migre les données de l'ancienne base de données vers la nouvelle base de données normalisée sans générer de code SQL.",
        "Assurez-vous que toutes les données sont correctement transférées et que les relations sont maintenues.",
        "Si la base n'est pas normalisée, propose une version améliorée sans générer de schéma SQL."
    ],
    markdown=True
)

def analyser_bdd(output_bdd: str):
    """Utilise le premier agent pour analyser et normaliser la base de données."""
    prompt = f"""

    Voici la structure et les données de la base SQLite :



    {output_bdd}



    Tu es un expert en base de données exécute l'algorithme suivant pour normaliser et migrer la base de données. Affiche **uniquement** le résultat final sans explication détaillée et Ne pas ajouter de nouvelles colonnes non mentionnées dans la base de données d'origine .



---



### **Algorithme de normalisation et migration**

Début

   Initialiser une variable `resultat_final` pour accumuler les résultats.



   # Analyser la structure actuelle de la base de données

   Pour chaque table `T` dans la base de données faire

      Détecter la clé primaire existante ou en attribuer une si nécessaire.



      # Appliquer la 1NF : Atomisation des attributs

      Si `T` contient des attributs non atomiques alors

         Transformer les attributs en valeurs atomiques.

         Assigner les clés primaires et ajouter les clés étrangères si nécessaire.

      Fin Si



      # Appliquer la 2NF : Éliminer les dépendances fonctionnelles partielles

      Si `T` contient une clé primaire composite et des attributs qui dépendent d’une partie seulement de la clé alors

         Décomposer `T` en nouvelles tables en respectant la 2NF.

         Assigner les clés primaires et ajouter les clés étrangères si nécessaire.

      Fin Si



      # Appliquer la 3NF : Éliminer les dépendances transitives

      Si `T` contient des dépendances transitives (un attribut dépend d’un autre attribut non clé) alors

         Décomposer `T` en nouvelles tables pour isoler ces dépendances.

         Assigner les clés primaires et ajouter les clés étrangères si nécessaire.

      Fin Si



      # Appliquer la BCNF : Suppression des dépendances anormales

      Si `T` contient une dépendance fonctionnelle où un attribut non clé détermine une clé candidate alors

         Décomposer `T` en nouvelles relations respectant la BCNF.

         Assigner les clés primaires et ajouter les clés étrangères si nécessaire.

      Fin Si



      # Migrer les données

      Pour chaque nouvelle table normalisée faire

         Identifier les données à migrer depuis l'ancienne base de données.

         Transformer ou réorganiser les données selon le schéma normalisé proposé.

         Insérer les données transformées dans la nouvelle table normalisée.



      Fin Pour



      # Ajouter les nouvelles tables normalisées au résultat final

      Ajouter "📝 Nouvelles tables proposées :" à `resultat_final`.

      Pour chaque nouvelle table `N` créée faire

         Ajouter "📌 Table `nom_nouvelle_table` :" à `resultat_final`.

         Ajouter "📝 Colonnes: colonne1 (type1), colonne2 (type2), ..." à `resultat_final`.

         Ajouter "🔑 Clé primaire: `colonne_PK`" si définie.

         Ajouter "🔗 Clé étrangère: `colonne_FK` → `table_referencée` (`colonne_referencée`)" si applicable.

         Ajouter "📋 Données :" à `resultat_final`.

         Pour chaque enregistrement migré dans la table faire

            Ajouter " - `valeur1`, `valeur2`, ..." à `resultat_final`.

         Fin Pour



      Fin Pour



   # Vérification finale avant affichage



      Afficher `resultat_final`

      Afficher "✅ Normalisation complète et migration réussie."



Fin





---

"""

    response = normalization_checker.run(prompt)
    resultat = response.content.strip()

    # Supprimer le texte entre <think>...</think>
    resultat_sans_think = re.sub(r"<think>.*?</think>", "", resultat, flags=re.DOTALL).strip()
    return resultat_sans_think



# Création de l'agent 2 (vérifie la proposition de normalisation)
normalization_validator = Agent(
    model=Groq(id="qwen-2.5-32b", temperature=0.0, top_p=0),
    description="Vérifie si la base de données normalisée proposée est correcte.",
    instructions=[
        "Analyse la normalisation proposée et vérifie si elle respecte les formes normales.",
        "Compare la proposition avec la base de données générée pour s'assurer de leur correspondance.",
        "Donne une proposition améliorée si nécessaire."
    ],
    markdown=True
)

def verifier_normalisation(proposition_normalisee: str, output_bdd: str):
    """Utilise le deuxième agent pour valider la normalisation proposée et sa correspondance avec la base de données générée."""
    prompt = f"""

    Voici la base de données générée après application de la normalisation :

    Vérifie si la normalisation proposée correspond bien à {output_bdd}.



    Voici une proposition de base de données normalisée :



    {proposition_normalisee}



    - Tu es un expert en base de données exécute l'algorithme suivant pour vérifier si la base obtenue correspond bien à la normalisation attendue. Affiche **uniquement** le résultat final sans explication détaillée.

    ---



    ### **Algorithme de vérification et correction des formes normales**

    Début

       Initialiser une variable `corrections_appliquees` = False



       Pour chaque table dans la base de données faire

          Si tous les attributs sont atomiques alors

             printf("📌 Vérification de la table `nom_table`")

             printf("✅ La table `nom_table` est en 1NF")



             Si la table ne contient pas de dépendances fonctionnelles partielles alors

                printf("✅ La table `nom_table` est en 2NF")



                Si la table ne contient pas de dépendances transitives alors

                   printf("✅ La table `nom_table` est en 3NF")



                   Si chaque dépendance fonctionnelle est basée sur une clé candidate alors

                      printf("✅ La table `nom_table` est en BCNF")

                   Sinon

                      printf("❌ La table `nom_table` ne respecte pas la BCNF → Correction appliquée.")

                      Appliquer la correction en décomposant la table en relations BCNF.

                      corrections_appliquees = True

                   Fin Si

                Sinon

                   printf("❌ La table `nom_table` ne respecte pas la 3NF → Correction appliquée.")

                   Appliquer la correction en éliminant les dépendances transitives.

                   corrections_appliquees = True

                Fin Si

             Sinon

                printf("❌ La table `nom_table` ne respecte pas la 2NF → Correction appliquée.")

                Appliquer la correction en éliminant les dépendances partielles.

                corrections_appliquees = True

             Fin Si

          Sinon

             printf("❌ La table `nom_table` ne respecte pas la 1NF → Correction appliquée.")

             Appliquer la correction en atomisant les attributs.

             corrections_appliquees = True

          Fin Si

       Fin Pour



       Si corrections_appliquees == True alors

          printf("⚠️ Des corrections ont été appliquées durant la vérification.")

       Fin Si



       printf("🔍 Normalisation terminée.")

    Fin



   ---

   """

    response = normalization_validator.run(prompt)
    return response.content.strip()



def generate_sql_from_normalized_schema(normalized_schema, output_file):
    """Génère les requêtes SQL pour créer et insérer des données dans une base de données normalisée."""
    create_table_statements = []
    insert_data_statements = []

    # Analyser le schéma normalisé
    tables = normalized_schema.split('📌 Table')
    for table in tables:
        if not table.strip():
            continue

        # Extraire le nom de la table
        table_name = ""
        table_parts = re.split(r'`| :', table, maxsplit=2)
        if len(table_parts) >= 2:
            table_name = table_parts[1].strip()

        # Extraire les colonnes et leurs types
        columns = []
        col_types = {}
        if '📝 Colonnes: ' in table:
            columns_line = table.split('📝 Colonnes: ')[1].split('\n')[0]
            columns = [col.strip().split(' (')[0] for col in columns_line.split(', ')]
            col_types = {col.split(' (')[0]: col.split(' (')[1].rstrip(')') for col in columns_line.split(', ')}

        # Extraire la clé primaire (peut être composée)
        primary_keys = []
        pk_match = re.search(r'🔑 Clé primaire: ([\w, ]+)', table)
        if pk_match:
            primary_keys = [key.strip() for key in pk_match.group(1).split(',')]

        # Extraire les clés étrangères
        foreign_keys = []
        fk_matches = re.findall(r'🔗 Clé étrangère: (\w+) → (\w+) \((\w+)\)', table)
        for fk_col, ref_table, ref_col in fk_matches:
            foreign_keys.append((fk_col, ref_table, ref_col))

        # Générer CREATE TABLE
        if table_name and columns:
            create_sql = f"CREATE TABLE {table_name} (\n"

            # Ajouter les colonnes avec leurs types
            for col in columns:
                col_type = col_types.get(col, "TEXT").upper()
                create_sql += f"    {col} {col_type},\n"

            # Ajouter la clé primaire (peut être composée)
            if primary_keys:
                create_sql += f"    PRIMARY KEY ({', '.join(primary_keys)}),\n"

            # Ajouter les clés étrangères
            for fk in foreign_keys:
                create_sql += f"    FOREIGN KEY ({fk[0]}) REFERENCES {fk[1]}({fk[2]}),\n"

            # Nettoyer les virgules finales
            create_sql = re.sub(r',\n$', '\n', create_sql) + ");"
            create_table_statements.append(create_sql)

        # Extraire les données et les insérer en une seule requête
        if '📋 Données :' in table:
            data_section = table.split('📋 Données :')[1].split('\n\n')[0]
            data_rows = re.findall(r' - (.*?)\n', data_section)

            if data_rows:
                insert_sql = f"INSERT INTO {table_name} ("
                insert_sql += ', '.join(columns) + ") VALUES\n"

                formatted_values_list = []
                for row in data_rows:
                    # Nettoyer les guillemets simples et backticks
                    values = [v.strip().strip("'`") for v in row.split(', ')]

                    formatted_values = []
                    for i, value in enumerate(values):
                        col_type = col_types.get(columns[i], "TEXT").upper()
                        if col_type in ['TEXT', 'DATE']:
                            formatted_values.append(f"'{value}'")
                        else:
                            formatted_values.append(f"{value}")

                    formatted_values_list.append(f"({', '.join(formatted_values)})")

                insert_sql += ",\n".join(formatted_values_list) + ";"
                insert_data_statements.append(insert_sql)

            # Ajouter une ligne vide après les insertions pour cette table
            insert_data_statements.append("")

    # Écrire dans le fichier
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("-- Structure de la base de données\n")
        f.write("\n".join(create_table_statements))
        f.write("\n\n-- Migrer les données de l'ancienne bdd\n")
        f.write("\n".join(insert_data_statements))




# --- Exécuter le script ---
def main():
    parser = argparse.ArgumentParser(description="Outil de transformation de fichier vers base SQLite et normalisation de BDD")
    parser.add_argument('--input', type=str, required=True, help="Chemin vers le fichier d'entrée (peut être .db, .json, .csv, .xls ou .xlsx)")
    parser.add_argument('output_file', type=str, help='Chemin vers le fichier SQL de sortie')
    args = parser.parse_args()

    # Préparer la base de données selon le format
    prepared_db_path = preparer_bdd(args.input)
    if not prepared_db_path:
        sys.exit("❌ Erreur lors de la préparation de la base de données.")
    print(f"✅ Base de données prête : {prepared_db_path}")
    
    # Nettoyer la base de données
    clean_database(prepared_db_path)
    print(f"✅ Base de données nettoyée : {prepared_db_path}")

    # Extraire la structure et les données de la base SQLite préparée
    output_bdd = extraire_bdd(prepared_db_path)
    if not output_bdd:
        sys.exit("❌ Erreur dans l'extraction de la base de données.")

    # Mise en place de l'auto-correction et auto-vérification (self-consistency)
    proposition_normalisee = output_bdd
    while True:
        resultat = analyser_bdd(proposition_normalisee)
        verification = verifier_normalisation(resultat, output_bdd)
        # Si des corrections sont détectées, mettre à jour la proposition sans afficher le résultat
        if "⚠️ Des corrections ont été appliquées durant la vérification." in verification:
            proposition_normalisee = verification
        else:
            # Afficher uniquement le résultat final correct et sortir de la boucle
            print("\n🔍 Résultat final de l'analyse :\n", resultat)
            print("\n✅ Vérification de la normalisation :\n", verification)
            break

    # Générer le fichier SQL final
    generate_sql_from_normalized_schema(resultat, args.output_file)
    print(f"✅ Fichier SQL généré: {args.output_file}")

if __name__ == "__main__":
    main()