"""LLM task orchestration (classification, sentiment analysis) and result handling."""
# Standard library imports
import re
import json
import asyncio
from typing import Annotated, Optional, Dict, Any, List
# Related third-party imports
import yaml
# Local imports
from src.text.model import LanguageModelManager
from src.audio.utils import Formatter
class LLMOrchestrator:
    """
    A handler to perform specific LLM tasks such as classification or sentiment analysis.
    This class uses a language model to perform different tasks by dynamically changing the prompt.
    Parameters
    ----------
    config_path : str
        Path to the configuration file for the language model manager.
    prompt_config_path : str
        Path to the configuration file containing prompts for different tasks.
    model_id : str, optional
        Identifier of the model to use. Defaults to "llama".
    cache_size : int, optional
        Cache size for the language model manager. Defaults to 2.
    Attributes
    ----------
    manager : LanguageModelManager
        An instance of LanguageModelManager for interacting with the model.
    model_id : str
        The identifier of the language model in use.
    prompts : Dict[str, Dict[str, str]]
        A dictionary containing prompts for different tasks.
    """
    def __init__(
        self,
        config_path: Annotated[str, "Path to the configuration file"],
        prompt_config_path: Annotated[str, "Path to the prompt configuration file"],
        model_id: Annotated[str, "Language model identifier"] = "llama",
        cache_size: Annotated[int, "Cache size for the language model manager"] = 2,
    ):
        """
        Initializes the LLMOrchestrator with a language model manager and loads prompts.
        Parameters
        ----------
        config_path : str
            Path to the configuration file for the language model manager.
        prompt_config_path : str
            Path to the configuration file containing prompts for different tasks.
        model_id : str, optional
            Identifier of the model to use. Defaults to "llama".
        cache_size : int, optional
            Cache size for the language model manager. Defaults to 2.
        """
        self.manager = LanguageModelManager(config_path=config_path, cache_size=cache_size)
        self.model_id = model_id
        self.prompts = self._load_prompts(prompt_config_path)
    @staticmethod
    def _load_prompts(prompt_config_path: str) -> Dict[str, Dict[str, str]]:
        """
        Loads prompts from the prompt configuration file.
        Parameters
        ----------
        prompt_config_path : str
            Path to the prompt configuration file.
        Returns
        -------
        Dict[str, Dict[str, str]]
            A dictionary containing prompts for different tasks. An empty or
            effectively-empty YAML file yields an empty dictionary.
        """
        with open(prompt_config_path, encoding='utf-8') as f:
            prompts = yaml.safe_load(f)
        # yaml.safe_load returns None for an empty document; normalize to {}
        # so later membership tests (`prompt_name in self.prompts`) never
        # raise TypeError on a NoneType.
        return prompts or {}
    @staticmethod
    def extract_json(
        response: Annotated[str, "The response string to extract JSON from"]
    ) -> Annotated[Optional[Dict[str, Any]], "Extracted JSON as a dictionary or None if not found"]:
        """
        Extracts the last valid JSON object from a given response string.
        Parameters
        ----------
        response : str
            The response string to extract JSON from.
        Returns
        -------
        Optional[Dict[str, Any]]
            The last valid JSON dictionary if successfully extracted and parsed, otherwise None.
        """
        # Matches braces with at most one level of nesting — sufficient for the
        # flat key/value objects the prompts request.
        json_pattern = r'\{(?:[^{}]|(?:\{[^{}]*\}))*\}'
        matches = re.findall(json_pattern, response)
        # Walk backwards so the *last* parseable object wins (models often
        # echo examples before emitting the final answer).
        for match in reversed(matches):
            try:
                return json.loads(match)
            except json.JSONDecodeError:
                continue
        return None
    async def generate(
        self,
        prompt_name: Annotated[str, "The name of the prompt to use (e.g., 'Classification', 'SentimentAnalysis')"],
        user_input: Annotated[Any, "The user's context or input data"],
        system_input: Annotated[Optional[Any], "The system's context or input data"] = None,
        max_new_tokens: Annotated[int, "Upper bound on tokens generated by the model"] = 10000,
    ) -> Annotated[Dict[str, Any], "Task results or error dictionary"]:
        """
        Performs the specified LLM task using the selected prompt, supporting both user
        and optional system contexts.
        Parameters
        ----------
        prompt_name : str
            Name of the prompt pair (system/user) to use from the prompt config.
        user_input : Any
            The user's context; formatted into dialogue form before templating.
        system_input : Any, optional
            Extra system context substituted into the system prompt template.
        max_new_tokens : int, optional
            Generation budget forwarded to the model manager. Defaults to 10000.
        Returns
        -------
        Dict[str, Any]
            The JSON object extracted from the model response, or an
            ``{"error": ...}`` dictionary describing what went wrong.
        """
        task_prompts = self.prompts.get(prompt_name)
        if task_prompts is None:
            # Message reflects the configurable path rather than a hard-coded
            # "prompt.yaml" filename.
            return {"error": f"Prompt '{prompt_name}' is not defined in the prompt configuration."}
        system_prompt_template = task_prompts.get('system', '')
        user_prompt_template = task_prompts.get('user', '')
        if not system_prompt_template or not user_prompt_template:
            return {"error": f"Prompts for '{prompt_name}' are incomplete."}
        # Render structured speaker metadata as a plain-text dialogue.
        formatted_user_input = Formatter.format_ssm_as_dialogue(user_input)
        if system_input:
            system_prompt = system_prompt_template.format(system_context=system_input)
        else:
            system_prompt = system_prompt_template
        user_prompt = user_prompt_template.format(user_context=formatted_user_input)
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        response = await self.manager.generate(
            model_id=self.model_id,
            messages=messages,
            max_new_tokens=max_new_tokens,
        )
        # `is not None` (not truthiness): an extracted empty object {} is a
        # valid model answer, not an extraction failure.
        parsed = self.extract_json(response)
        if parsed is not None:
            return parsed
        return {"error": "No valid JSON object found in the response."}
class LLMResultHandler:
    """
    A handler class to process and validate the output from a Language Learning Model (LLM)
    and format structured data.
    This class ensures that the input data conforms to expected formats and applies fallback
    mechanisms to maintain data integrity.
    Methods
    -------
    validate_and_fallback(llm_result, ssm)
        Validates the LLM result against structured speaker metadata and applies fallback.
    _fallback(ssm)
        Applies fallback formatting to the speaker data.
    log_result(ssm, llm_result)
        Logs the final processed data and the original LLM result.
    """
    def validate_and_fallback(
        self,
        llm_result: Annotated[Dict[str, str], "LLM result with customer and CSR speaker identifiers"],
        ssm: Annotated[List[Dict[str, Any]], "List of sentences with speaker metadata"]
    ) -> Annotated[List[Dict[str, Any]], "Processed speaker metadata"]:
        """
        Validates the LLM result and applies corrections to the speaker metadata.
        Parameters
        ----------
        llm_result : dict
            A dictionary containing speaker identifiers for 'Customer' and 'CSR'.
        ssm : list of dict
            A list of dictionaries where each dictionary represents a sentence with
            metadata, including the 'speaker'.
        Returns
        -------
        list of dict
            The processed speaker metadata with standardized speaker labels.
        Examples
        --------
        >>> result = {"Customer": "Speaker 1", "CSR": "Speaker 2"}
        >>> ssm_ = [{"speaker": "Speaker 1", "text": "Hello!"}, {"speaker": "Speaker 2", "text": "Hi!"}]
        >>> handler = LLMResultHandler()
        >>> handler.validate_and_fallback(result, ssm_)
        [{'speaker': 'Customer', 'text': 'Hello!'}, {'speaker': 'CSR', 'text': 'Hi!'}]
        """
        if not isinstance(llm_result, dict):
            return self._fallback(ssm)
        customer_speaker = llm_result.get("Customer")
        csr_speaker = llm_result.get("CSR")
        # LLM output is untrusted: values may be missing or non-string, and
        # re.match would raise TypeError on a non-string — guard first.
        if not (isinstance(customer_speaker, str) and isinstance(csr_speaker, str)):
            return self._fallback(ssm)
        speaker_pattern = r"^Speaker\s+\d+$"
        if (not re.match(speaker_pattern, customer_speaker)) or (not re.match(speaker_pattern, csr_speaker)):
            return self._fallback(ssm)
        # Both identified speakers must actually occur in the transcript.
        ssm_speakers = {sentence["speaker"] for sentence in ssm}
        if customer_speaker not in ssm_speakers or csr_speaker not in ssm_speakers:
            return self._fallback(ssm)
        for sentence in ssm:
            # Anyone who is not the CSR (including unrecognized speakers) is
            # labeled as the customer.
            sentence["speaker"] = "CSR" if sentence["speaker"] == csr_speaker else "Customer"
        return ssm
    @staticmethod
    def _fallback(
        ssm: Annotated[List[Dict[str, Any]], "List of sentences with speaker metadata"]
    ) -> Annotated[List[Dict[str, Any]], "Fallback speaker metadata"]:
        """
        Applies fallback formatting to speaker metadata when validation fails.
        The first speaker heard is assumed to be the CSR; everyone else is the customer.
        Parameters
        ----------
        ssm : list of dict
            A list of dictionaries representing sentences with speaker metadata.
        Returns
        -------
        list of dict
            The speaker metadata with fallback formatting applied.
        Examples
        --------
        >>> ssm_ = [{"speaker": "Speaker 1", "text": "Hello!"}, {"speaker": "Speaker 2", "text": "Hi!"}]
        >>> handler = LLMResultHandler()
        >>> handler._fallback(ssm_)
        [{'speaker': 'CSR', 'text': 'Hello!'}, {'speaker': 'Customer', 'text': 'Hi!'}]
        """
        if len(ssm) > 0:
            first_speaker = ssm[0]["speaker"]
            for sentence in ssm:
                if sentence["speaker"] == first_speaker:
                    sentence["speaker"] = "CSR"
                else:
                    sentence["speaker"] = "Customer"
        return ssm
    @staticmethod
    def log_result(
        ssm: Annotated[List[Dict[str, Any]], "Final processed speaker metadata"],
        llm_result: Annotated[Dict[str, str], "Original LLM result"]
    ) -> None:
        """
        Logs the final processed speaker metadata and the original LLM result.
        Parameters
        ----------
        ssm : list of dict
            The processed speaker metadata.
        llm_result : dict
            The original LLM result.
        Returns
        -------
        None
        Examples
        --------
        >>> ssm_ = [{"speaker": "CSR", "text": "Hello!"}, {"speaker": "Customer", "text": "Hi!"}]
        >>> result = {"Customer": "Speaker 1", "CSR": "Speaker 2"}
        >>> handler = LLMResultHandler()
        >>> handler.log_result(ssm_, result)
        Final SSM: [{'speaker': 'CSR', 'text': 'Hello!'}, {'speaker': 'Customer', 'text': 'Hi!'}]
        LLM Result: {'Customer': 'Speaker 1', 'CSR': 'Speaker 2'}
        """
        print("Final SSM:", ssm)
        print("LLM Result:", llm_result)
if __name__ == "__main__":
    async def _demo() -> None:
        """Exercise classification and sentiment analysis on a sample dialogue."""
        conversation = [
            {"speaker": "Speaker 1", "text": "Hello, I need help with my order."},
            {"speaker": "Speaker 0", "text": "Sure, I'd be happy to assist you."},
            {"speaker": "Speaker 1", "text": "I haven't received it yet."},
            {"speaker": "Speaker 0", "text": "Let me check the status for you."}
        ]
        # Speaker-role classification via the OpenAI-backed model.
        classifier = LLMOrchestrator(
            config_path="config/config.yaml",
            prompt_config_path="config/prompt.yaml",
            model_id="openai",
        )
        roles = await classifier.generate("Classification", conversation)
        print("Speaker Roles:", roles)
        print("Type:", type(roles))
        # Sentiment analysis with the default model.
        analyzer = LLMOrchestrator(
            config_path="config/config.yaml",
            prompt_config_path="config/prompt.yaml"
        )
        result = await analyzer.generate("SentimentAnalysis", conversation)
        print("\nSentiment Analysis:", result)
    asyncio.run(_demo())