import os import numpy as np import pandas as pd import json from typing import Dict, List from openai import OpenAI from pathlib import Path from embedding_cache import EmbeddingCache class SDCClassifier: def __init__(self, openai_api_key: str = None, cache_path: str = "embeddings_cache.db"): """ Ініціалізація класифікатора SDC Args: openai_api_key: API ключ для OpenAI (опціонально, можна взяти з env) cache_path: шлях до файлу кешу ембедінгів """ self.client = OpenAI(api_key=openai_api_key or os.getenv("OPENAI_API_KEY")) self.classes_json = {} self.class_signatures = None self.df = None self.embeddings = None self.embeddings_mean = None self.embeddings_std = None # Створення директорії для кешу, якщо потрібно cache_dir = os.path.dirname(cache_path) if cache_dir and not os.path.exists(cache_dir): os.makedirs(cache_dir) # Ініціалізація кешу from embedding_cache import EmbeddingCache self.cache = EmbeddingCache(cache_path) # Базовий стан self.base_classes_json = {} self.base_signatures = None def load_initial_state(self, classes_file: str, signatures_file: str) -> str: """ Завантаження початкового стану при старті застосунку Args: classes_file: шлях до файлу з класами signatures_file: шлях до файлу з signatures Returns: str: повідомлення про результат завантаження """ try: self.base_classes_json = self.load_classes(classes_file) if os.path.exists(signatures_file): self.base_signatures = self.load_signatures(signatures_file) # Встановлюємо поточний стан як базовий self.classes_json = self.base_classes_json.copy() self.class_signatures = self.base_signatures.copy() if self.base_signatures else None return f"Завантажено {len(self.base_classes_json)} базових класів" except Exception as e: return f"Помилка при завантаженні базового стану: {str(e)}" def restore_base_state(self) -> None: """Відновлення базового стану""" self.classes_json = self.base_classes_json.copy() self.class_signatures = self.base_signatures.copy() if self.base_signatures else None def load_initial_state(self, classes_file: str, signatures_file: str) -> str: """ Завантаження початкового стану при старті застосунку Args: classes_file: шлях до файлу з класами signatures_file: шлях до файлу з signatures Returns: str: повідомлення про результат завантаження """ try: self.base_classes_json = self.load_classes(classes_file) if os.path.exists(signatures_file): self.base_signatures = self.load_signatures(signatures_file) # Встановлюємо поточний стан як базовий self.classes_json = self.base_classes_json.copy() self.class_signatures = self.base_signatures.copy() if self.base_signatures else None return f"Завантажено {len(self.base_classes_json)} базових класів" except Exception as e: return f"Помилка при завантаженні базового стану: {str(e)}" def restore_base_state(self) -> None: """Відновлення базового стану""" self.classes_json = self.base_classes_json.copy() self.class_signatures = self.base_signatures.copy() if self.base_signatures else None def load_initial_state(self, classes_file: str, signatures_file: str): """Завантаження початкового стану при старті застосунку""" self.base_classes_json = self.load_classes(classes_file) self.base_signatures = self.load_signatures(signatures_file) # Встановлюємо поточний стан як базовий self.classes_json = self.base_classes_json.copy() self.class_signatures = self.base_signatures.copy() if self.base_signatures else None def restore_base_state(self): """Відновлення базового стану""" self.classes_json = self.base_classes_json.copy() self.class_signatures = self.base_signatures.copy() if self.base_signatures else None def load_classes(self, json_path: str) -> dict: """Завантаження класів та їх хінтів з JSON файлу""" try: # Якщо передано вміст файлу замість шляху if isinstance(json_path, dict): self.classes_json = json_path else: with open(json_path, 'r', encoding='utf-8') as f: self.classes_json = json.load(f) # Валідація структури if not all(isinstance(hints, list) for hints in self.classes_json.values()): raise ValueError("Кожен клас повинен мати список хінтів") return self.classes_json except FileNotFoundError: print(f"Файл {json_path} не знайдено!") return {} except json.JSONDecodeError: print(f"Помилка читання JSON з файлу {json_path}!") return {} def save_signatures(self, filename: str = "signatures.npz") -> None: """Зберігає signatures у NPZ файл""" if self.class_signatures: np.savez(filename, **self.class_signatures) def load_signatures(self, filename: str = "signatures.npz") -> Dict[str, np.ndarray]: """Завантажує signatures з NPZ файлу""" try: with np.load(filename) as data: self.class_signatures = {key: data[key] for key in data.files} return self.class_signatures except (FileNotFoundError, IOError): return None def get_openai_embedding(self, text: str, model_name: str = "text-embedding-3-large") -> list: """ Отримання ембедінгу тексту через OpenAI API з використанням кешу Args: text: текст для ембедінгу model_name: назва моделі OpenAI Returns: list: ембедінг тексту """ # Спроба отримати з кешу cached_embedding = self.cache.get(text, model_name) if cached_embedding is not None: return cached_embedding.tolist() # Якщо нема в кеші - отримуємо через API response = self.client.embeddings.create( input=text, model=model_name ) embedding = response.data[0].embedding # Зберігаємо в кеш self.cache.put(text, model_name, embedding) return embedding def get_cache_stats(self) -> dict: """Отримання статистики кешування""" return self.cache.get_stats() def clear_old_cache(self, days: int = 30) -> int: """Очищення старих записів з кешу""" return self.cache.clear_old(days) def embed_hints(self, hint_list: List[str], model_name: str) -> np.ndarray: """Створення ембедінгів для списку хінтів""" emb_list = [] total_hints = len(hint_list) for idx, hint in enumerate(hint_list, 1): try: print(f" Отримання embedding {idx}/{total_hints}: '{hint}'") emb = self.get_openai_embedding(hint, model_name=model_name) emb_list.append(emb) except Exception as e: print(f" Помилка при отриманні embedding для '{hint}': {str(e)}") continue if not emb_list: raise ValueError("Не вдалося отримати жодного embedding") return np.array(emb_list, dtype=np.float32) def initialize_signatures(self, model_name: str = "text-embedding-3-large", signatures_file: str = "signatures.npz", force_rebuild: bool = False) -> str: """ Ініціалізує signatures: завантажує існуючі або створює нові Args: model_name: назва моделі для ембедінгів signatures_file: шлях до файлу для збереження (None - не зберігати) force_rebuild: примусово перебудувати signatures """ if not self.classes_json: return "Помилка: Не знайдено жодного класу в classes.json" print(f"Знайдено {len(self.classes_json)} класів") # Завантажуємо існуючі signatures, якщо є файл і не примусове оновлення if not force_rebuild and signatures_file and os.path.exists(signatures_file): try: loaded_signatures = self.load_signatures(signatures_file) if loaded_signatures and all(cls in loaded_signatures for cls in self.classes_json): self.class_signatures = loaded_signatures print("Успішно завантажено збережені signatures") return f"Завантажено існуючі signatures для {len(self.class_signatures)} класів" except Exception as e: print(f"Помилка при завантаженні signatures: {str(e)}") try: self.class_signatures = {} total_classes = len(self.classes_json) print(f"Починаємо створення нових signatures для {total_classes} класів...") for idx, (cls_name, hints) in enumerate(self.classes_json.items(), 1): if not hints: print(f"Пропускаємо клас {cls_name} - немає хінтів") continue print(f"Обробка класу {cls_name} ({idx}/{total_classes})...") try: arr = self.embed_hints(hints, model_name=model_name) self.class_signatures[cls_name] = arr.mean(axis=0) print(f"Успішно створено signature для {cls_name}") except Exception as e: print(f"Помилка при створенні signature для {cls_name}: {str(e)}") continue if not self.class_signatures: return "Помилка: Не вдалося створити жодного signature" # Зберігаємо signatures тільки якщо вказано шлях до файлу if signatures_file: try: self.save_signatures(signatures_file) print("Signatures збережено у файл") except Exception as e: print(f"Помилка при збереженні signatures: {str(e)}") return f"Створено нові signatures для {len(self.class_signatures)} класів" except Exception as e: return f"Помилка при створенні signatures: {str(e)}" def load_data(self, csv_path: str = "messages.csv", emb_path: str = "embeddings.npy"): """Завантаження даних з CSV та NPY файлів""" self.df = pd.read_csv(csv_path) emb_local = np.load(emb_path) assert len(self.df) == len(emb_local), "CSV і embeddings різної довжини!" self.df["Target"] = "Unlabeled" self.embeddings_mean = emb_local.mean(axis=0) self.embeddings_std = emb_local.std(axis=0) self.embeddings = (emb_local - self.embeddings_mean) / self.embeddings_std return f"Завантажено {len(self.df)} рядків" def predict_classes(self, text_embedding: np.ndarray, threshold: float = 0.0) -> Dict[str, float]: """Передбачення класів для одного тексту""" results = {} for cls, sign in self.class_signatures.items(): score = float(np.dot(text_embedding, sign)) if score > threshold: results[cls] = score return dict(sorted(results.items(), key=lambda x: x[1], reverse=True)) def process_single_text(self, text: str, threshold: float = 0.3) -> dict: """Обробка одного тексту""" if self.class_signatures is None: return {"error": "Спочатку збудуйте signatures!"} emb = self.get_openai_embedding(text) if self.embeddings_mean is not None and self.embeddings_std is not None: emb = (emb - self.embeddings_mean) / self.embeddings_std predictions = self.predict_classes(emb, threshold) if not predictions: return {"message": text, "result": "Жодного класу не знайдено"} formatted_results = [] for cls, score in predictions.items(): formatted_results.append(f"{cls}: {score:.2%}") return { "message": text, "result": "\n".join(formatted_results) } def classify_rows(self, filter_substring: str = "", threshold: float = 0.3): """Класифікація всіх або відфільтрованих рядків""" if self.class_signatures is None: return "Спочатку збудуйте signatures!" if self.df is None or self.embeddings is None: return "Дані не завантажені! Спочатку викличте load_data." if filter_substring: filtered_idx = self.df[self.df["Message"].str.contains(filter_substring, case=False, na=False)].index else: filtered_idx = self.df.index for cls in self.class_signatures.keys(): self.df[f"Score_{cls}"] = 0.0 for i in filtered_idx: emb_vec = self.embeddings[i] predictions = self.predict_classes(emb_vec, threshold=threshold) for cls, score in predictions.items(): self.df.at[i, f"Score_{cls}"] = score main_classes = [cls for cls, score in predictions.items() if score > threshold] self.df.at[i, "Target"] = "|".join(main_classes) if main_classes else "None" result_columns = ["Message", "Target"] + [f"Score_{cls}" for cls in self.class_signatures.keys()] result_df = self.df.loc[filtered_idx, result_columns].copy() return result_df.reset_index(drop=True) def save_results(self, output_path: str = "messages_with_labels.csv") -> str: """Зберігання результатів класифікації""" if self.df is None: return "Дані відсутні!" self.df.to_csv(output_path, index=False) return f"Дані збережено у файл {output_path}"