Chamin09 committed on
Commit
b47ceab
·
verified ·
1 Parent(s): a419c1b

Update app/orchestrator.py

Browse files
Files changed (1) hide show
  1. app/orchestrator.py +584 -579
app/orchestrator.py CHANGED
@@ -1,579 +1,584 @@
1
- # Updated app/orchestrator.py
2
- import logging
3
- import os
4
- import time
5
- from typing import Dict, List, Optional, Tuple, Union, Any
6
- from datetime import datetime, timedelta
7
- import json
8
-
9
- from app.error_handler import ErrorHandler, with_error_handling
10
- from app.synchronizer import Synchronizer
11
-
12
- import threading
13
-
14
class Orchestrator:
    """Top-level coordinator for multi-agent workflows.

    The orchestrator owns user-session bookkeeping, delegates workflow
    execution to the coordinator agent, and — for the synchronized path —
    runs text analysis, image processing and report generation in background
    threads joined through Synchronizer barriers. Public entry points are
    wrapped with error handling and registered fallbacks so callers receive
    a structured error dict instead of an unhandled exception where possible.
    """

    def __init__(self, coordinator_agent, text_model_manager=None, image_model_manager=None,
                 summary_model_manager=None, token_manager=None, cache_manager=None,
                 metrics_calculator=None):
        """Initialize the Orchestrator with required components.

        Args:
            coordinator_agent: Agent that initializes and executes workflows.
            text_model_manager: Optional manager for text models.
            image_model_manager: Optional manager for image models.
            summary_model_manager: Optional manager for summary models.
            token_manager: Optional token accounting component.
            cache_manager: Optional cache component.
            metrics_calculator: Optional metrics component; also passed to the
                error handler for failure accounting.
        """
        self.logger = logging.getLogger(__name__)
        self.coordinator_agent = coordinator_agent
        self.text_model_manager = text_model_manager
        self.image_model_manager = image_model_manager
        self.summary_model_manager = summary_model_manager
        self.token_manager = token_manager
        self.cache_manager = cache_manager
        self.metrics_calculator = metrics_calculator

        # Centralized error handling; fallbacks are registered immediately so
        # the decorated public methods can recover from failures.
        self.error_handler = ErrorHandler(metrics_calculator=metrics_calculator)
        self._register_fallbacks()

        # Session bookkeeping.
        # NOTE(review): active_sessions is mutated from daemon worker threads
        # in coordinate_workflow_with_synchronization without a lock — confirm
        # single-writer access or add locking.
        self.active_sessions = {}
        self.session_counter = 0

        self.synchronizer = Synchronizer()
        self._register_agents_with_synchronizer()

    def _register_agents_with_synchronizer(self):
        """Register all available agents with the synchronizer.

        NOTE(review): text_analysis_agent / image_processing_agent /
        report_generation_agent / metrics_agent are never assigned in
        __init__, so these hasattr guards are False unless the attributes are
        attached externally after construction — TODO confirm intended wiring.
        """
        # The coordinator is always present.
        self.synchronizer.register_agent("coordinator_agent")

        # Register optional agents only when they have been attached.
        if hasattr(self, "text_analysis_agent") and self.text_analysis_agent:
            self.synchronizer.register_agent("text_analysis_agent")

        if hasattr(self, "image_processing_agent") and self.image_processing_agent:
            self.synchronizer.register_agent("image_processing_agent")

        if hasattr(self, "report_generation_agent") and self.report_generation_agent:
            self.synchronizer.register_agent("report_generation_agent")

        if hasattr(self, "metrics_agent") and self.metrics_agent:
            self.synchronizer.register_agent("metrics_agent")

    def coordinate_workflow_with_synchronization(self, session_id: str, topic: str,
                                                 text_files: List[str], image_files: List[str]) -> Dict[str, Any]:
        """Coordinate a workflow with explicit synchronization points.

        Runs text analysis and image processing in parallel daemon threads,
        waits on a barrier for both, then (if a report agent is attached)
        generates a report behind a second barrier. Provides more control
        over execution than the standard process_request.

        Args:
            session_id: Session created via create_session.
            topic: Topic of the workflow.
            text_files: Paths of text files to analyze (may be empty).
            image_files: Paths of image files to process (may be empty).

        Returns:
            A result dict with "status" of "completed", "timeout" or "error";
            timeout/error results include partial results where available.
        """
        if session_id not in self.active_sessions:
            return {"error": f"Session {session_id} not found. Please create a new session."}

        session = self.active_sessions[session_id]
        session["status"] = "processing"

        # Provisional workflow ID; replaced by the coordinator's ID if given.
        workflow_id = f"workflow_{int(time.time())}"

        workflow_result = self.coordinator_agent.initialize_workflow(topic, text_files, image_files)

        if "workflow_id" in workflow_result:
            workflow_id = workflow_result["workflow_id"]
            session["workflows"].append(workflow_id)
            session["current_workflow"] = workflow_id

        # Both analysis agents must arrive here before the report phase.
        analysis_barrier_id = self.synchronizer.create_barrier(
            "analysis_complete",
            ["text_analysis_agent", "image_processing_agent"]
        )

        report_barrier_id = self.synchronizer.create_barrier(
            "report_ready",
            ["report_generation_agent"]
        )

        # Report generation depends on both analysis tasks.
        if hasattr(self, "report_generation_agent") and self.report_generation_agent:
            self.synchronizer.register_dependencies(
                "report_generation_agent",
                f"generate_report_{workflow_id}",
                [
                    ("text_analysis_agent", f"analyze_text_{workflow_id}"),
                    ("image_processing_agent", f"process_images_{workflow_id}")
                ]
            )

        # --- Text analysis (background thread or skipped) ---
        if hasattr(self, "text_analysis_agent") and self.text_analysis_agent and text_files:
            def text_analysis_task():
                try:
                    result = self.text_analysis_agent.process_text_files(topic, text_files)
                    self.synchronizer.signal_completion(
                        "text_analysis_agent",
                        f"analyze_text_{workflow_id}",
                        result
                    )
                    self.synchronizer.arrive_at_barrier(analysis_barrier_id, "text_analysis_agent", result)
                    return result
                except Exception as e:
                    # logger.exception keeps the traceback for debugging.
                    self.logger.exception("Error in text analysis: %s", e)
                    # Still signal and arrive so the barrier cannot deadlock.
                    self.synchronizer.signal_completion(
                        "text_analysis_agent",
                        f"analyze_text_{workflow_id}",
                        {"error": str(e)}
                    )
                    self.synchronizer.arrive_at_barrier(
                        analysis_barrier_id,
                        "text_analysis_agent",
                        {"error": str(e)}
                    )
                    return {"error": str(e)}

            text_thread = threading.Thread(target=text_analysis_task)
            text_thread.daemon = True
            text_thread.start()
        else:
            # No agent or no files: satisfy the barrier with a skip marker.
            self.synchronizer.signal_completion(
                "text_analysis_agent",
                f"analyze_text_{workflow_id}",
                {"status": "skipped", "reason": "No text files or text analysis agent"}
            )
            self.synchronizer.arrive_at_barrier(
                analysis_barrier_id,
                "text_analysis_agent",
                {"status": "skipped"}
            )

        # --- Image processing (background thread or skipped) ---
        if hasattr(self, "image_processing_agent") and self.image_processing_agent and image_files:
            def image_processing_task():
                try:
                    result = self.image_processing_agent.process_image_files(topic, image_files)
                    self.synchronizer.signal_completion(
                        "image_processing_agent",
                        f"process_images_{workflow_id}",
                        result
                    )
                    self.synchronizer.arrive_at_barrier(analysis_barrier_id, "image_processing_agent", result)
                    return result
                except Exception as e:
                    self.logger.exception("Error in image processing: %s", e)
                    # Still signal and arrive so the barrier cannot deadlock.
                    self.synchronizer.signal_completion(
                        "image_processing_agent",
                        f"process_images_{workflow_id}",
                        {"error": str(e)}
                    )
                    self.synchronizer.arrive_at_barrier(
                        analysis_barrier_id,
                        "image_processing_agent",
                        {"error": str(e)}
                    )
                    return {"error": str(e)}

            image_thread = threading.Thread(target=image_processing_task)
            image_thread.daemon = True
            image_thread.start()
        else:
            self.synchronizer.signal_completion(
                "image_processing_agent",
                f"process_images_{workflow_id}",
                {"status": "skipped", "reason": "No image files or image processing agent"}
            )
            self.synchronizer.arrive_at_barrier(
                analysis_barrier_id,
                "image_processing_agent",
                {"status": "skipped"}
            )

        # Block until both analysis tasks arrive (5-minute timeout).
        if not self.synchronizer.wait_for_barrier(analysis_barrier_id, timeout=300):
            self.logger.error("Timeout waiting for analysis to complete")
            session["status"] = "error"
            return {
                "error": "Timeout waiting for analysis to complete",
                "status": "timeout",
                "workflow_id": workflow_id
            }

        # Collect per-agent results deposited at the barrier.
        barrier_data = self.synchronizer.get_barrier_data(analysis_barrier_id)
        text_analysis = barrier_data.get("text_analysis_agent", {})
        image_analysis = barrier_data.get("image_processing_agent", {})

        text_error = "error" in text_analysis
        image_error = "error" in image_analysis

        # Only abort when BOTH analyses failed; a single failure still allows
        # a partial report.
        if text_error and image_error:
            session["status"] = "error"
            return {
                "error": "Both text and image analysis failed",
                "text_error": text_analysis.get("error", "Unknown error"),
                "image_error": image_analysis.get("error", "Unknown error"),
                "status": "error",
                "workflow_id": workflow_id
            }

        if hasattr(self, "report_generation_agent") and self.report_generation_agent:
            def report_generation_task():
                try:
                    # NOTE(review): dependencies are only logged, not awaited;
                    # the preceding barrier wait already guarantees both
                    # analysis results exist — confirm this check is advisory.
                    if not self.synchronizer.are_dependencies_met(
                            "report_generation_agent", f"generate_report_{workflow_id}"):
                        self.logger.info("Waiting for dependencies to be met for report generation")

                    result = self.report_generation_agent.generate_report(
                        topic, text_analysis, image_analysis)

                    self.synchronizer.signal_completion(
                        "report_generation_agent",
                        f"generate_report_{workflow_id}",
                        result
                    )
                    self.synchronizer.arrive_at_barrier(report_barrier_id, "report_generation_agent", result)
                    return result
                except Exception as e:
                    self.logger.exception("Error in report generation: %s", e)
                    # Still signal and arrive so the barrier cannot deadlock.
                    self.synchronizer.signal_completion(
                        "report_generation_agent",
                        f"generate_report_{workflow_id}",
                        {"error": str(e)}
                    )
                    self.synchronizer.arrive_at_barrier(
                        report_barrier_id,
                        "report_generation_agent",
                        {"error": str(e)}
                    )
                    return {"error": str(e)}

            report_thread = threading.Thread(target=report_generation_task)
            report_thread.daemon = True
            report_thread.start()

            # Block until the report arrives (5-minute timeout).
            if not self.synchronizer.wait_for_barrier(report_barrier_id, timeout=300):
                self.logger.error("Timeout waiting for report generation")
                session["status"] = "error"
                return {
                    "error": "Timeout waiting for report generation",
                    "status": "timeout",
                    "workflow_id": workflow_id,
                    "partial_results": {
                        "text_analysis": text_analysis,
                        "image_analysis": image_analysis
                    }
                }

            barrier_data = self.synchronizer.get_barrier_data(report_barrier_id)
            report = barrier_data.get("report_generation_agent", {})

            if "error" in report:
                session["status"] = "error"
                return {
                    "error": "Report generation failed",
                    "report_error": report.get("error", "Unknown error"),
                    "status": "error",
                    "workflow_id": workflow_id,
                    "partial_results": {
                        "text_analysis": text_analysis,
                        "image_analysis": image_analysis
                    }
                }

            session["status"] = "completed"
            session["last_result"] = report

            # Sustainability metrics are best-effort; failure is logged only.
            sustainability_metrics = None
            if hasattr(self, "metrics_agent") and self.metrics_agent:
                try:
                    sustainability_metrics = self.metrics_agent.generate_sustainability_report()
                except Exception as e:
                    self.logger.exception("Error getting sustainability metrics: %s", e)

            return {
                "status": "completed",
                "workflow_id": workflow_id,
                "topic": topic,
                "report": report,
                "sustainability_metrics": sustainability_metrics
            }
        else:
            # No report agent attached: return the raw analysis results.
            session["status"] = "completed"
            return {
                "status": "completed",
                "workflow_id": workflow_id,
                "topic": topic,
                "results": {
                    "text_analysis": text_analysis,
                    "image_analysis": image_analysis
                }
            }

    def _register_fallbacks(self):
        """Register fallback functions for critical operations."""
        # Fallback for this orchestrator's own request processing.
        self.error_handler.register_fallback(
            "orchestrator", "process_request",
            self._fallback_process_request
        )

        # Fallback for coordinator workflow execution.
        self.error_handler.register_fallback(
            "coordinator_agent", "execute_workflow",
            self._fallback_execute_workflow
        )

    def _fallback_process_request(self, context):
        """Fallback for process_request: mark the session failed and return a
        low-confidence stub result instead of raising.

        Args:
            context: Error-handler context; "kwargs" is expected to carry the
                original call's keyword arguments.
        """
        kwargs = context.get("kwargs", {})
        topic = kwargs.get("topic", "unknown")
        session_id = kwargs.get("session_id", "unknown")

        # Record the failure on the session when we can find it.
        if session_id in self.active_sessions:
            session = self.active_sessions[session_id]
            session["status"] = "error"
            session["error"] = "Request processing failed, using fallback"

        return {
            "status": "error",
            "message": "An error occurred while processing your request. Using simplified processing.",
            "topic": topic,
            "fallback": True,
            "result": {
                "confidence_level": "low",
                "summary": "Unable to process request fully. Please try again or simplify your query."
            }
        }

    def _fallback_execute_workflow(self, context):
        """Fallback for workflow execution: attempt direct coordination via
        the coordinator's private _direct_coordination, else return an error
        dict. Never raises.
        """
        try:
            if hasattr(self.coordinator_agent, "_direct_coordination"):
                # Recover topic and file lists from the coordinator's state.
                topic = self.coordinator_agent.current_topic
                if topic and topic in self.coordinator_agent.workflow_state:
                    workflow = self.coordinator_agent.workflow_state[topic]
                    text_files = workflow.get("text_files", [])
                    image_files = workflow.get("image_files", [])

                    return self.coordinator_agent._direct_coordination(topic, text_files, image_files)

            # Direct coordination unavailable: basic error response.
            return {
                "status": "error",
                "message": "Workflow execution failed. Using fallback.",
                "fallback": True
            }
        except Exception as e:
            self.logger.exception("Fallback for execute_workflow also failed: %s", e)
            return {
                "status": "critical_error",
                "message": "Both primary and fallback execution failed."
            }

    @with_error_handling("orchestrator", "create_session", lambda self: self.error_handler)
    def create_session(self) -> str:
        """Create a new session and return its ID.

        The ID combines a timestamp with a monotonically increasing counter.
        NOTE(review): the counter increment is not thread-safe — confirm
        sessions are only created from one thread.
        """
        session_id = f"session_{int(time.time())}_{self.session_counter}"
        self.session_counter += 1

        self.active_sessions[session_id] = {
            "created_at": datetime.now().isoformat(),
            "status": "initialized",
            "workflows": [],
            "current_workflow": None
        }

        self.logger.info("Created new session: %s", session_id)
        return session_id

    @with_error_handling("orchestrator", "process_request", lambda self: self.error_handler)
    def process_request(self, session_id: str, topic: str, text_files: List[str],
                        image_files: List[str]) -> Dict[str, Any]:
        """Process a user request within a session.

        Coordinates the workflow through the coordinator agent: initializes a
        workflow, executes it with error handling, and records the result on
        the session.

        Returns:
            The workflow result dict, or an error dict when the session is
            unknown.
        """
        if session_id not in self.active_sessions:
            return {"error": f"Session {session_id} not found. Please create a new session."}

        session = self.active_sessions[session_id]
        session["status"] = "processing"

        workflow_result = self.coordinator_agent.initialize_workflow(topic, text_files, image_files)

        workflow_id = workflow_result.get("workflow_id")
        if workflow_id:
            session["workflows"].append(workflow_id)
            session["current_workflow"] = workflow_id

        try:
            result = self._execute_workflow_with_error_handling()
        except Exception as e:
            # Last resort: direct execution without the error-handler wrapper.
            # NOTE(review): this may re-raise the same exception — confirm the
            # decorator-level fallback is expected to catch it.
            self.logger.exception("Error executing workflow with error handling: %s", e)
            result = self.coordinator_agent.execute_workflow()

        session["status"] = "completed" if not result.get("error") else "error"
        session["last_result"] = result

        return result

    def _execute_workflow_with_error_handling(self) -> Dict[str, Any]:
        """Execute the current workflow, routing failures through the error
        handler; re-raises when no fallback handles the error."""
        try:
            result = self.coordinator_agent.execute_workflow()
            self.error_handler.record_success("coordinator_agent", "execute_workflow")
            return result
        except Exception as e:
            context = {
                "orchestrator": self,
                "coordinator_agent": self.coordinator_agent
            }

            handled, fallback_result = self.error_handler.handle_error(
                "coordinator_agent", "execute_workflow", e, context)

            if handled:
                return fallback_result
            else:
                # Not handled: propagate to the caller's last-resort path.
                raise

    @with_error_handling("orchestrator", "get_session_status", lambda self: self.error_handler)
    def get_session_status(self, session_id: str) -> Dict[str, Any]:
        """Get the status of a session, including the current workflow's
        status when one is active and retrievable."""
        if session_id not in self.active_sessions:
            return {"error": f"Session {session_id} not found"}

        session = self.active_sessions[session_id]

        if session.get("current_workflow"):
            try:
                workflow_status = self.coordinator_agent.get_workflow_status(
                    session["current_workflow"])

                return {
                    "session_id": session_id,
                    "status": session["status"],
                    "created_at": session["created_at"],
                    "workflows": session["workflows"],
                    "current_workflow": session["current_workflow"],
                    "workflow_status": workflow_status
                }
            except Exception as e:
                # Degrade to basic session info when the lookup fails.
                self.logger.exception("Error getting workflow status: %s", e)
                return {
                    "session_id": session_id,
                    "status": session["status"],
                    "created_at": session["created_at"],
                    "workflows": session["workflows"],
                    "error": "Failed to retrieve detailed workflow status"
                }
        else:
            return {
                "session_id": session_id,
                "status": session["status"],
                "created_at": session["created_at"],
                "workflows": session["workflows"]
            }

    @with_error_handling("orchestrator", "cleanup_session", lambda self: self.error_handler)
    def cleanup_session(self, session_id: str) -> Dict[str, Any]:
        """Clean up resources for a session; workflow cleanup failures are
        logged and do not abort session cleanup."""
        if session_id not in self.active_sessions:
            return {"error": f"Session {session_id} not found"}

        session = self.active_sessions[session_id]

        if session.get("current_workflow"):
            try:
                self.coordinator_agent.cleanup_workflow(session["current_workflow"])
            except Exception as e:
                self.logger.exception("Error cleaning up workflow: %s", e)
                # Continue with session cleanup even if workflow cleanup fails.

        session["status"] = "cleaned_up"

        return {
            "session_id": session_id,
            "status": "cleaned_up",
            "message": "Session resources have been cleaned up"
        }

    @with_error_handling("orchestrator", "get_sustainability_metrics", lambda self: self.error_handler)
    def get_sustainability_metrics(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Get sustainability metrics for a session or the entire system.

        Args:
            session_id: Currently ignored; per-session metrics are not yet
                implemented, so global metrics are always returned.
        """
        if not self.metrics_calculator:
            return {"error": "Metrics calculator not available"}

        # TODO: implement session-specific metrics; both the session and the
        # global path intentionally return the same global metrics for now.
        return self.metrics_calculator.get_all_metrics()

    def get_error_report(self) -> Dict[str, Any]:
        """Get the aggregated error report from the error handler."""
        if not self.error_handler:
            return {"error": "Error handler not available"}

        return self.error_handler.get_error_report()
 
 
 
 
 
 
1
+ # Updated app/orchestrator.py
2
+ import logging
3
+ import os
4
+ import time
5
+ from typing import Dict, List, Optional, Tuple, Union, Any
6
+ from datetime import datetime, timedelta
7
+ import json
8
+
9
+ from app.error_handler import ErrorHandler, with_error_handling
10
+ from app.synchronizer import Synchronizer
11
+
12
+ import threading
13
+
14
+ class Orchestrator:
15
+ def __init__(self, coordinator_agent, text_model_manager=None, image_model_manager=None,
16
+ summary_model_manager=None, token_manager=None, cache_manager=None,
17
+ metrics_calculator=None):
18
+ """Initialize the Orchestrator with required components."""
19
+ self.logger = logging.getLogger(__name__)
20
+ self.coordinator_agent = coordinator_agent
21
+ self.text_model_manager = text_model_manager
22
+ self.image_model_manager = image_model_manager
23
+ self.summary_model_manager = summary_model_manager
24
+ self.token_manager = token_manager
25
+ self.cache_manager = cache_manager
26
+ self.metrics_calculator = metrics_calculator
27
+
28
+ # Initialize error handler
29
+ self.error_handler = ErrorHandler(metrics_calculator=metrics_calculator)
30
+
31
+ # Register fallbacks
32
+ self._register_fallbacks()
33
+
34
+ # Track active sessions
35
+ self.active_sessions = {}
36
+ self.session_counter = 0
37
+
38
+ self.synchronizer = Synchronizer()
39
+ self._register_agents_with_synchronizer()
40
+
41
+ def _register_agents_with_synchronizer(self):
42
+ """Register all agents with the synchronizer."""
43
+ # Register the coordinator
44
+ self.synchronizer.register_agent("coordinator_agent")
45
+
46
+ # Register other agents if available
47
+ if hasattr(self, "text_analysis_agent") and self.text_analysis_agent:
48
+ self.synchronizer.register_agent("text_analysis_agent")
49
+
50
+ if hasattr(self, "image_processing_agent") and self.image_processing_agent:
51
+ self.synchronizer.register_agent("image_processing_agent")
52
+
53
+ if hasattr(self, "report_generation_agent") and self.report_generation_agent:
54
+ self.synchronizer.register_agent("report_generation_agent")
55
+
56
+ if hasattr(self, "metrics_agent") and self.metrics_agent:
57
+ self.synchronizer.register_agent("metrics_agent")
58
+
59
+
60
+ def coordinate_workflow_with_synchronization(self, session_id: str, topic: str,
61
+ text_files: List[str], image_files: List[str]) -> Dict[str, Any]:
62
+ """
63
+ Coordinate a workflow with explicit synchronization points.
64
+ This provides more control over the workflow execution than the standard process_request.
65
+ """
66
+ if session_id not in self.active_sessions:
67
+ return {"error": f"Session {session_id} not found. Please create a new session."}
68
+
69
+ session = self.active_sessions[session_id]
70
+ session["status"] = "processing"
71
+
72
+ # Create a workflow ID
73
+ workflow_id = f"workflow_{int(time.time())}"
74
+
75
+ # Initialize workflow
76
+ workflow_result = self.coordinator_agent.initialize_workflow(topic, text_files, image_files)
77
+
78
+ # Store workflow ID
79
+ if "workflow_id" in workflow_result:
80
+ workflow_id = workflow_result["workflow_id"]
81
+ session["workflows"].append(workflow_id)
82
+ session["current_workflow"] = workflow_id
83
+
84
+ # Create synchronization barriers
85
+ analysis_barrier_id = self.synchronizer.create_barrier(
86
+ "analysis_complete",
87
+ ["text_analysis_agent", "image_processing_agent"]
88
+ )
89
+
90
+ report_barrier_id = self.synchronizer.create_barrier(
91
+ "report_ready",
92
+ ["report_generation_agent"]
93
+ )
94
+
95
+ # Set up dependencies
96
+ if hasattr(self, "report_generation_agent") and self.report_generation_agent:
97
+ self.synchronizer.register_dependencies(
98
+ "report_generation_agent",
99
+ f"generate_report_{workflow_id}",
100
+ [
101
+ ("text_analysis_agent", f"analyze_text_{workflow_id}"),
102
+ ("image_processing_agent", f"process_images_{workflow_id}")
103
+ ]
104
+ )
105
+
106
+ # Start text analysis in background
107
+ if hasattr(self, "text_analysis_agent") and self.text_analysis_agent and text_files:
108
+ def text_analysis_task():
109
+ try:
110
+ # Process text files
111
+ result = self.text_analysis_agent.process_text_files(topic, text_files)
112
+
113
+ # Signal completion
114
+ self.synchronizer.signal_completion(
115
+ "text_analysis_agent",
116
+ f"analyze_text_{workflow_id}",
117
+ result
118
+ )
119
+
120
+ # Arrive at barrier
121
+ self.synchronizer.arrive_at_barrier(analysis_barrier_id, "text_analysis_agent", result)
122
+
123
+ return result
124
+ except Exception as e:
125
+ self.logger.error(f"Error in text analysis: {str(e)}")
126
+ # Signal completion with error
127
+ self.synchronizer.signal_completion(
128
+ "text_analysis_agent",
129
+ f"analyze_text_{workflow_id}",
130
+ {"error": str(e)}
131
+ )
132
+ # Arrive at barrier with error
133
+ self.synchronizer.arrive_at_barrier(
134
+ analysis_barrier_id,
135
+ "text_analysis_agent",
136
+ {"error": str(e)}
137
+ )
138
+ return {"error": str(e)}
139
+
140
+ # Start in background thread
141
+ text_thread = threading.Thread(target=text_analysis_task)
142
+ text_thread.daemon = True
143
+ text_thread.start()
144
+
145
+ else:
146
+ # If no text analysis, signal completion with empty result
147
+ self.synchronizer.signal_completion(
148
+ "text_analysis_agent",
149
+ f"analyze_text_{workflow_id}",
150
+ {"status": "skipped", "reason": "No text files or text analysis agent"}
151
+ )
152
+ self.synchronizer.arrive_at_barrier(
153
+ analysis_barrier_id,
154
+ "text_analysis_agent",
155
+ {"status": "skipped"}
156
+ )
157
+
158
+ # Start image processing in background
159
+ if hasattr(self, "image_processing_agent") and self.image_processing_agent and image_files:
160
+ def image_processing_task():
161
+ try:
162
+ # Process images
163
+ result = self.image_processing_agent.process_image_files(topic, image_files)
164
+
165
+ # Signal completion
166
+ self.synchronizer.signal_completion(
167
+ "image_processing_agent",
168
+ f"process_images_{workflow_id}",
169
+ result
170
+ )
171
+
172
+ # Arrive at barrier
173
+ self.synchronizer.arrive_at_barrier(analysis_barrier_id, "image_processing_agent", result)
174
+
175
+ return result
176
+ except Exception as e:
177
+ self.logger.error(f"Error in image processing: {str(e)}")
178
+ # Signal completion with error
179
+ self.synchronizer.signal_completion(
180
+ "image_processing_agent",
181
+ f"process_images_{workflow_id}",
182
+ {"error": str(e)}
183
+ )
184
+ # Arrive at barrier with error
185
+ self.synchronizer.arrive_at_barrier(
186
+ analysis_barrier_id,
187
+ "image_processing_agent",
188
+ {"error": str(e)}
189
+ )
190
+ return {"error": str(e)}
191
+
192
+ # Start in background thread
193
+ image_thread = threading.Thread(target=image_processing_task)
194
+ image_thread.daemon = True
195
+ image_thread.start()
196
+
197
+ else:
198
+ # If no image processing, signal completion with empty result
199
+ self.synchronizer.signal_completion(
200
+ "image_processing_agent",
201
+ f"process_images_{workflow_id}",
202
+ {"status": "skipped", "reason": "No image files or image processing agent"}
203
+ )
204
+ self.synchronizer.arrive_at_barrier(
205
+ analysis_barrier_id,
206
+ "image_processing_agent",
207
+ {"status": "skipped"}
208
+ )
209
+
210
+ # Wait for analysis to complete
211
+ if not self.synchronizer.wait_for_barrier(analysis_barrier_id, timeout=300): # 5 minute timeout
212
+ self.logger.error(f"Timeout waiting for analysis to complete")
213
+ session["status"] = "error"
214
+ return {
215
+ "error": "Timeout waiting for analysis to complete",
216
+ "status": "timeout",
217
+ "workflow_id": workflow_id
218
+ }
219
+
220
+ # Get analysis results
221
+ barrier_data = self.synchronizer.get_barrier_data(analysis_barrier_id)
222
+ text_analysis = barrier_data.get("text_analysis_agent", {})
223
+ image_analysis = barrier_data.get("image_processing_agent", {})
224
+
225
+ # Check for errors
226
+ text_error = "error" in text_analysis
227
+ image_error = "error" in image_analysis
228
+
229
+ if text_error and image_error:
230
+ session["status"] = "error"
231
+ return {
232
+ "error": "Both text and image analysis failed",
233
+ "text_error": text_analysis.get("error", "Unknown error"),
234
+ "image_error": image_analysis.get("error", "Unknown error"),
235
+ "status": "error",
236
+ "workflow_id": workflow_id
237
+ }
238
+
239
+ # Generate report
240
+ if hasattr(self, "report_generation_agent") and self.report_generation_agent:
241
+ def report_generation_task():
242
+ try:
243
+ # Wait for dependencies to be met
244
+ if not self.synchronizer.are_dependencies_met(
245
+ "report_generation_agent", f"generate_report_{workflow_id}"):
246
+ self.logger.info("Waiting for dependencies to be met for report generation")
247
+
248
+ # Generate report
249
+ result = self.report_generation_agent.generate_report(
250
+ topic, text_analysis, image_analysis)
251
+
252
+ # Signal completion
253
+ self.synchronizer.signal_completion(
254
+ "report_generation_agent",
255
+ f"generate_report_{workflow_id}",
256
+ result
257
+ )
258
+
259
+ # Arrive at barrier
260
+ self.synchronizer.arrive_at_barrier(report_barrier_id, "report_generation_agent", result)
261
+
262
+ return result
263
+ except Exception as e:
264
+ self.logger.error(f"Error in report generation: {str(e)}")
265
+ # Signal completion with error
266
+ self.synchronizer.signal_completion(
267
+ "report_generation_agent",
268
+ f"generate_report_{workflow_id}",
269
+ {"error": str(e)}
270
+ )
271
+ # Arrive at barrier with error
272
+ self.synchronizer.arrive_at_barrier(
273
+ report_barrier_id,
274
+ "report_generation_agent",
275
+ {"error": str(e)}
276
+ )
277
+ return {"error": str(e)}
278
+
279
+ # Start in background thread
280
+ report_thread = threading.Thread(target=report_generation_task)
281
+ report_thread.daemon = True
282
+ report_thread.start()
283
+
284
+ # Wait for report to be ready
285
+ if not self.synchronizer.wait_for_barrier(report_barrier_id, timeout=300): # 5 minute timeout
286
+ self.logger.error(f"Timeout waiting for report generation")
287
+ session["status"] = "error"
288
+ return {
289
+ "error": "Timeout waiting for report generation",
290
+ "status": "timeout",
291
+ "workflow_id": workflow_id,
292
+ "partial_results": {
293
+ "text_analysis": text_analysis,
294
+ "image_analysis": image_analysis
295
+ }
296
+ }
297
+
298
+ # Get report
299
+ barrier_data = self.synchronizer.get_barrier_data(report_barrier_id)
300
+ report = barrier_data.get("report_generation_agent", {})
301
+
302
+ # Check for errors
303
+ if "error" in report:
304
+ session["status"] = "error"
305
+ return {
306
+ "error": "Report generation failed",
307
+ "report_error": report.get("error", "Unknown error"),
308
+ "status": "error",
309
+ "workflow_id": workflow_id,
310
+ "partial_results": {
311
+ "text_analysis": text_analysis,
312
+ "image_analysis": image_analysis
313
+ }
314
+ }
315
+
316
+ # Update session status
317
+ session["status"] = "completed"
318
+ session["last_result"] = report
319
+
320
+ # Get sustainability metrics if available
321
+ sustainability_metrics = None
322
+ if hasattr(self, "metrics_agent") and self.metrics_agent:
323
+ try:
324
+ sustainability_metrics = self.metrics_agent.generate_sustainability_report()
325
+ except Exception as e:
326
+ self.logger.error(f"Error getting sustainability metrics: {str(e)}")
327
+
328
+ # Return final result
329
+ return {
330
+ "status": "completed",
331
+ "workflow_id": workflow_id,
332
+ "topic": topic,
333
+ "report": report,
334
+ "sustainability_metrics": sustainability_metrics
335
+ }
336
+ else:
337
+ # No report generation, return analysis results
338
+ session["status"] = "completed"
339
+ return {
340
+ "status": "completed",
341
+ "workflow_id": workflow_id,
342
+ "topic": topic,
343
+ "results": {
344
+ "text_analysis": text_analysis,
345
+ "image_analysis": image_analysis
346
+ }
347
+ }
348
+
349
+
350
+ def _register_fallbacks(self):
351
+ """Register fallback functions for critical operations."""
352
+ # Fallback for process_request
353
+ self.error_handler.register_fallback(
354
+ "orchestrator", "process_request",
355
+ self._fallback_process_request
356
+ )
357
+
358
+ # Fallback for coordinator workflow execution
359
+ self.error_handler.register_fallback(
360
+ "coordinator_agent", "execute_workflow",
361
+ self._fallback_execute_workflow
362
+ )
363
+
364
+ def _fallback_process_request(self, context):
365
+ """Fallback function for processing requests."""
366
+ # Extract what we can from the context
367
+ kwargs = context.get("kwargs", {})
368
+ topic = kwargs.get("topic", "unknown")
369
+ session_id = kwargs.get("session_id", "unknown")
370
+
371
+ # Check if we have a session
372
+ if session_id in self.active_sessions:
373
+ session = self.active_sessions[session_id]
374
+ session["status"] = "error"
375
+ session["error"] = "Request processing failed, using fallback"
376
+
377
+ return {
378
+ "status": "error",
379
+ "message": "An error occurred while processing your request. Using simplified processing.",
380
+ "topic": topic,
381
+ "fallback": True,
382
+ "result": {
383
+ "confidence_level": "low",
384
+ "summary": "Unable to process request fully. Please try again or simplify your query."
385
+ }
386
+ }
387
+
388
+ def _fallback_execute_workflow(self, context):
389
+ """Fallback function for workflow execution."""
390
+ # We can attempt direct coordination as a fallback
391
+ try:
392
+ if hasattr(self.coordinator_agent, "_direct_coordination"):
393
+ # Extract current topic and files from coordinator agent state
394
+ topic = self.coordinator_agent.current_topic
395
+ if topic and topic in self.coordinator_agent.workflow_state:
396
+ workflow = self.coordinator_agent.workflow_state[topic]
397
+ text_files = workflow.get("text_files", [])
398
+ image_files = workflow.get("image_files", [])
399
+
400
+ # Try direct coordination
401
+ return self.coordinator_agent._direct_coordination(topic, text_files, image_files)
402
+
403
+ # If we can't do direct coordination, return a basic error response
404
+ return {
405
+ "status": "error",
406
+ "message": "Workflow execution failed. Using fallback.",
407
+ "fallback": True
408
+ }
409
+ except Exception as e:
410
+ self.logger.error(f"Fallback for execute_workflow also failed: {str(e)}")
411
+ return {
412
+ "status": "critical_error",
413
+ "message": "Both primary and fallback execution failed."
414
+ }
415
+
416
+ #@with_error_handling("orchestrator", "create_session", lambda self: self.error_handler)
417
+ @with_error_handling("orchestrator", "create_session")
418
+ def create_session(self) -> str:
419
+ """Create a new session and return session ID."""
420
+ session_id = f"session_{int(time.time())}_{self.session_counter}"
421
+ self.session_counter += 1
422
+
423
+ self.active_sessions[session_id] = {
424
+ "created_at": datetime.now().isoformat(),
425
+ "status": "initialized",
426
+ "workflows": [],
427
+ "current_workflow": None
428
+ }
429
+
430
+ self.logger.info(f"Created new session: {session_id}")
431
+ return session_id
432
+
433
+ #@with_error_handling("orchestrator", "process_request", lambda self: self.error_handler)
434
+ @with_error_handling("orchestrator", "process_request")
435
+ def process_request(self, session_id: str, topic: str, text_files: List[str],
436
+ image_files: List[str]) -> Dict[str, Any]:
437
+ """
438
+ Process a user request within a session.
439
+ Coordinates the workflow through the coordinator agent.
440
+ """
441
+ if session_id not in self.active_sessions:
442
+ return {"error": f"Session {session_id} not found. Please create a new session."}
443
+
444
+ session = self.active_sessions[session_id]
445
+ session["status"] = "processing"
446
+
447
+ # Initialize workflow via coordinator
448
+ workflow_result = self.coordinator_agent.initialize_workflow(topic, text_files, image_files)
449
+
450
+ # Store workflow ID in session
451
+ workflow_id = workflow_result.get("workflow_id")
452
+ if workflow_id:
453
+ session["workflows"].append(workflow_id)
454
+ session["current_workflow"] = workflow_id
455
+
456
+ # Execute workflow with error handling
457
+ try:
458
+ # Try to execute with error handling
459
+ result = self._execute_workflow_with_error_handling()
460
+ except Exception as e:
461
+ # If that fails, try direct execution as a last resort
462
+ self.logger.error(f"Error executing workflow with error handling: {str(e)}")
463
+ result = self.coordinator_agent.execute_workflow()
464
+
465
+ # Update session status
466
+ session["status"] = "completed" if not result.get("error") else "error"
467
+ session["last_result"] = result
468
+
469
+ return result
470
+
471
+ def _execute_workflow_with_error_handling(self) -> Dict[str, Any]:
472
+ """Execute workflow with error handling."""
473
+ try:
474
+ result = self.coordinator_agent.execute_workflow()
475
+ self.error_handler.record_success("coordinator_agent", "execute_workflow")
476
+ return result
477
+ except Exception as e:
478
+ # Create context
479
+ context = {
480
+ "orchestrator": self,
481
+ "coordinator_agent": self.coordinator_agent
482
+ }
483
+
484
+ # Handle the error
485
+ handled, fallback_result = self.error_handler.handle_error(
486
+ "coordinator_agent", "execute_workflow", e, context)
487
+
488
+ if handled:
489
+ return fallback_result
490
+ else:
491
+ # Re-raise the exception if not handled
492
+ raise
493
+
494
+ #@with_error_handling("orchestrator", "get_session_status", lambda self: self.error_handler)
495
+ @with_error_handling("orchestrator", "get_session_status")
496
+ def get_session_status(self, session_id: str) -> Dict[str, Any]:
497
+ """Get the status of a session."""
498
+ if session_id not in self.active_sessions:
499
+ return {"error": f"Session {session_id} not found"}
500
+
501
+ session = self.active_sessions[session_id]
502
+
503
+ # If there's an active workflow, get its status
504
+ if session.get("current_workflow"):
505
+ try:
506
+ workflow_status = self.coordinator_agent.get_workflow_status(
507
+ session["current_workflow"])
508
+
509
+ return {
510
+ "session_id": session_id,
511
+ "status": session["status"],
512
+ "created_at": session["created_at"],
513
+ "workflows": session["workflows"],
514
+ "current_workflow": session["current_workflow"],
515
+ "workflow_status": workflow_status
516
+ }
517
+ except Exception as e:
518
+ # If getting workflow status fails, return basic session info
519
+ self.logger.error(f"Error getting workflow status: {str(e)}")
520
+ return {
521
+ "session_id": session_id,
522
+ "status": session["status"],
523
+ "created_at": session["created_at"],
524
+ "workflows": session["workflows"],
525
+ "error": "Failed to retrieve detailed workflow status"
526
+ }
527
+ else:
528
+ return {
529
+ "session_id": session_id,
530
+ "status": session["status"],
531
+ "created_at": session["created_at"],
532
+ "workflows": session["workflows"]
533
+ }
534
+
535
+ #@with_error_handling("orchestrator", "cleanup_session", lambda self: self.error_handler)
536
+ @with_error_handling("orchestrator", "cleanup_session")
537
+ def cleanup_session(self, session_id: str) -> Dict[str, Any]:
538
+ """Clean up resources for a session."""
539
+ if session_id not in self.active_sessions:
540
+ return {"error": f"Session {session_id} not found"}
541
+
542
+ session = self.active_sessions[session_id]
543
+
544
+ # Clean up any active workflows
545
+ if session.get("current_workflow"):
546
+ try:
547
+ self.coordinator_agent.cleanup_workflow(session["current_workflow"])
548
+ except Exception as e:
549
+ self.logger.error(f"Error cleaning up workflow: {str(e)}")
550
+ # Continue with session cleanup even if workflow cleanup fails
551
+
552
+ # Mark session as cleaned up
553
+ session["status"] = "cleaned_up"
554
+
555
+ return {
556
+ "session_id": session_id,
557
+ "status": "cleaned_up",
558
+ "message": "Session resources have been cleaned up"
559
+ }
560
+
561
+ #@with_error_handling("orchestrator", "get_sustainability_metrics", lambda self: self.error_handler)
562
+ @with_error_handling("orchestrator", "get_sustainability_metrics")
563
+ def get_sustainability_metrics(self, session_id: Optional[str] = None) -> Dict[str, Any]:
564
+ """
565
+ Get sustainability metrics for a session or the entire system.
566
+ If session_id is provided, returns metrics for that session only.
567
+ """
568
+ if not self.metrics_calculator:
569
+ return {"error": "Metrics calculator not available"}
570
+
571
+ if session_id:
572
+ # TODO: Implement session-specific metrics
573
+ # For now, return global metrics
574
+ return self.metrics_calculator.get_all_metrics()
575
+ else:
576
+ # Return global metrics
577
+ return self.metrics_calculator.get_all_metrics()
578
+
579
+ def get_error_report(self) -> Dict[str, Any]:
580
+ """Get error report from the error handler."""
581
+ if not self.error_handler:
582
+ return {"error": "Error handler not available"}
583
+
584
+ return self.error_handler.get_error_report()