# process_discovery_engine.py
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import spacy
import json
import re
import networkx as nx
from sklearn.cluster import DBSCAN


class ProcessDiscoveryEngine:
    """
    Discovers and analyzes business processes from various data sources
    including logs, documents, and recorded user activities.
    """

    def __init__(self, config: Dict):
        """
        Initialize the process discovery engine.

        Args:
            config: Configuration dictionary with parameters
        """
        self.min_frequency = config.get('min_frequency', 0.05)
        self.time_threshold = config.get('time_threshold', 60)  # seconds
        self.similarity_threshold = config.get('similarity_threshold', 0.75)
        self.process_graph = nx.DiGraph()

    def ingest_log_data(self, log_data: pd.DataFrame) -> bool:
        """
        Ingest process log data from system logs.

        Args:
            log_data: DataFrame containing log entries with timestamp, user, action columns

        Returns:
            bool: Success status
        """
        if 'timestamp' not in log_data.columns or 'action' not in log_data.columns:
            return False

        # Sort by timestamp
        sorted_logs = log_data.sort_values('timestamp')

        # Group by case_id if available
        if 'case_id' in sorted_logs.columns:
            case_groups = sorted_logs.groupby('case_id')
            for case_id, case_data in case_groups:
                self._process_sequence(case_data['action'].tolist(), source=f"log:{case_id}")
        else:
            # Try to identify sessions based on time gaps
            self._segment_and_process_logs(sorted_logs)

        return True

    def ingest_screen_recordings(self, recording_analysis: List[Dict]) -> bool:
        """
        Ingest analyzed screen recording data.

        Args:
            recording_analysis: List of dictionaries containing screen activities

        Returns:
            bool: Success status
        """
        for session in recording_analysis:
            if 'actions' in session and isinstance(session['actions'], list):
                action_sequence = [a['activity'] for a in session['actions'] if 'activity' in a]
                self._process_sequence(action_sequence,
                                       source=f"recording:{session.get('id', 'unknown')}")
        return True

    def _segment_and_process_logs(self, logs: pd.DataFrame) -> None:
        """
        Segment logs into probable process instances based on time gaps.

        Args:
            logs: DataFrame of logs sorted by timestamp
        """
        logs['timestamp'] = pd.to_datetime(logs['timestamp'])
        logs['time_diff'] = logs['timestamp'].diff().dt.total_seconds()

        # Mark new sequences where time difference exceeds threshold
        new_sequence = logs['time_diff'] > self.time_threshold
        logs['sequence_id'] = new_sequence.cumsum()

        # Process each sequence
        for seq_id, sequence in logs.groupby('sequence_id'):
            self._process_sequence(sequence['action'].tolist(), source=f"timegap:{seq_id}")
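
    # A small worked example of the time-gap segmentation above (illustrative
    # values, not real data): with time_threshold=60, any gap larger than 60 s
    # starts a new sequence_id, so the two bursts below become two separate
    # process instances.
    #
    #   timestamp            action        time_diff  sequence_id
    #   2024-01-01 09:00:00  open_form     NaN        0
    #   2024-01-01 09:00:20  fill_form     20.0       0
    #   2024-01-01 09:00:45  submit_form   25.0       0
    #   2024-01-01 10:30:00  open_form     5355.0     1
    #   2024-01-01 10:30:30  submit_form   30.0       1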

    def _process_sequence(self, actions: List[str], source: str) -> None:
        """
        Process a sequence of actions into the process graph.

        Args:
            actions: List of action names in sequence
            source: Data source identifier
        """
        # Add or update a node for every occurrence, including the final action
        for action in actions:
            if action not in self.process_graph:
                self.process_graph.add_node(action, count=0, sources=set())
            self.process_graph.nodes[action]['count'] += 1
            self.process_graph.nodes[action]['sources'].add(source)

        # Add or update directly-follows edges
        for i in range(len(actions) - 1):
            current = actions[i]
            next_action = actions[i + 1]
            if self.process_graph.has_edge(current, next_action):
                self.process_graph[current][next_action]['weight'] += 1
                self.process_graph[current][next_action]['sources'].add(source)
            else:
                self.process_graph.add_edge(current, next_action, weight=1, sources={source})

    def discover_main_process_paths(self) -> List[Dict]:
        """
        Discover the main process paths from the constructed graph.

        Returns:
            List of dictionaries describing main process paths
        """
        # Filter edges by frequency
        total_transitions = sum(data['weight'] for _, _, data in self.process_graph.edges(data=True))
        if total_transitions == 0:
            return []

        min_edge_weight = total_transitions * self.min_frequency
        significant_edges = [(u, v) for u, v, d in self.process_graph.edges(data=True)
                             if d['weight'] > min_edge_weight]

        # Create subgraph with only significant edges
        significant_graph = self.process_graph.edge_subgraph(significant_edges).copy()
        if significant_graph.number_of_edges() == 0:
            return []

        # Find all simple paths from potential start nodes to end nodes
        start_nodes = [n for n in significant_graph.nodes()
                       if significant_graph.in_degree(n) == 0
                       or significant_graph.in_degree(n) < significant_graph.out_degree(n)]
        end_nodes = [n for n in significant_graph.nodes()
                     if significant_graph.out_degree(n) == 0
                     or significant_graph.out_degree(n) < significant_graph.in_degree(n)]

        # If no clear start/end, use nodes with highest centrality
        if not start_nodes:
            centrality = nx.degree_centrality(significant_graph)
            start_nodes = [max(centrality, key=centrality.get)]
        if not end_nodes:
            centrality = nx.degree_centrality(significant_graph)
            end_nodes = [max(centrality, key=centrality.get)]

        # Find all paths between start and end nodes
        all_paths = []
        for start in start_nodes:
            for end in end_nodes:
                try:
                    paths = list(nx.all_simple_paths(significant_graph, start, end))
                    all_paths.extend(paths)
                except nx.NetworkXNoPath:
                    continue

        # Calculate path frequency and return top paths
        path_data = []
        for path in all_paths:
            # Calculate path strength as minimum edge weight along path
            edge_weights = [significant_graph[path[i]][path[i + 1]]['weight']
                            for i in range(len(path) - 1)]
            path_strength = min(edge_weights) if edge_weights else 0
            path_data.append({
                'path': path,
                'strength': path_strength,
                'length': len(path),
                'avg_edge_weight': sum(edge_weights) / len(edge_weights) if edge_weights else 0
            })

        # Sort by path strength descending
        path_data.sort(key=lambda x: x['strength'], reverse=True)
        return path_data
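
    # Illustrative shape of the list returned by discover_main_process_paths()
    # (activity names and numbers are made up for the example):
    #
    #   [{'path': ['receive_invoice', 'validate_invoice', 'approve_payment'],
    #     'strength': 42,            # weakest edge weight along the path
    #     'length': 3,
    #     'avg_edge_weight': 57.5},
    #    ...]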

    def identify_process_variants(self) -> List[Dict]:
        """
        Identify variants of the same basic process.

        Returns:
            List of process variant clusters
        """
        if len(self.process_graph) < 2:
            return []

        # Extract features for clustering
        paths = self.discover_main_process_paths()
        if not paths:
            return []

        # Create feature vectors from paths
        all_activities = sorted(list(self.process_graph.nodes()))
        activity_indices = {act: i for i, act in enumerate(all_activities)}

        # Create feature vectors (activity presence and position)
        feature_vectors = []
        for path_data in paths:
            path = path_data['path']
            vector = np.zeros(len(all_activities) * 2)
            # Mark presence and relative position of activities
            for pos, activity in enumerate(path):
                idx = activity_indices[activity]
                vector[idx] = 1  # presence
                vector[idx + len(all_activities)] = pos / len(path)  # relative position
            feature_vectors.append(vector)

        # Cluster paths using DBSCAN
        if len(feature_vectors) < 2:
            return [{'variant_id': 0, 'paths': paths}]

        clustering = DBSCAN(eps=0.3, min_samples=1).fit(feature_vectors)
        labels = clustering.labels_

        # Group paths by cluster
        variants = {}
        for i, label in enumerate(labels):
            label_str = str(label)
            if label_str not in variants:
                variants[label_str] = []
            variants[label_str].append(paths[i])

        # Format result
        result = [
            {'variant_id': variant_id, 'paths': variant_paths}
            for variant_id, variant_paths in variants.items()
        ]
        return result

    def get_process_stats(self) -> Dict:
        """
        Get statistics about the discovered process.

        Returns:
            Dictionary with process statistics
        """
        if not self.process_graph:
            return {"error": "No process data available"}

        stats = {
            "num_activities": len(self.process_graph.nodes()),
            "num_transitions": len(self.process_graph.edges()),
            "most_frequent_activities": [],
            "most_frequent_transitions": [],
            "process_complexity": 0,
            "data_sources": set()
        }

        # Most frequent activities
        activities = [(node, data['count']) for node, data in self.process_graph.nodes(data=True)]
        activities.sort(key=lambda x: x[1], reverse=True)
        stats["most_frequent_activities"] = activities[:10]

        # Most frequent transitions
        transitions = [(u, v, data['weight']) for u, v, data in self.process_graph.edges(data=True)]
        transitions.sort(key=lambda x: x[2], reverse=True)
        stats["most_frequent_transitions"] = transitions[:10]

        # Process complexity (sum of out-degrees, a simplified proxy for the
        # Control-Flow Complexity metric)
        stats["process_complexity"] = sum(self.process_graph.out_degree(n)
                                          for n in self.process_graph.nodes())

        # Data sources
        for _, data in self.process_graph.nodes(data=True):
            if 'sources' in data:
                stats["data_sources"].update(data['sources'])
        stats["data_sources"] = list(stats["data_sources"])

        return stats

    def export_process_model(self, format_type: str = 'bpmn') -> Dict:
        """
        Export the discovered process in the specified format.

        Args:
            format_type: Output format ('bpmn', 'petri_net', or 'json')

        Returns:
            Dictionary with export data and metadata
        """
        if format_type == 'json':
            nodes = [{"id": n, "count": data.get('count', 0)}
                     for n, data in self.process_graph.nodes(data=True)]
            edges = [{"source": u, "target": v, "weight": data.get('weight', 0)}
                     for u, v, data in self.process_graph.edges(data=True)]
            return {
                "format": "json",
                "process_model": {
                    "nodes": nodes,
                    "edges": edges
                }
            }
        elif format_type == 'bpmn':
            # Basic BPMN conversion (simplified)
            # In a real implementation, this would generate actual BPMN XML
            return {
                "format": "bpmn",
                "process_model": {
                    "process_id": "discovered_process",
                    "activities": list(self.process_graph.nodes()),
                    "flows": [(u, v) for u, v in self.process_graph.edges()],
                    "gateways": self._identify_potential_gateways()
                }
            }
        elif format_type == 'petri_net':
            # Basic Petri net conversion (simplified)
            return {
                "format": "petri_net",
                "process_model": {
                    "places": self._generate_petri_net_places(),
                    "transitions": list(self.process_graph.nodes()),
                    "arcs": self._generate_petri_net_arcs()
                }
            }
        else:
            return {"error": f"Unsupported export format: {format_type}"}

    def _identify_potential_gateways(self) -> List[Dict]:
        """
        Identify potential gateways in the process based on branching.

        Returns:
            List of potential gateway nodes
        """
        gateways = []
        for node in self.process_graph.nodes():
            in_degree = self.process_graph.in_degree(node)
            out_degree = self.process_graph.out_degree(node)

            # Potential XOR-split (one input, multiple outputs)
            if in_degree == 1 and out_degree > 1:
                gateways.append({
                    "id": f"xor_split_{node}",
                    "type": "exclusive_gateway",
                    "direction": "split",
                    "attached_to": node
                })
            # Potential XOR-join (multiple inputs, one output)
            elif in_degree > 1 and out_degree == 1:
                gateways.append({
                    "id": f"xor_join_{node}",
                    "type": "exclusive_gateway",
                    "direction": "join",
                    "attached_to": node
                })
            # Potential AND-split/join or complex gateway
            elif in_degree > 1 and out_degree > 1:
                gateways.append({
                    "id": f"complex_{node}",
                    "type": "complex_gateway",
                    "direction": "mixed",
                    "attached_to": node
                })
        return gateways

    def _generate_petri_net_places(self) -> List[str]:
        """
        Generate places for a Petri net representation.

        Returns:
            List of place IDs
        """
        places = []

        # Generate places between each pair of activities
        for u, v in self.process_graph.edges():
            places.append(f"p_{u}_{v}")

        # Add start and end places
        start_nodes = [n for n in self.process_graph.nodes()
                       if self.process_graph.in_degree(n) == 0]
        for node in start_nodes:
            places.append(f"p_start_{node}")

        end_nodes = [n for n in self.process_graph.nodes()
                     if self.process_graph.out_degree(n) == 0]
        for node in end_nodes:
            places.append(f"p_{node}_end")

        return places

    def _generate_petri_net_arcs(self) -> List[Tuple[str, str]]:
        """
        Generate arcs for a Petri net representation.

        Returns:
            List of (source, target) tuples representing arcs
        """
        arcs = []

        # Connect transitions through places
        for u, v in self.process_graph.edges():
            place = f"p_{u}_{v}"
            arcs.append((u, place))
            arcs.append((place, v))

        # Connect start places to initial transitions
        start_nodes = [n for n in self.process_graph.nodes()
                       if self.process_graph.in_degree(n) == 0]
        for node in start_nodes:
            arcs.append((f"p_start_{node}", node))

        # Connect final transitions to end places
        end_nodes = [n for n in self.process_graph.nodes()
                     if self.process_graph.out_degree(n) == 0]
        for node in end_nodes:
            arcs.append((node, f"p_{node}_end"))

        return arcs
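

# A minimal usage sketch for ProcessDiscoveryEngine. The event log below is
# synthetic and only illustrates the expected column layout (timestamp,
# case_id, action); the thresholds are example values, not recommendations.
if __name__ == "__main__":
    demo_log = pd.DataFrame({
        "timestamp": ["2024-01-01 09:00", "2024-01-01 09:05", "2024-01-01 09:10",
                      "2024-01-01 09:20", "2024-01-01 09:25", "2024-01-01 09:40"],
        "case_id":   ["c1", "c1", "c1", "c2", "c2", "c2"],
        "action":    ["receive_invoice", "validate_invoice", "approve_payment",
                      "receive_invoice", "validate_invoice", "approve_payment"],
    })

    engine = ProcessDiscoveryEngine({"min_frequency": 0.05, "time_threshold": 60})
    engine.ingest_log_data(demo_log)

    print(engine.get_process_stats())
    for path in engine.discover_main_process_paths():
        print(path["path"], "strength:", path["strength"])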


# requirements_analysis_module.py
class RequirementsAnalysisModule:
    """
    Analyzes business requirements and connects them to processes.
    Extracts structured data from natural language requirements.
    """

    def __init__(self, config: Dict = None):
        """
        Initialize the requirements analysis module.

        Args:
            config: Configuration dictionary
        """
        self.config = config or {}

        # Load NLP model
        try:
            self.nlp = spacy.load("en_core_web_md")
        except OSError:
            # Fallback to small model if medium not available
            self.nlp = spacy.load("en_core_web_sm")

        # Initialize requirements storage
        self.requirements = []

        # Initialize taxonomy and patterns
        self._load_taxonomies()
        self._compile_requirement_patterns()

    def _load_taxonomies(self) -> None:
        """Load or initialize the business process taxonomy."""
        # In production, this would load from a file or database
        self.process_taxonomy = {
            "financial": [
                "invoice processing", "accounts payable", "accounts receivable",
                "payment processing", "financial reporting", "expense management"
            ],
            "hr": [
                "onboarding", "offboarding", "payroll", "recruitment",
                "employee management", "benefits administration", "time tracking"
            ],
            "customer_service": [
                "ticket management", "customer support", "inquiry handling",
                "complaint resolution", "feedback processing"
            ],
            "operations": [
                "inventory management", "supply chain", "logistics",
                "order processing", "shipping", "receiving", "quality control"
            ],
            "sales": [
                "lead management", "opportunity tracking", "quote generation",
                "contract management", "sales reporting", "commission calculation"
            ],
            "it": [
                "access management", "incident management", "change management",
                "service request", "problem management", "release management"
            ]
        }

        # Complexity indicators for requirements
        self.complexity_indicators = {
            "high": [
                "complex", "multiple systems", "integration", "decision tree",
                "exception handling", "compliance", "regulatory", "manual review",
                "approval workflow", "conditional logic", "business rules"
            ],
            "medium": [
                "validation", "verification", "notification", "alert",
                "scheduled", "reporting", "dashboard", "data transformation"
            ],
            "low": [
                "simple", "straightforward", "data entry", "form filling",
                "standard", "single system", "fixed path", "static rules"
            ]
        }

    def _compile_requirement_patterns(self) -> None:
        """Compile regex patterns for requirement extraction."""
        # Action patterns
        self.action_patterns = [
            r"(?:need|should|must|will|shall) (?:to )?([a-z]+)",
            r"responsible for ([a-z]+ing)",
            r"capability to ([a-z]+)",
            r"ability to ([a-z]+)"
        ]

        # System patterns
        self.system_patterns = [
            r"(?:in|from|to|using|within) (?:the )?([A-Za-z0-9]+)(?: system| application| platform| software| tool)?",
            r"([A-Za-z0-9]+)(?: system| application| platform| software| tool)",
            r"([A-Za-z0-9]+) (?:database|interface|API|server)"
        ]

        # Frequency patterns
        self.frequency_patterns = [
            r"(daily|weekly|monthly|quarterly|yearly|annually)",
            r"every ([0-9]+) (day|week|month|quarter|year)s?",
            r"([0-9]+) times per (day|week|month|year)"
        ]

        # Compile all patterns
        self.action_regex = [re.compile(pattern) for pattern in self.action_patterns]
        self.system_regex = [re.compile(pattern) for pattern in self.system_patterns]
        self.frequency_regex = [re.compile(pattern) for pattern in self.frequency_patterns]

    def analyze_text_requirement(self, requirement_text: str, source: str = None) -> Dict:
        """
        Analyze a natural language requirement and extract structured information.

        Args:
            requirement_text: The text of the requirement
            source: Source of the requirement

        Returns:
            Dictionary with extracted requirement information
        """
        # Parse with spaCy
        doc = self.nlp(requirement_text)

        # Basic requirement object
        requirement = {
            "id": f"REQ-{len(self.requirements) + 1}",
            "text": requirement_text,
            "source": source,
            "extracted": {
                "actions": self._extract_actions(doc, requirement_text),
                "systems": self._extract_systems(doc, requirement_text),
                "frequency": self._extract_frequency(requirement_text),
                "business_domain": self._classify_business_domain(doc),
                "complexity": self._assess_complexity(doc, requirement_text),
                "data_elements": self._extract_data_elements(doc)
            },
            "automation_potential": None  # Will be filled later
        }

        # Store the requirement
        self.requirements.append(requirement)
        return requirement

    def _extract_actions(self, doc, text: str) -> List[str]:
        """
        Extract action verbs from requirement text.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            List of action verbs
        """
        # Method 1: Use spaCy to find verbs
        verbs = [token.lemma_ for token in doc if token.pos_ == "VERB"]

        # Method 2: Use regex patterns
        pattern_matches = []
        for pattern in self.action_regex:
            matches = pattern.findall(text.lower())
            pattern_matches.extend(matches)

        # Combine and deduplicate
        all_actions = list(set(verbs + pattern_matches))

        # Filter out common non-action verbs
        stopwords = ["be", "is", "are", "was", "were", "have", "has", "had"]
        filtered_actions = [v for v in all_actions if v not in stopwords and len(v) > 2]

        return filtered_actions

    def _extract_systems(self, doc, text: str) -> List[str]:
        """
        Extract system names from requirement text.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            List of system names
        """
        # Method 1: Named Entity Recognition for PRODUCT entities
        ner_systems = [ent.text for ent in doc.ents if ent.label_ in ["PRODUCT", "ORG", "GPE"]]

        # Method 2: Pattern matching
        pattern_systems = []
        for pattern in self.system_regex:
            matches = pattern.findall(text)
            pattern_systems.extend(matches)

        # Combine results
        all_systems = list(set(ner_systems + pattern_systems))

        # Filter out common false positives
        stopwords = ["system", "process", "application", "data", "information", "this", "the"]
        filtered_systems = [s for s in all_systems if s.lower() not in stopwords and len(s) > 2]

        return filtered_systems

    def _extract_frequency(self, text: str) -> Optional[str]:
        """
        Extract frequency information from requirement text.

        Args:
            text: Requirement text

        Returns:
            Extracted frequency or None
        """
        text_lower = text.lower()

        # Check all frequency patterns
        for pattern in self.frequency_regex:
            match = pattern.search(text_lower)
            if match:
                return match.group(0)

        # Check for specific frequency words
        frequency_words = ["daily", "weekly", "monthly", "quarterly", "annually", "yearly"]
        for word in frequency_words:
            if word in text_lower:
                return word

        return None
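
    # Illustrative matches for _extract_frequency(), given the patterns above
    # (example sentences, not from real requirements):
    #   "Reconcile accounts monthly"       -> "monthly"
    #   "The report runs every 2 weeks"    -> "every 2 weeks"
    #   "Invoices arrive 3 times per day"  -> "3 times per day"
    #   "Process refunds on request"       -> None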

    def _classify_business_domain(self, doc) -> List[Tuple[str, float]]:
        """
        Classify the business domain of the requirement.

        Args:
            doc: spaCy processed document

        Returns:
            List of (domain, confidence) tuples
        """
        text = doc.text.lower()
        domain_scores = {}

        # Calculate score for each domain based on keyword matches
        for domain, keywords in self.process_taxonomy.items():
            domain_score = 0
            for keyword in keywords:
                if keyword in text:
                    domain_score += 1
            if domain_score > 0:
                # Normalize by number of keywords
                domain_scores[domain] = domain_score / len(keywords)

        # If no direct matches, use semantic similarity
        if not domain_scores:
            for domain, keywords in self.process_taxonomy.items():
                # Calculate average similarity between doc and each keyword
                similarities = [doc.similarity(self.nlp(keyword)) for keyword in keywords]
                avg_similarity = sum(similarities) / len(similarities) if similarities else 0
                if avg_similarity > 0.5:  # Threshold for relevance
                    domain_scores[domain] = avg_similarity

        # Sort by score and return
        sorted_domains = sorted(domain_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_domains

    def _assess_complexity(self, doc, text: str) -> str:
        """
        Assess the complexity of the requirement.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            Complexity level ("high", "medium", or "low")
        """
        text_lower = text.lower()

        # Count indicators for each complexity level
        scores = {level: 0 for level in self.complexity_indicators.keys()}
        for level, indicators in self.complexity_indicators.items():
            for indicator in indicators:
                if indicator in text_lower:
                    scores[level] += 1

        # Check sentence structure complexity
        sentence_count = len(list(doc.sents))
        avg_tokens_per_sentence = len(doc) / sentence_count if sentence_count > 0 else 0

        # Adjust scores based on structural complexity
        if avg_tokens_per_sentence > 25:
            scores["high"] += 1
        elif avg_tokens_per_sentence > 15:
            scores["medium"] += 1

        # Check for conditional statements (if/then), matched as whole words
        if re.search(r"\bif\b", text_lower) and re.search(r"\b(then|else)\b", text_lower):
            scores["high"] += 1

        # Determine final complexity
        if scores["high"] > 0:
            return "high"
        elif scores["medium"] > 0:
            return "medium"
        else:
            return "low"

    def _extract_data_elements(self, doc) -> List[str]:
        """
        Extract data elements from the requirement text.

        Args:
            doc: spaCy processed document

        Returns:
            List of data elements
        """
        # Find noun chunks that could be data elements
        data_elements = []
        for chunk in doc.noun_chunks:
            # Check if this looks like a data field
            if (any(token.pos_ == "NOUN" for token in chunk)
                    and len(chunk) <= 4  # Not too long
                    and not any(token.is_stop for token in chunk)):  # No stopwords
                data_elements.append(chunk.text)

        # Look for specific data patterns
        data_patterns = [
            (r"\b[A-Z][a-z]+ ID\b", "ID field"),
            (r"\b[A-Z][a-z]+ Number\b", "Number field"),
            (r"\b[A-Z][a-z]+ Code\b", "Code field"),
            (r"\b[A-Z][a-z]+ Date\b", "Date field"),
            (r"\bstatus\b", "Status field")
        ]
        for pattern, field_type in data_patterns:
            if re.search(pattern, doc.text):
                data_elements.append(field_type)

        return list(set(data_elements))
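
    # Sketch of what analyze_text_requirement() might return for the text
    # "The team must extract data from the SAP system daily for invoice processing."
    # The exact values depend on the loaded spaCy model, so treat this as
    # illustrative only:
    #
    #   {"id": "REQ-1",
    #    "text": "...",
    #    "source": None,
    #    "extracted": {"actions": ["extract"],
    #                  "systems": ["SAP"],
    #                  "frequency": "daily",
    #                  "business_domain": [("financial", 0.17)],
    #                  "complexity": "low",
    #                  "data_elements": ["data", "invoice processing"]},
    #    "automation_potential": None}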

    def analyze_requirements_batch(self, requirements: List[Dict]) -> List[Dict]:
        """
        Analyze a batch of requirements and find relationships between them.

        Args:
            requirements: List of requirement dictionaries with 'text' field

        Returns:
            List of analyzed requirements
        """
        # Process each requirement
        processed_requirements = []
        for req in requirements:
            req_text = req.get('text', '')
            source = req.get('source', 'batch')
            processed = self.analyze_text_requirement(req_text, source)
            processed_requirements.append(processed)

        # Find relationships between requirements
        self._find_requirement_relationships(processed_requirements)

        return processed_requirements

    def _find_requirement_relationships(self, requirements: List[Dict]) -> None:
        """
        Find and add relationships between requirements.

        Args:
            requirements: List of processed requirements
        """
        if len(requirements) < 2:
            return

        # Extract text from requirements
        texts = [req["text"] for req in requirements]

        # Create TF-IDF matrix
        vectorizer = TfidfVectorizer(stop_words='english')
        tfidf_matrix = vectorizer.fit_transform(texts)

        # Calculate similarity matrix
        similarity_matrix = cosine_similarity(tfidf_matrix)

        # Add relationships to requirements
        for i, req in enumerate(requirements):
            related = []
            for j, similarity in enumerate(similarity_matrix[i]):
                if i != j and similarity > 0.3:  # Threshold for relationship
                    related.append({
                        "id": requirements[j]["id"],
                        "similarity": float(similarity),
                        "relationship_type": self._determine_relationship_type(req, requirements[j])
                    })

            # Sort by similarity
            related.sort(key=lambda x: x["similarity"], reverse=True)

            # Add to requirement
            req["related_requirements"] = related[:5]  # Top 5 related requirements

    def _determine_relationship_type(self, req1: Dict, req2: Dict) -> str:
        """
        Determine the type of relationship between two requirements.

        Args:
            req1: First requirement
            req2: Second requirement

        Returns:
            Relationship type string
        """
        # Check for system relationships
        systems1 = set(req1["extracted"]["systems"])
        systems2 = set(req2["extracted"]["systems"])
        if systems1.intersection(systems2):
            return "same_system"

        # Check for business domain relationships
        domains1 = [d[0] for d in req1["extracted"]["business_domain"]]
        domains2 = [d[0] for d in req2["extracted"]["business_domain"]]
        if set(domains1).intersection(set(domains2)):
            return "same_domain"

        # Check for action relationships
        actions1 = set(req1["extracted"]["actions"])
        actions2 = set(req2["extracted"]["actions"])
        if actions1.intersection(actions2):
            return "similar_action"

        # Default relationship type
        return "related"

    def map_requirements_to_processes(self, requirements: List[Dict],
                                      process_models: List[Dict]) -> Dict:
        """
        Map requirements to process models based on content matching.

        Args:
            requirements: List of analyzed requirements
            process_models: List of process model dictionaries

        Returns:
            Dictionary mapping process IDs to requirement IDs
        """
        process_to_reqs = {}
        req_to_process = {}

        for process in process_models:
            process_id = process.get("id", "unknown")
            process_text = process.get("description", "") + " " + process.get("name", "")
            process_doc = self.nlp(process_text)

            # Find matching requirements
            matching_reqs = []
            for req in requirements:
                req_text = req["text"]
                req_doc = self.nlp(req_text)

                # Calculate similarity
                similarity = process_doc.similarity(req_doc)
                if similarity > 0.6:  # Threshold for matching
                    matching_reqs.append({
                        "req_id": req["id"],
                        "similarity": float(similarity)
                    })
                    req_to_process[req["id"]] = process_id

            # Sort by similarity
            matching_reqs.sort(key=lambda x: x["similarity"], reverse=True)
            process_to_reqs[process_id] = matching_reqs

        return {
            "process_to_requirements": process_to_reqs,
            "requirement_to_process": req_to_process
        }

    def evaluate_automation_potential(self, requirement: Dict) -> Dict:
        """
        Evaluate the automation potential of a requirement.

        Args:
            requirement: Analyzed requirement

        Returns:
            Automation potential assessment
        """
        # Basic score starts at 5 out of 10
        score = 5

        # Complexity factor (high complexity decreases score)
        complexity = requirement["extracted"]["complexity"]
        if complexity == "high":
            score -= 2
        elif complexity == "low":
            score += 2

        # Action factor (certain actions are more automatable)
        automatable_actions = ["extract", "transfer", "copy", "move", "calculate",
                               "update", "generate", "validate", "verify", "send",
                               "notify", "schedule", "retrieve", "check"]
        for action in requirement["extracted"]["actions"]:
            if action in automatable_actions:
                score += 0.5

        # System factor (presence of systems increases score)
        if requirement["extracted"]["systems"]:
            score += len(requirement["extracted"]["systems"]) * 0.5

        # Data elements factor (more data elements suggests more structure)
        data_elements = requirement["extracted"]["data_elements"]
        if data_elements:
            score += min(len(data_elements) * 0.3, 2)  # Cap at +2

        # Cap score between 1-10
        score = max(1, min(10, score))

        # Determine category
        category = "high" if score >= 7.5 else "medium" if score >= 5 else "low"

        # Identify automation technology
        tech = self._recommend_automation_technology(requirement, score)

        return {
            "automation_score": round(score, 1),
            "automation_category": category,
            "recommended_technology": tech,
            "rationale": self._generate_automation_rationale(requirement, score, category)
        }

    def _recommend_automation_technology(self, requirement: Dict, score: float) -> str:
        """
        Recommend suitable automation technology.

        Args:
            requirement: Analyzed requirement
            score: Automation score

        Returns:
            Recommended technology
        """
        complexity = requirement["extracted"]["complexity"]
        actions = requirement["extracted"]["actions"]

        # Decision tree for technology recommendation
        if score >= 8:
            if any(a in actions for a in ["extract", "scrape", "read"]):
                return "RPA with OCR/Document Understanding"
            else:
                return "Traditional RPA"
        elif score >= 5:
            if complexity == "high":
                return "RPA with Human-in-the-Loop"
            elif any(a in actions for a in ["decide", "evaluate", "assess"]):
                return "RPA with Decision Automation"
            else:
                return "Traditional RPA"
        else:
            if any(a in actions for a in ["review", "approve"]):
                return "Workflow Automation"
            else:
                return "Partial Automation with Human Tasks"
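
    # Worked example of the scoring in evaluate_automation_potential()
    # (an illustrative requirement, not real data):
    #   base score                     5.0
    #   complexity "low"              +2.0
    #   2 automatable actions         +1.0   (0.5 each)
    #   1 system                      +0.5
    #   3 data elements               +0.9   (0.3 each, capped at +2)
    #   total                          9.4   -> category "high" (>= 7.5)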

    def _generate_automation_rationale(self, requirement: Dict, score: float, category: str) -> str:
        """
        Generate explanation for automation assessment.

        Args:
            requirement: Analyzed requirement
            score: Automation score
            category: Automation category

        Returns:
            Rationale text
        """
        complexity = requirement["extracted"]["complexity"]

        if category == "high":
            return (f"This requirement has {complexity} complexity but shows strong automation "
                    f"potential due to clear structure and defined data elements. "
                    f"Score of {score:.1f}/10 indicates this is a prime automation candidate.")
        elif category == "medium":
            return (f"This {complexity} complexity requirement has moderate automation potential. "
                    f"Score of {score:.1f}/10 suggests partial automation with some human oversight.")
        else:
            return (f"The {complexity} complexity and ambiguous nature of this requirement "
                    f"limits automation potential. Score of {score:.1f}/10 indicates this may "
                    f"require significant human involvement or process redesign.")

    def assess_requirements_automation_potential(self, requirements: List[Dict]) -> List[Dict]:
        """
        Assess automation potential for a batch of requirements.

        Args:
            requirements: List of analyzed requirements

        Returns:
            Requirements with automation assessment added
        """
        for req in requirements:
            req["automation_potential"] = self.evaluate_automation_potential(req)
        return requirements

    def generate_requirements_report(self, requirements: List[Dict]) -> Dict:
        """
        Generate a summary report of requirements analysis.

        Args:
            requirements: List of analyzed requirements

        Returns:
            Report dictionary
        """
        # Count by complexity
        complexity_counts = {"high": 0, "medium": 0, "low": 0}
        for req in requirements:
            complexity = req["extracted"]["complexity"]
            complexity_counts[complexity] += 1

        # Count by automation potential (the key is present but None until
        # assess_requirements_automation_potential has been run)
        if all(req.get("automation_potential") is not None for req in requirements):
            automation_counts = {"high": 0, "medium": 0, "low": 0}
            for req in requirements:
                category = req["automation_potential"]["automation_category"]
                automation_counts[category] += 1
        else:
            automation_counts = None

        # Find common systems
        all_systems = []
        for req in requirements:
            all_systems.extend(req["extracted"]["systems"])

        system_counts = {}
        for system in all_systems:
            if system in system_counts:
                system_counts[system] += 1
            else:
                system_counts[system] = 1

        # Sort systems by frequency
        top_systems = sorted(system_counts.items(), key=lambda x: x[1], reverse=True)[:5]

        # Generate report
        report = {
            "total_requirements": len(requirements),
            "complexity_distribution": complexity_counts,
            "automation_potential": automation_counts,
            "top_systems": top_systems,
            "recommendations": self._generate_overall_recommendations(requirements)
        }
        return report

    def _generate_overall_recommendations(self, requirements: List[Dict]) -> List[str]:
        """
        Generate overall recommendations based on requirements analysis.

        Args:
            requirements: List of analyzed requirements

        Returns:
            List of recommendation strings
        """
        recommendations = []

        # Check if automation assessment is available
        automation_available = all(req.get("automation_potential") is not None
                                   for req in requirements)

        if automation_available:
            # Count high automation potential requirements
            high_potential = [r for r in requirements
                              if r["automation_potential"]["automation_category"] == "high"]

            if len(high_potential) >= len(requirements) * 0.7:
                recommendations.append(
                    "High automation potential across most requirements. "
                    "Consider an end-to-end automation solution."
                )
            elif len(high_potential) >= len(requirements) * 0.3:
                recommendations.append(
                    "Significant automation potential in a subset of requirements. "
                    "Consider a phased automation approach starting with high-potential areas."
                )
            else:
                recommendations.append(
                    "Limited automation potential in current requirements. "
                    "Consider process redesign to increase automation potential."
                )

            # Recommend technologies
            tech_counts = {}
            for req in requirements:
                tech = req["automation_potential"]["recommended_technology"]
                tech_counts[tech] = tech_counts.get(tech, 0) + 1

            top_tech = max(tech_counts.items(), key=lambda x: x[1])[0]
            recommendations.append(f"Primary recommended technology: {top_tech}")

        # Requirements quality recommendations
        completeness_issues = False
        for req in requirements:
            if (not req["extracted"]["actions"]
                    or not req["extracted"]["systems"]
                    or not req["extracted"]["data_elements"]):
                completeness_issues = True
                break

        if completeness_issues:
            recommendations.append(
                "Some requirements lack necessary details. "
                "Consider refining requirements to specify actions, systems, and data elements."
            )

        return recommendations
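

# A minimal usage sketch for RequirementsAnalysisModule. It assumes an English
# spaCy model (en_core_web_md or en_core_web_sm) is installed; the requirement
# texts below are made-up examples, and the exact extractions depend on the model.
if __name__ == "__main__":
    module = RequirementsAnalysisModule()

    sample_requirements = [
        {"text": "The finance team must extract invoice data from the SAP system daily.",
         "source": "workshop_notes"},
        {"text": "Managers should review and approve expense reports every week.",
         "source": "workshop_notes"},
    ]

    analyzed = module.analyze_requirements_batch(sample_requirements)
    analyzed = module.assess_requirements_automation_potential(analyzed)

    report = module.generate_requirements_report(analyzed)
    print(report["complexity_distribution"])
    print(report["automation_potential"])
    for recommendation in report["recommendations"]:
        print("-", recommendation)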