""" Data Processing Functions. Supports: - Segmentation of long text - Segmentation of file content """ from langchain_community.document_loaders import TextLoader, PyPDFLoader, Docx2txtLoader, BSHTMLLoader, JSONLoader from nltk.tokenize import sent_tokenize from collections import Counter import re import json import yaml import os import yaml import os import inspect import ast with open(os.path.join(os.path.dirname(__file__), "..", "config.yaml")) as file: config = yaml.safe_load(file) # Load configuration def load_extraction_config(yaml_path): # Read YAML content from the file path if not os.path.exists(yaml_path): print(f"Error: The config file '{yaml_path}' does not exist.") return {} with open(yaml_path, 'r') as file: config = yaml.safe_load(file) # Extract the 'extraction' configuration dictionary model_config = config.get('model', {}) extraction_config = config.get('extraction', {}) # Model config model_name_or_path = model_config.get('model_name_or_path', "") model_category = model_config.get('category', "") api_key = model_config.get('api_key', "") base_url = model_config.get('base_url', "") vllm_serve = model_config.get('vllm_serve', False) # Extraction config task = extraction_config.get('task', "") instruction = extraction_config.get('instruction', "") text = extraction_config.get('text', "") output_schema = extraction_config.get('output_schema', "") constraint = extraction_config.get('constraint', "") truth = extraction_config.get('truth', "") use_file = extraction_config.get('use_file', False) file_path = extraction_config.get('file_path', "") mode = extraction_config.get('mode', "quick") update_case = extraction_config.get('update_case', False) show_trajectory = extraction_config.get('show_trajectory', False) # Construct config (optional: for constructing your knowledge graph) if 'construct' in config: construct_config = config.get('construct', {}) database = construct_config.get('database', "") url = construct_config.get('url', "") username = construct_config.get('username', "") password = construct_config.get('password', "") # Return a dictionary containing these variables return { "model": { "model_name_or_path": model_name_or_path, "category": model_category, "api_key": api_key, "base_url": base_url, "vllm_serve": vllm_serve }, "extraction": { "task": task, "instruction": instruction, "text": text, "output_schema": output_schema, "constraint": constraint, "truth": truth, "use_file": use_file, "file_path": file_path, "mode": mode, "update_case": update_case, "show_trajectory": show_trajectory }, "construct": { "database": database, "url": url, "username": username, "password": password } } # Return a dictionary containing these variables return { "model": { "model_name_or_path": model_name_or_path, "category": model_category, "api_key": api_key, "base_url": base_url, "vllm_serve": vllm_serve }, "extraction": { "task": task, "instruction": instruction, "text": text, "output_schema": output_schema, "constraint": constraint, "truth": truth, "use_file": use_file, "file_path": file_path, "mode": mode, "update_case": update_case, "show_trajectory": show_trajectory } } # Split the string text into chunks def chunk_str(text): sentences = sent_tokenize(text) chunks = [] current_chunk = [] current_length = 0 for sentence in sentences: token_count = len(sentence.split()) if current_length + token_count <= config['agent']['chunk_token_limit']: current_chunk.append(sentence) current_length += token_count else: if current_chunk: chunks.append(' '.join(current_chunk)) current_chunk = 


# Load and split the content of a file
def chunk_file(file_path):
    pages = []
    if file_path.endswith(".pdf"):
        loader = PyPDFLoader(file_path)
    elif file_path.endswith(".txt"):
        loader = TextLoader(file_path)
    elif file_path.endswith(".docx"):
        loader = Docx2txtLoader(file_path)
    elif file_path.endswith(".html"):
        loader = BSHTMLLoader(file_path)
    elif file_path.endswith(".json"):
        loader = JSONLoader(file_path)
    else:
        raise ValueError("Unsupported file format")  # Inform that the format is unsupported
    pages = loader.load_and_split()
    docs = ""
    for item in pages:
        docs += item.page_content
    pages = chunk_str(docs)
    return pages


def process_single_quotes(text):
    # Replace single quotes that are not apostrophes inside a word with double quotes
    result = re.sub(r"(?<!\w)'|'(?!\w)", '"', text)
    return result


def calculate_metrics(y_truth: set, y_pred: set):
    # Count true positives, false negatives, and false positives from the two sets
    TP = len(y_truth & y_pred)
    FN = len(y_truth - y_pred)
    FP = len(y_pred - y_truth)
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1_score


def current_function_name():
    try:
        stack = inspect.stack()
        if len(stack) > 1:
            outer_func_name = stack[1].function
            return outer_func_name
        else:
            print("No caller function found")
            return None
    except Exception as e:
        print(f"An error occurred: {e}")
        pass


def normalize_obj(value):
    if isinstance(value, dict):
        return frozenset((k, normalize_obj(v)) for k, v in value.items())
    elif isinstance(value, (list, set, tuple)):
        return tuple(Counter(map(normalize_obj, value)).items())
    elif isinstance(value, str):
        return format_string(value)
    return value


def dict_list_to_set(data_list):
    result_set = set()
    try:
        for dictionary in data_list:
            value_tuple = tuple(format_string(value) for value in dictionary.values())
            result_set.add(value_tuple)
        return result_set
    except Exception as e:
        print(f"Failed to convert dictionary list to set: {data_list}")
        return result_set
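

# Minimal demo sketch: calculate_metrics() compares a gold set of value tuples
# against a predicted one. The records below are hypothetical example data, and
# the entry point itself is only an illustration, not part of the pipeline.
if __name__ == "__main__":
    gold = {("Alice", "Paris"), ("Bob", "Rome")}
    pred = {("Alice", "Paris"), ("Carol", "Oslo")}
    # One true positive, one false positive, one false negative
    # -> precision = recall = f1 = 0.5
    precision, recall, f1 = calculate_metrics(gold, pred)
    print(f"precision={precision:.2f} recall={recall:.2f} f1={f1:.2f}")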