# process_discovery_engine.py
import json
import re
from typing import Dict, List, Optional, Tuple

import networkx as nx
import numpy as np
import pandas as pd
import spacy
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class ProcessDiscoveryEngine:
    """
    Discovers and analyzes business processes from various data sources,
    including logs, documents, and recorded user activities.
    """

    def __init__(self, config: Dict):
        """
        Initialize the process discovery engine.

        Args:
            config: Configuration dictionary with parameters
        """
        self.min_frequency = config.get('min_frequency', 0.05)
        self.time_threshold = config.get('time_threshold', 60)  # seconds
        self.similarity_threshold = config.get('similarity_threshold', 0.75)
        self.process_graph = nx.DiGraph()

    def ingest_log_data(self, log_data: pd.DataFrame) -> bool:
        """
        Ingest process log data from system logs.

        Args:
            log_data: DataFrame containing log entries with timestamp, user, action columns

        Returns:
            bool: Success status
        """
        if 'timestamp' not in log_data.columns or 'action' not in log_data.columns:
            return False

        # Sort by timestamp
        sorted_logs = log_data.sort_values('timestamp')

        # Group by case_id if available
        if 'case_id' in sorted_logs.columns:
            case_groups = sorted_logs.groupby('case_id')
            for case_id, case_data in case_groups:
                self._process_sequence(case_data['action'].tolist(),
                                       source=f"log:{case_id}")
        else:
            # Try to identify sessions based on time gaps
            self._segment_and_process_logs(sorted_logs)
        return True

    def ingest_screen_recordings(self, recording_analysis: List[Dict]) -> bool:
        """
        Ingest analyzed screen recording data.

        Args:
            recording_analysis: List of dictionaries containing screen activities

        Returns:
            bool: Success status
        """
        for session in recording_analysis:
            if 'actions' in session and isinstance(session['actions'], list):
                action_sequence = [a['activity'] for a in session['actions']
                                   if 'activity' in a]
                self._process_sequence(action_sequence,
                                       source=f"recording:{session.get('id', 'unknown')}")
        return True

    def _segment_and_process_logs(self, logs: pd.DataFrame) -> None:
        """
        Segment logs into probable process instances based on time gaps.

        Args:
            logs: DataFrame of logs sorted by timestamp
        """
        logs['timestamp'] = pd.to_datetime(logs['timestamp'])
        logs['time_diff'] = logs['timestamp'].diff().dt.total_seconds()

        # Mark new sequences where the time difference exceeds the threshold
        new_sequence = logs['time_diff'] > self.time_threshold
        logs['sequence_id'] = new_sequence.cumsum()

        # Process each sequence
        for seq_id, sequence in logs.groupby('sequence_id'):
            self._process_sequence(sequence['action'].tolist(),
                                   source=f"timegap:{seq_id}")

    def _process_sequence(self, actions: List[str], source: str) -> None:
        """
        Process a sequence of actions into the process graph.

        Args:
            actions: List of action names in sequence
            source: Data source identifier
        """
        # Count every occurrence of an action, including the final action in the
        # sequence, which has no outgoing transition.
        for action in actions:
            if action not in self.process_graph:
                self.process_graph.add_node(action, count=0, sources=set())
            self.process_graph.nodes[action]['count'] += 1
            self.process_graph.nodes[action]['sources'].add(source)

        # Add or update an edge for each consecutive pair of actions
        for current, next_action in zip(actions, actions[1:]):
            if self.process_graph.has_edge(current, next_action):
                self.process_graph[current][next_action]['weight'] += 1
                self.process_graph[current][next_action]['sources'].add(source)
            else:
                self.process_graph.add_edge(current, next_action,
                                            weight=1, sources={source})

    def discover_main_process_paths(self) -> List[Dict]:
        """
        Discover the main process paths from the constructed graph.

        Returns:
            List of dictionaries describing main process paths
        """
        # Filter edges by frequency
        total_transitions = sum(data['weight'] for _, _, data in self.process_graph.edges(data=True))
        if total_transitions == 0:
            return []

        min_edge_weight = total_transitions * self.min_frequency
        significant_edges = [(u, v) for u, v, d in self.process_graph.edges(data=True)
                             if d['weight'] > min_edge_weight]

        # Create a subgraph containing only the significant edges
        significant_graph = self.process_graph.edge_subgraph(significant_edges).copy()
        if significant_graph.number_of_nodes() == 0:
            return []

        # Find candidate start and end nodes
        start_nodes = [n for n in significant_graph.nodes()
                       if significant_graph.in_degree(n) == 0 or
                       significant_graph.in_degree(n) < significant_graph.out_degree(n)]
        end_nodes = [n for n in significant_graph.nodes()
                     if significant_graph.out_degree(n) == 0 or
                     significant_graph.out_degree(n) < significant_graph.in_degree(n)]

        # If there is no clear start/end, fall back to the highest-centrality node
        if not start_nodes:
            centrality = nx.degree_centrality(significant_graph)
            start_nodes = [max(centrality, key=centrality.get)]
        if not end_nodes:
            centrality = nx.degree_centrality(significant_graph)
            end_nodes = [max(centrality, key=centrality.get)]

        # Find all simple paths between start and end nodes; all_simple_paths
        # yields nothing when no path exists, so no exception handling is needed.
        all_paths = []
        for start in start_nodes:
            for end in end_nodes:
                if start == end:
                    continue
                all_paths.extend(nx.all_simple_paths(significant_graph, start, end))

        # Calculate path strength and return the top paths
        path_data = []
        for path in all_paths:
            # Path strength is the minimum edge weight along the path
            edge_weights = [significant_graph[path[i]][path[i + 1]]['weight']
                            for i in range(len(path) - 1)]
            path_strength = min(edge_weights) if edge_weights else 0
            path_data.append({
                'path': path,
                'strength': path_strength,
                'length': len(path),
                'avg_edge_weight': sum(edge_weights) / len(edge_weights) if edge_weights else 0
            })

        # Sort by path strength, descending
        path_data.sort(key=lambda x: x['strength'], reverse=True)
        return path_data

    def identify_process_variants(self) -> List[Dict]:
        """
        Identify variants of the same basic process.

        Returns:
            List of process variant clusters
        """
        if len(self.process_graph) < 2:
            return []

        # Extract features for clustering
        paths = self.discover_main_process_paths()
        if not paths:
            return []

        # Create feature vectors from paths
        all_activities = sorted(list(self.process_graph.nodes()))
        activity_indices = {act: i for i, act in enumerate(all_activities)}

        # Create feature vectors (activity presence and position)
        feature_vectors = []
        for path_data in paths:
            path = path_data['path']
            vector = np.zeros(len(all_activities) * 2)
            # Mark presence and relative position of activities
            for pos, activity in enumerate(path):
                idx = activity_indices[activity]
                vector[idx] = 1  # presence
                vector[idx + len(all_activities)] = pos / len(path)  # relative position
            feature_vectors.append(vector)

        # Cluster paths using DBSCAN
        if len(feature_vectors) < 2:
            return [{'variant_id': 0, 'paths': paths}]

        clustering = DBSCAN(eps=0.3, min_samples=1).fit(feature_vectors)
        labels = clustering.labels_

        # Group paths by cluster
        variants = {}
        for i, label in enumerate(labels):
            label_str = str(label)
            if label_str not in variants:
                variants[label_str] = []
            variants[label_str].append(paths[i])

        # Format result
        result = [
            {'variant_id': variant_id, 'paths': variant_paths}
            for variant_id, variant_paths in variants.items()
        ]
        return result

    def get_process_stats(self) -> Dict:
        """
        Get statistics about the discovered process.

        Returns:
            Dictionary with process statistics
        """
        if not self.process_graph:
            return {"error": "No process data available"}

        stats = {
            "num_activities": len(self.process_graph.nodes()),
            "num_transitions": len(self.process_graph.edges()),
            "most_frequent_activities": [],
            "most_frequent_transitions": [],
            "process_complexity": 0,
            "data_sources": set()
        }

        # Most frequent activities
        activities = [(node, data['count'])
                      for node, data in self.process_graph.nodes(data=True)]
        activities.sort(key=lambda x: x[1], reverse=True)
        stats["most_frequent_activities"] = activities[:10]

        # Most frequent transitions
        transitions = [(u, v, data['weight'])
                       for u, v, data in self.process_graph.edges(data=True)]
        transitions.sort(key=lambda x: x[2], reverse=True)
        stats["most_frequent_transitions"] = transitions[:10]

        # Process complexity (using the Control-Flow Complexity metric)
        stats["process_complexity"] = sum(self.process_graph.out_degree(n)
                                          for n in self.process_graph.nodes())

        # Data sources
        for _, data in self.process_graph.nodes(data=True):
            if 'sources' in data:
                stats["data_sources"].update(data['sources'])
        stats["data_sources"] = list(stats["data_sources"])
        return stats

    def export_process_model(self, format_type: str = 'bpmn') -> Dict:
        """
        Export the discovered process in the specified format.

        Args:
            format_type: Output format ('bpmn', 'petri_net', or 'json')

        Returns:
            Dictionary with export data and metadata
        """
        if format_type == 'json':
            nodes = [{"id": n, "count": data.get('count', 0)}
                     for n, data in self.process_graph.nodes(data=True)]
            edges = [{"source": u, "target": v, "weight": data.get('weight', 0)}
                     for u, v, data in self.process_graph.edges(data=True)]
            return {
                "format": "json",
                "process_model": {
                    "nodes": nodes,
                    "edges": edges
                }
            }
        elif format_type == 'bpmn':
            # Basic BPMN conversion (simplified).
            # A full implementation would generate actual BPMN XML.
            return {
                "format": "bpmn",
                "process_model": {
                    "process_id": "discovered_process",
                    "activities": list(self.process_graph.nodes()),
                    "flows": [(u, v) for u, v in self.process_graph.edges()],
                    "gateways": self._identify_potential_gateways()
                }
            }
        elif format_type == 'petri_net':
            # Basic Petri net conversion (simplified)
            return {
                "format": "petri_net",
                "process_model": {
                    "places": self._generate_petri_net_places(),
                    "transitions": list(self.process_graph.nodes()),
                    "arcs": self._generate_petri_net_arcs()
                }
            }
        else:
            return {"error": f"Unsupported export format: {format_type}"}

    def _identify_potential_gateways(self) -> List[Dict]:
        """
        Identify potential gateways in the process based on branching.

        Returns:
            List of potential gateway nodes
        """
        gateways = []
        for node in self.process_graph.nodes():
            in_degree = self.process_graph.in_degree(node)
            out_degree = self.process_graph.out_degree(node)

            # Potential XOR-split (one input, multiple outputs)
            if in_degree == 1 and out_degree > 1:
                gateways.append({
                    "id": f"xor_split_{node}",
                    "type": "exclusive_gateway",
                    "direction": "split",
                    "attached_to": node
                })
            # Potential XOR-join (multiple inputs, one output)
            elif in_degree > 1 and out_degree == 1:
                gateways.append({
                    "id": f"xor_join_{node}",
                    "type": "exclusive_gateway",
                    "direction": "join",
                    "attached_to": node
                })
            # Potential AND-split/join or complex gateway
            elif in_degree > 1 and out_degree > 1:
                gateways.append({
                    "id": f"complex_{node}",
                    "type": "complex_gateway",
                    "direction": "mixed",
                    "attached_to": node
                })
        return gateways

    def _generate_petri_net_places(self) -> List[str]:
        """
        Generate places for a Petri net representation.

        Returns:
            List of place IDs
        """
        places = []

        # Generate a place between each pair of connected activities
        for u, v in self.process_graph.edges():
            places.append(f"p_{u}_{v}")

        # Add start and end places
        start_nodes = [n for n in self.process_graph.nodes()
                       if self.process_graph.in_degree(n) == 0]
        for node in start_nodes:
            places.append(f"p_start_{node}")

        end_nodes = [n for n in self.process_graph.nodes()
                     if self.process_graph.out_degree(n) == 0]
        for node in end_nodes:
            places.append(f"p_{node}_end")
        return places

    def _generate_petri_net_arcs(self) -> List[Tuple[str, str]]:
        """
        Generate arcs for a Petri net representation.

        Returns:
            List of (source, target) tuples representing arcs
        """
        arcs = []

        # Connect transitions through places
        for u, v in self.process_graph.edges():
            place = f"p_{u}_{v}"
            arcs.append((u, place))
            arcs.append((place, v))

        # Connect start places to initial transitions
        start_nodes = [n for n in self.process_graph.nodes()
                       if self.process_graph.in_degree(n) == 0]
        for node in start_nodes:
            arcs.append((f"p_start_{node}", node))

        # Connect final transitions to end places
        end_nodes = [n for n in self.process_graph.nodes()
                     if self.process_graph.out_degree(n) == 0]
        for node in end_nodes:
            arcs.append((node, f"p_{node}_end"))
        return arcs

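
# --- Illustrative usage sketch (assumption, not part of the original module) --
# A minimal example of driving ProcessDiscoveryEngine from a tiny event log.
# The column values, activity names, and config numbers below are placeholders
# chosen for demonstration only.
def _demo_process_discovery() -> None:
    sample_logs = pd.DataFrame({
        "case_id": ["c1", "c1", "c1", "c2", "c2"],
        "timestamp": pd.to_datetime([
            "2024-01-01 09:00", "2024-01-01 09:02", "2024-01-01 09:05",
            "2024-01-01 10:00", "2024-01-01 10:03",
        ]),
        "action": ["open_invoice", "validate_invoice", "post_payment",
                   "open_invoice", "post_payment"],
    })

    engine = ProcessDiscoveryEngine({"min_frequency": 0.05, "time_threshold": 60})
    engine.ingest_log_data(sample_logs)

    stats = engine.get_process_stats()
    print(stats["most_frequent_transitions"])
    print(engine.export_process_model("json"))
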
# requirements_analysis_module.py
class RequirementsAnalysisModule:
    """
    Analyzes business requirements and connects them to processes.
    Extracts structured data from natural language requirements.
    """

    def __init__(self, config: Optional[Dict] = None):
        """
        Initialize the requirements analysis module.

        Args:
            config: Configuration dictionary
        """
        self.config = config or {}

        # Load the NLP model; fall back to the small model if the medium one is unavailable
        try:
            self.nlp = spacy.load("en_core_web_md")
        except OSError:
            self.nlp = spacy.load("en_core_web_sm")

        # Initialize requirements storage
        self.requirements = []

        # Initialize taxonomy and patterns
        self._load_taxonomies()
        self._compile_requirement_patterns()

    def _load_taxonomies(self) -> None:
        """Load or initialize the business process taxonomy."""
        # In production, this would load from a file or database
        self.process_taxonomy = {
            "financial": [
                "invoice processing", "accounts payable", "accounts receivable",
                "payment processing", "financial reporting", "expense management"
            ],
            "hr": [
                "onboarding", "offboarding", "payroll", "recruitment",
                "employee management", "benefits administration", "time tracking"
            ],
            "customer_service": [
                "ticket management", "customer support", "inquiry handling",
                "complaint resolution", "feedback processing"
            ],
            "operations": [
                "inventory management", "supply chain", "logistics",
                "order processing", "shipping", "receiving", "quality control"
            ],
            "sales": [
                "lead management", "opportunity tracking", "quote generation",
                "contract management", "sales reporting", "commission calculation"
            ],
            "it": [
                "access management", "incident management", "change management",
                "service request", "problem management", "release management"
            ]
        }

        # Complexity indicators for requirements
        self.complexity_indicators = {
            "high": [
                "complex", "multiple systems", "integration", "decision tree",
                "exception handling", "compliance", "regulatory", "manual review",
                "approval workflow", "conditional logic", "business rules"
            ],
            "medium": [
                "validation", "verification", "notification", "alert",
                "scheduled", "reporting", "dashboard", "data transformation"
            ],
            "low": [
                "simple", "straightforward", "data entry", "form filling",
                "standard", "single system", "fixed path", "static rules"
            ]
        }

    def _compile_requirement_patterns(self) -> None:
        """Compile regex patterns for requirement extraction."""
        # Action patterns
        self.action_patterns = [
            r"(?:need|should|must|will|shall) (?:to )?([a-z]+)",
            r"responsible for ([a-z]+ing)",
            r"capability to ([a-z]+)",
            r"ability to ([a-z]+)"
        ]

        # System patterns
        self.system_patterns = [
            r"(?:in|from|to|using|within) (?:the )?([A-Za-z0-9]+)(?: system| application| platform| software| tool)?",
            r"([A-Za-z0-9]+)(?: system| application| platform| software| tool)",
            r"([A-Za-z0-9]+) (?:database|interface|API|server)"
        ]

        # Frequency patterns
        self.frequency_patterns = [
            r"(daily|weekly|monthly|quarterly|yearly|annually)",
            r"every ([0-9]+) (day|week|month|quarter|year)s?",
            r"([0-9]+) times per (day|week|month|year)"
        ]

        # Compile all patterns
        self.action_regex = [re.compile(pattern) for pattern in self.action_patterns]
        self.system_regex = [re.compile(pattern) for pattern in self.system_patterns]
        self.frequency_regex = [re.compile(pattern) for pattern in self.frequency_patterns]

    def analyze_text_requirement(self, requirement_text: str, source: Optional[str] = None) -> Dict:
        """
        Analyze a natural language requirement and extract structured information.

        Args:
            requirement_text: The text of the requirement
            source: Source of the requirement

        Returns:
            Dictionary with extracted requirement information
        """
        # Parse with spaCy
        doc = self.nlp(requirement_text)

        # Basic requirement object
        requirement = {
            "id": f"REQ-{len(self.requirements) + 1}",
            "text": requirement_text,
            "source": source,
            "extracted": {
                "actions": self._extract_actions(doc, requirement_text),
                "systems": self._extract_systems(doc, requirement_text),
                "frequency": self._extract_frequency(requirement_text),
                "business_domain": self._classify_business_domain(doc),
                "complexity": self._assess_complexity(doc, requirement_text),
                "data_elements": self._extract_data_elements(doc)
            },
            "automation_potential": None  # Filled in by evaluate_automation_potential
        }

        # Store the requirement
        self.requirements.append(requirement)
        return requirement

    def _extract_actions(self, doc, text: str) -> List[str]:
        """
        Extract action verbs from requirement text.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            List of action verbs
        """
        # Method 1: Use spaCy to find verbs
        verbs = [token.lemma_ for token in doc if token.pos_ == "VERB"]

        # Method 2: Use regex patterns
        pattern_matches = []
        for pattern in self.action_regex:
            matches = pattern.findall(text.lower())
            pattern_matches.extend(matches)

        # Combine and deduplicate
        all_actions = list(set(verbs + pattern_matches))

        # Filter out common non-action verbs
        stopwords = ["be", "is", "are", "was", "were", "have", "has", "had"]
        filtered_actions = [v for v in all_actions if v not in stopwords and len(v) > 2]
        return filtered_actions

    def _extract_systems(self, doc, text: str) -> List[str]:
        """
        Extract system names from requirement text.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            List of system names
        """
        # Method 1: Named Entity Recognition for product/organization entities
        ner_systems = [ent.text for ent in doc.ents
                       if ent.label_ in ["PRODUCT", "ORG", "GPE"]]

        # Method 2: Pattern matching
        pattern_systems = []
        for pattern in self.system_regex:
            matches = pattern.findall(text)
            pattern_systems.extend(matches)

        # Combine results
        all_systems = list(set(ner_systems + pattern_systems))

        # Filter out common false positives
        stopwords = ["system", "process", "application", "data", "information", "this", "the"]
        filtered_systems = [s for s in all_systems if s.lower() not in stopwords and len(s) > 2]
        return filtered_systems

    def _extract_frequency(self, text: str) -> Optional[str]:
        """
        Extract frequency information from requirement text.

        Args:
            text: Requirement text

        Returns:
            Extracted frequency or None
        """
        text_lower = text.lower()

        # Check all frequency patterns
        for pattern in self.frequency_regex:
            match = pattern.search(text_lower)
            if match:
                return match.group(0)

        # Check for specific frequency words
        frequency_words = ["daily", "weekly", "monthly", "quarterly", "annually", "yearly"]
        for word in frequency_words:
            if word in text_lower:
                return word
        return None

    def _classify_business_domain(self, doc) -> List[Tuple[str, float]]:
        """
        Classify the business domain of the requirement.

        Args:
            doc: spaCy processed document

        Returns:
            List of (domain, confidence) tuples
        """
        text = doc.text.lower()
        domain_scores = {}

        # Calculate a score for each domain based on keyword matches
        for domain, keywords in self.process_taxonomy.items():
            domain_score = 0
            for keyword in keywords:
                if keyword in text:
                    domain_score += 1
            if domain_score > 0:
                # Normalize by the number of keywords
                domain_scores[domain] = domain_score / len(keywords)

        # If there are no direct matches, use semantic similarity
        if not domain_scores:
            for domain, keywords in self.process_taxonomy.items():
                # Calculate the average similarity between the doc and each keyword
                similarities = [doc.similarity(self.nlp(keyword)) for keyword in keywords]
                avg_similarity = sum(similarities) / len(similarities) if similarities else 0
                if avg_similarity > 0.5:  # Threshold for relevance
                    domain_scores[domain] = avg_similarity

        # Sort by score and return
        sorted_domains = sorted(domain_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_domains

    def _assess_complexity(self, doc, text: str) -> str:
        """
        Assess the complexity of the requirement.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            Complexity level ("high", "medium", or "low")
        """
        text_lower = text.lower()

        # Count indicators for each complexity level
        scores = {level: 0 for level in self.complexity_indicators.keys()}
        for level, indicators in self.complexity_indicators.items():
            for indicator in indicators:
                if indicator in text_lower:
                    scores[level] += 1

        # Check sentence structure complexity
        sentence_count = len(list(doc.sents))
        avg_tokens_per_sentence = len(doc) / sentence_count if sentence_count > 0 else 0

        # Adjust scores based on structural complexity
        if avg_tokens_per_sentence > 25:
            scores["high"] += 1
        elif avg_tokens_per_sentence > 15:
            scores["medium"] += 1

        # Check for conditional statements (if/then)
        if "if" in text_lower and ("then" in text_lower or "else" in text_lower):
            scores["high"] += 1

        # Determine final complexity
        if scores["high"] > 0:
            return "high"
        elif scores["medium"] > 0:
            return "medium"
        else:
            return "low"

    def _extract_data_elements(self, doc) -> List[str]:
        """
        Extract data elements from the requirement text.

        Args:
            doc: spaCy processed document

        Returns:
            List of data elements
        """
        # Find noun chunks that could be data elements
        data_elements = []
        for chunk in doc.noun_chunks:
            # Check if this looks like a data field
            if (any(token.pos_ == "NOUN" for token in chunk) and
                    len(chunk) <= 4 and  # Not too long
                    not any(token.is_stop for token in chunk)):  # Contains no stopwords
                data_elements.append(chunk.text)

        # Look for specific data patterns
        data_patterns = [
            (r"\b[A-Z][a-z]+ ID\b", "ID field"),
            (r"\b[A-Z][a-z]+ Number\b", "Number field"),
            (r"\b[A-Z][a-z]+ Code\b", "Code field"),
            (r"\b[A-Z][a-z]+ Date\b", "Date field"),
            (r"\bstatus\b", "Status field")
        ]
        for pattern, field_type in data_patterns:
            if re.search(pattern, doc.text):
                data_elements.append(field_type)
        return list(set(data_elements))

    def analyze_requirements_batch(self, requirements: List[Dict]) -> List[Dict]:
        """
        Analyze a batch of requirements and find relationships between them.

        Args:
            requirements: List of requirement dictionaries with a 'text' field

        Returns:
            List of analyzed requirements
        """
        # Process each requirement
        processed_requirements = []
        for req in requirements:
            req_text = req.get('text', '')
            source = req.get('source', 'batch')
            processed = self.analyze_text_requirement(req_text, source)
            processed_requirements.append(processed)

        # Find relationships between requirements
        self._find_requirement_relationships(processed_requirements)
        return processed_requirements

    def _find_requirement_relationships(self, requirements: List[Dict]) -> None:
        """
        Find and add relationships between requirements.

        Args:
            requirements: List of processed requirements
        """
        if len(requirements) < 2:
            return

        # Extract text from requirements
        texts = [req["text"] for req in requirements]

        # Create TF-IDF matrix
        vectorizer = TfidfVectorizer(stop_words='english')
        tfidf_matrix = vectorizer.fit_transform(texts)

        # Calculate similarity matrix
        similarity_matrix = cosine_similarity(tfidf_matrix)

        # Add relationships to requirements
        for i, req in enumerate(requirements):
            related = []
            for j, similarity in enumerate(similarity_matrix[i]):
                if i != j and similarity > 0.3:  # Threshold for relationship
                    related.append({
                        "id": requirements[j]["id"],
                        "similarity": float(similarity),
                        "relationship_type": self._determine_relationship_type(req, requirements[j])
                    })

            # Sort by similarity and keep the top 5 related requirements
            related.sort(key=lambda x: x["similarity"], reverse=True)
            req["related_requirements"] = related[:5]

    def _determine_relationship_type(self, req1: Dict, req2: Dict) -> str:
        """
        Determine the type of relationship between two requirements.

        Args:
            req1: First requirement
            req2: Second requirement

        Returns:
            Relationship type string
        """
        # Check for system relationships
        systems1 = set(req1["extracted"]["systems"])
        systems2 = set(req2["extracted"]["systems"])
        if systems1.intersection(systems2):
            return "same_system"

        # Check for business domain relationships
        domains1 = [d[0] for d in req1["extracted"]["business_domain"]]
        domains2 = [d[0] for d in req2["extracted"]["business_domain"]]
        if set(domains1).intersection(set(domains2)):
            return "same_domain"

        # Check for action relationships
        actions1 = set(req1["extracted"]["actions"])
        actions2 = set(req2["extracted"]["actions"])
        if actions1.intersection(actions2):
            return "similar_action"

        # Default relationship type
        return "related"

    def map_requirements_to_processes(self, requirements: List[Dict], process_models: List[Dict]) -> Dict:
        """
        Map requirements to process models based on content matching.

        Args:
            requirements: List of analyzed requirements
            process_models: List of process model dictionaries

        Returns:
            Dictionary mapping process IDs to requirement IDs
        """
        process_to_reqs = {}
        req_to_process = {}

        for process in process_models:
            process_id = process.get("id", "unknown")
            process_text = process.get("description", "") + " " + process.get("name", "")
            process_doc = self.nlp(process_text)

            # Find matching requirements
            matching_reqs = []
            for req in requirements:
                req_text = req["text"]
                req_doc = self.nlp(req_text)

                # Calculate similarity
                similarity = process_doc.similarity(req_doc)
                if similarity > 0.6:  # Threshold for matching
                    matching_reqs.append({
                        "req_id": req["id"],
                        "similarity": float(similarity)
                    })
                    req_to_process[req["id"]] = process_id

            # Sort by similarity
            matching_reqs.sort(key=lambda x: x["similarity"], reverse=True)
            process_to_reqs[process_id] = matching_reqs

        return {
            "process_to_requirements": process_to_reqs,
            "requirement_to_process": req_to_process
        }

    def evaluate_automation_potential(self, requirement: Dict) -> Dict:
        """
        Evaluate the automation potential of a requirement.

        Args:
            requirement: Analyzed requirement

        Returns:
            Automation potential assessment
        """
        # Base score starts at 5 out of 10
        score = 5

        # Complexity factor (high complexity decreases the score)
        complexity = requirement["extracted"]["complexity"]
        if complexity == "high":
            score -= 2
        elif complexity == "low":
            score += 2

        # Action factor (certain actions are more automatable)
        automatable_actions = ["extract", "transfer", "copy", "move", "calculate",
                               "update", "generate", "validate", "verify", "send",
                               "notify", "schedule", "retrieve", "check"]
        for action in requirement["extracted"]["actions"]:
            if action in automatable_actions:
                score += 0.5

        # System factor (presence of systems increases the score)
        if requirement["extracted"]["systems"]:
            score += len(requirement["extracted"]["systems"]) * 0.5

        # Data elements factor (more data elements suggests more structure)
        data_elements = requirement["extracted"]["data_elements"]
        if data_elements:
            score += min(len(data_elements) * 0.3, 2)  # Cap at +2

        # Clamp the score to the range 1-10
        score = max(1, min(10, score))

        # Determine category
        category = "high" if score >= 7.5 else "medium" if score >= 5 else "low"

        # Identify automation technology
        tech = self._recommend_automation_technology(requirement, score)

        return {
            "automation_score": round(score, 1),
            "automation_category": category,
            "recommended_technology": tech,
            "rationale": self._generate_automation_rationale(requirement, score, category)
        }

    def _recommend_automation_technology(self, requirement: Dict, score: float) -> str:
        """
        Recommend a suitable automation technology.

        Args:
            requirement: Analyzed requirement
            score: Automation score

        Returns:
            Recommended technology
        """
        complexity = requirement["extracted"]["complexity"]
        actions = requirement["extracted"]["actions"]

        # Decision tree for technology recommendation
        if score >= 8:
            if any(a in actions for a in ["extract", "scrape", "read"]):
                return "RPA with OCR/Document Understanding"
            else:
                return "Traditional RPA"
        elif score >= 5:
            if complexity == "high":
                return "RPA with Human-in-the-Loop"
            elif any(a in actions for a in ["decide", "evaluate", "assess"]):
                return "RPA with Decision Automation"
            else:
                return "Traditional RPA"
        else:
            if any(a in actions for a in ["review", "approve"]):
                return "Workflow Automation"
            else:
                return "Partial Automation with Human Tasks"

    def _generate_automation_rationale(self, requirement: Dict, score: float, category: str) -> str:
        """
        Generate an explanation for the automation assessment.

        Args:
            requirement: Analyzed requirement
            score: Automation score
            category: Automation category

        Returns:
            Rationale text
        """
        complexity = requirement["extracted"]["complexity"]
        if category == "high":
            return (f"This requirement has {complexity} complexity but shows strong automation "
                    f"potential due to clear structure and defined data elements. "
                    f"Score of {score}/10 indicates this is a prime automation candidate.")
        elif category == "medium":
            return (f"This {complexity} complexity requirement has moderate automation potential. "
                    f"Score of {score}/10 suggests partial automation with some human oversight.")
        else:
            return (f"The {complexity} complexity and ambiguous nature of this requirement "
                    f"limit automation potential. Score of {score}/10 indicates this may "
                    f"require significant human involvement or process redesign.")

    def assess_requirements_automation_potential(self, requirements: List[Dict]) -> List[Dict]:
        """
        Assess automation potential for a batch of requirements.

        Args:
            requirements: List of analyzed requirements

        Returns:
            Requirements with automation assessment added
        """
        for req in requirements:
            req["automation_potential"] = self.evaluate_automation_potential(req)
        return requirements

    def generate_requirements_report(self, requirements: List[Dict]) -> Dict:
        """
        Generate a summary report of the requirements analysis.

        Args:
            requirements: List of analyzed requirements

        Returns:
            Report dictionary
        """
        # Count by complexity
        complexity_counts = {"high": 0, "medium": 0, "low": 0}
        for req in requirements:
            complexity = req["extracted"]["complexity"]
            complexity_counts[complexity] += 1

        # Count by automation potential (the key is always present but may be None
        # until assess_requirements_automation_potential has run)
        if all(req.get("automation_potential") for req in requirements):
            automation_counts = {"high": 0, "medium": 0, "low": 0}
            for req in requirements:
                category = req["automation_potential"]["automation_category"]
                automation_counts[category] += 1
        else:
            automation_counts = None

        # Find common systems
        all_systems = []
        for req in requirements:
            all_systems.extend(req["extracted"]["systems"])

        system_counts = {}
        for system in all_systems:
            if system in system_counts:
                system_counts[system] += 1
            else:
                system_counts[system] = 1

        # Sort systems by frequency
        top_systems = sorted(system_counts.items(), key=lambda x: x[1], reverse=True)[:5]

        # Generate report
        report = {
            "total_requirements": len(requirements),
            "complexity_distribution": complexity_counts,
            "automation_potential": automation_counts,
            "top_systems": top_systems,
            "recommendations": self._generate_overall_recommendations(requirements)
        }
        return report

    def _generate_overall_recommendations(self, requirements: List[Dict]) -> List[str]:
        """
        Generate overall recommendations based on the requirements analysis.

        Args:
            requirements: List of analyzed requirements

        Returns:
            List of recommendation strings
        """
        recommendations = []
        if not requirements:
            return recommendations

        # Check whether automation assessment is available for every requirement
        automation_available = all(req.get("automation_potential") for req in requirements)
        if automation_available:
            # Count requirements with high automation potential
            high_potential = [r for r in requirements
                              if r["automation_potential"]["automation_category"] == "high"]
            if len(high_potential) >= len(requirements) * 0.7:
                recommendations.append(
                    "High automation potential across most requirements. "
                    "Consider an end-to-end automation solution."
                )
            elif len(high_potential) >= len(requirements) * 0.3:
                recommendations.append(
                    "Significant automation potential in a subset of requirements. "
                    "Consider a phased automation approach starting with high-potential areas."
                )
            else:
                recommendations.append(
                    "Limited automation potential in current requirements. "
                    "Consider process redesign to increase automation potential."
                )

            # Recommend technologies
            tech_counts = {}
            for req in requirements:
                tech = req["automation_potential"]["recommended_technology"]
                tech_counts[tech] = tech_counts.get(tech, 0) + 1
            top_tech = max(tech_counts.items(), key=lambda x: x[1])[0]
            recommendations.append(f"Primary recommended technology: {top_tech}")

        # Requirements quality recommendations
        completeness_issues = False
        for req in requirements:
            if (not req["extracted"]["actions"] or
                    not req["extracted"]["systems"] or
                    not req["extracted"]["data_elements"]):
                completeness_issues = True
                break
        if completeness_issues:
            recommendations.append(
                "Some requirements lack necessary details. "
                "Consider refining requirements to specify actions, systems, and data elements."
            )
        return recommendations

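
# --- Illustrative usage sketch (assumption, not part of the original module) --
# Assumed end-to-end example: batch analysis, automation assessment, and report
# generation. Requires a spaCy English model (en_core_web_md or en_core_web_sm)
# to be installed; the requirement texts below are invented for demonstration.
def _demo_requirements_analysis() -> None:
    module = RequirementsAnalysisModule()
    batch = [
        {"text": "The AP clerk must extract invoice numbers from the SAP system "
                 "daily and validate the Invoice Date before payment processing.",
         "source": "workshop"},
        {"text": "HR needs the ability to generate an onboarding checklist weekly "
                 "in the Workday platform.",
         "source": "interview"},
    ]

    analyzed = module.analyze_requirements_batch(batch)
    assessed = module.assess_requirements_automation_potential(analyzed)
    print(module.generate_requirements_report(assessed))


if __name__ == "__main__":
    _demo_process_discovery()
    _demo_requirements_analysis()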