import os import numpy as np import pandas as pd import json from typing import Dict, List from openai import OpenAI from pathlib import Path class SDCClassifier: def __init__(self, openai_api_key: str = None): """ Ініціалізація класифікатора SDC Args: openai_api_key: API ключ для OpenAI (опціонально, можна взяти з env) """ self.client = OpenAI(api_key=openai_api_key or os.getenv("OPENAI_API_KEY")) self.classes_json = {} self.class_signatures = None self.df = None self.embeddings = None self.embeddings_mean = None self.embeddings_std = None def load_classes(self, json_path: str) -> dict: """Завантаження класів та їх хінтів з JSON файлу""" try: with open(json_path, 'r', encoding='utf-8') as f: self.classes_json = json.load(f) return self.classes_json except FileNotFoundError: print(f"Файл {json_path} не знайдено!") return {} except json.JSONDecodeError: print(f"Помилка читання JSON з файлу {json_path}!") return {} def save_signatures(self, filename: str = "signatures.npz") -> None: """Зберігає signatures у NPZ файл""" if self.class_signatures: np.savez(filename, **self.class_signatures) def load_signatures(self, filename: str = "signatures.npz") -> Dict[str, np.ndarray]: """Завантажує signatures з NPZ файлу""" try: with np.load(filename) as data: self.class_signatures = {key: data[key] for key in data.files} return self.class_signatures except (FileNotFoundError, IOError): return None def get_openai_embedding(self, text: str, model_name: str = "text-embedding-3-large") -> list: """Отримання ембедінгу тексту через OpenAI API""" response = self.client.embeddings.create( input=text, model=model_name ) return response.data[0].embedding def embed_hints(self, hint_list: List[str], model_name: str) -> np.ndarray: """Створення ембедінгів для списку хінтів""" emb_list = [] total_hints = len(hint_list) for idx, hint in enumerate(hint_list, 1): try: print(f" Отримання embedding {idx}/{total_hints}: '{hint}'") emb = self.get_openai_embedding(hint, model_name=model_name) emb_list.append(emb) except Exception as e: print(f" Помилка при отриманні embedding для '{hint}': {str(e)}") continue if not emb_list: raise ValueError("Не вдалося отримати жодного embedding") return np.array(emb_list, dtype=np.float32) def initialize_signatures(self, model_name: str = "text-embedding-3-large", signatures_file: str = "signatures.npz", force_rebuild: bool = False) -> str: """Ініціалізує signatures: завантажує існуючі або створює нові""" if not self.classes_json: return "Помилка: Не знайдено жодного класу в classes.json" print(f"Знайдено {len(self.classes_json)} класів") if not force_rebuild and os.path.exists(signatures_file): try: loaded_signatures = self.load_signatures(signatures_file) if loaded_signatures and all(cls in loaded_signatures for cls in self.classes_json): print("Успішно завантажено збережені signatures") return f"Завантажено існуючі signatures для {len(self.class_signatures)} класів" except Exception as e: print(f"Помилка при завантаженні signatures: {str(e)}") try: self.class_signatures = {} total_classes = len(self.classes_json) print(f"Починаємо створення нових signatures для {total_classes} класів...") for idx, (cls_name, hints) in enumerate(self.classes_json.items(), 1): if not hints: print(f"Пропускаємо клас {cls_name} - немає хінтів") continue print(f"Обробка класу {cls_name} ({idx}/{total_classes})...") try: arr = self.embed_hints(hints, model_name=model_name) self.class_signatures[cls_name] = arr.mean(axis=0) print(f"Успішно створено signature для {cls_name}") except Exception as e: print(f"Помилка при створенні signature для {cls_name}: {str(e)}") continue if not self.class_signatures: return "Помилка: Не вдалося створити жодного signature" self.save_signatures(signatures_file) print("Signatures збережено у файл") return f"Створено та збережено нові signatures для {len(self.class_signatures)} класів" except Exception as e: return f"Помилка при створенні signatures: {str(e)}" def load_data(self, csv_path: str = "messages.csv", emb_path: str = "embeddings.npy"): """Завантаження даних з CSV та NPY файлів""" self.df = pd.read_csv(csv_path) emb_local = np.load(emb_path) assert len(self.df) == len(emb_local), "CSV і embeddings різної довжини!" self.df["Target"] = "Unlabeled" self.embeddings_mean = emb_local.mean(axis=0) self.embeddings_std = emb_local.std(axis=0) self.embeddings = (emb_local - self.embeddings_mean) / self.embeddings_std return f"Завантажено {len(self.df)} рядків" def predict_classes(self, text_embedding: np.ndarray, threshold: float = 0.0) -> Dict[str, float]: """Передбачення класів для одного тексту""" results = {} for cls, sign in self.class_signatures.items(): score = float(np.dot(text_embedding, sign)) if score > threshold: results[cls] = score return dict(sorted(results.items(), key=lambda x: x[1], reverse=True)) def process_single_text(self, text: str, threshold: float = 0.3) -> dict: """Обробка одного тексту""" if self.class_signatures is None: return {"error": "Спочатку збудуйте signatures!"} emb = self.get_openai_embedding(text) if self.embeddings_mean is not None and self.embeddings_std is not None: emb = (emb - self.embeddings_mean) / self.embeddings_std predictions = self.predict_classes(emb, threshold) if not predictions: return {"message": text, "result": "Жодного класу не знайдено"} formatted_results = [] for cls, score in predictions.items(): formatted_results.append(f"{cls}: {score:.2%}") return { "message": text, "result": "\n".join(formatted_results) } def classify_rows(self, filter_substring: str = "", threshold: float = 0.3): """Класифікація всіх або відфільтрованих рядків""" if self.class_signatures is None: return "Спочатку збудуйте signatures!" if self.df is None or self.embeddings is None: return "Дані не завантажені! Спочатку викличте load_data." if filter_substring: filtered_idx = self.df[self.df["Message"].str.contains(filter_substring, case=False, na=False)].index else: filtered_idx = self.df.index for cls in self.class_signatures.keys(): self.df[f"Score_{cls}"] = 0.0 for i in filtered_idx: emb_vec = self.embeddings[i] predictions = self.predict_classes(emb_vec, threshold=threshold) for cls, score in predictions.items(): self.df.at[i, f"Score_{cls}"] = score main_classes = [cls for cls, score in predictions.items() if score > threshold] self.df.at[i, "Target"] = "|".join(main_classes) if main_classes else "None" result_columns = ["Message", "Target"] + [f"Score_{cls}" for cls in self.class_signatures.keys()] result_df = self.df.loc[filtered_idx, result_columns].copy() return result_df.reset_index(drop=True) def save_results(self, output_path: str = "messages_with_labels.csv") -> str: """Зберігання результатів класифікації""" if self.df is None: return "Дані відсутні!" self.df.to_csv(output_path, index=False) return f"Дані збережено у файл {output_path}"