# process_discovery_engine.py
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import spacy
import json
import re
import networkx as nx
from sklearn.cluster import DBSCAN
class ProcessDiscoveryEngine:
"""
Discovers and analyzes business processes from various data sources
including logs, documents, and recorded user activities.
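    Example (illustrative sketch; `event_log_df` is a hypothetical DataFrame
    with the columns described under ingest_log_data):
        engine = ProcessDiscoveryEngine({'min_frequency': 0.05})
        engine.ingest_log_data(event_log_df)
        main_paths = engine.discover_main_process_paths()
        model = engine.export_process_model('json')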
"""
def __init__(self, config: Dict):
"""
Initialize the process discovery engine.
Args:
config: Configuration dictionary with parameters
"""
self.min_frequency = config.get('min_frequency', 0.05)
self.time_threshold = config.get('time_threshold', 60) # seconds
self.similarity_threshold = config.get('similarity_threshold', 0.75)
self.process_graph = nx.DiGraph()
def ingest_log_data(self, log_data: pd.DataFrame) -> bool:
"""
Ingest process log data from system logs.
Args:
log_data: DataFrame containing log entries with timestamp, user, action columns
Returns:
bool: Success status
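        Example (illustrative values; only 'timestamp' and 'action' are required,
        'case_id' is optional but gives cleaner traces):
            logs = pd.DataFrame({
                'case_id': ['A', 'A', 'B'],
                'timestamp': ['2024-01-01 09:00', '2024-01-01 09:02', '2024-01-01 10:00'],
                'action': ['receive_invoice', 'approve_invoice', 'receive_invoice'],
            })
            engine.ingest_log_data(logs)  # -> True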
"""
if 'timestamp' not in log_data.columns or 'action' not in log_data.columns:
return False
        # Sort chronologically (parse timestamps so string values sort correctly)
        sorted_logs = log_data.assign(
            timestamp=pd.to_datetime(log_data['timestamp'])
        ).sort_values('timestamp')
# Group by case_id if available
if 'case_id' in sorted_logs.columns:
case_groups = sorted_logs.groupby('case_id')
for case_id, case_data in case_groups:
self._process_sequence(case_data['action'].tolist(),
source=f"log:{case_id}")
else:
# Try to identify sessions based on time gaps
self._segment_and_process_logs(sorted_logs)
return True
def ingest_screen_recordings(self, recording_analysis: List[Dict]) -> bool:
"""
Ingest analyzed screen recording data.
Args:
recording_analysis: List of dictionaries containing screen activities
Returns:
bool: Success status
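        Example (illustrative; the field names mirror what this method reads,
        the activity values are assumed):
            engine.ingest_screen_recordings([
                {'id': 'sess-1', 'actions': [
                    {'activity': 'open_crm'},
                    {'activity': 'copy_customer_id'},
                    {'activity': 'paste_into_erp'},
                ]}
            ])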
"""
for session in recording_analysis:
if 'actions' in session and isinstance(session['actions'], list):
action_sequence = [a['activity'] for a in session['actions']
if 'activity' in a]
self._process_sequence(action_sequence,
source=f"recording:{session.get('id', 'unknown')}")
return True
def _segment_and_process_logs(self, logs: pd.DataFrame) -> None:
"""
Segment logs into probable process instances based on time gaps.
Args:
logs: DataFrame of logs sorted by timestamp
"""
        # Work on a copy so the caller's DataFrame is not modified
        logs = logs.copy()
        logs['timestamp'] = pd.to_datetime(logs['timestamp'])
logs['time_diff'] = logs['timestamp'].diff().dt.total_seconds()
# Mark new sequences where time difference exceeds threshold
new_sequence = logs['time_diff'] > self.time_threshold
logs['sequence_id'] = new_sequence.cumsum()
# Process each sequence
for seq_id, sequence in logs.groupby('sequence_id'):
self._process_sequence(sequence['action'].tolist(),
source=f"timegap:{seq_id}")
def _process_sequence(self, actions: List[str], source: str) -> None:
"""
Process a sequence of actions into the process graph.
Args:
actions: List of action names in sequence
source: Data source identifier
"""
        # Register every action as a node and count each occurrence,
        # including the final action of a sequence (which has no outgoing
        # transition but still represents work performed).
        for action in actions:
            if action not in self.process_graph:
                self.process_graph.add_node(action, count=0, sources=set())
            self.process_graph.nodes[action]['count'] += 1
            self.process_graph.nodes[action]['sources'].add(source)
        # Add or update the transition edge between consecutive actions
        for current, next_action in zip(actions, actions[1:]):
            if self.process_graph.has_edge(current, next_action):
                self.process_graph[current][next_action]['weight'] += 1
                self.process_graph[current][next_action]['sources'].add(source)
            else:
                self.process_graph.add_edge(current, next_action,
                                            weight=1, sources={source})
def discover_main_process_paths(self) -> List[Dict]:
"""
Discover the main process paths from the constructed graph.
Returns:
List of dictionaries describing main process paths
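        Example of the returned structure (keys are real, values illustrative):
            [{'path': ['receive_invoice', 'approve_invoice', 'pay_invoice'],
              'strength': 12, 'length': 3, 'avg_edge_weight': 15.5}, ...]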
"""
# Filter edges by frequency
total_transitions = sum(data['weight'] for _, _, data in self.process_graph.edges(data=True))
if total_transitions == 0:
return []
min_edge_weight = total_transitions * self.min_frequency
significant_edges = [(u, v) for u, v, d in self.process_graph.edges(data=True)
if d['weight'] > min_edge_weight]
        # Create a subgraph containing only the significant edges
        significant_graph = self.process_graph.edge_subgraph(significant_edges).copy()
        if significant_graph.number_of_nodes() == 0:
            return []
# Find all simple paths from potential start nodes to end nodes
start_nodes = [n for n in significant_graph.nodes()
if significant_graph.in_degree(n) == 0 or
significant_graph.in_degree(n) < significant_graph.out_degree(n)]
end_nodes = [n for n in significant_graph.nodes()
if significant_graph.out_degree(n) == 0 or
significant_graph.out_degree(n) < significant_graph.in_degree(n)]
# If no clear start/end, use nodes with highest centrality
if not start_nodes:
centrality = nx.degree_centrality(significant_graph)
start_nodes = [max(centrality, key=centrality.get)]
if not end_nodes:
centrality = nx.degree_centrality(significant_graph)
end_nodes = [max(centrality, key=centrality.get)]
# Find all paths between start and end nodes
all_paths = []
for start in start_nodes:
for end in end_nodes:
                try:
                    # all_simple_paths simply yields nothing when no path exists;
                    # NodeNotFound is only raised for nodes missing from the graph
                    paths = nx.all_simple_paths(significant_graph, start, end)
                    all_paths.extend(paths)
                except nx.NodeNotFound:
                    continue
# Calculate path frequency and return top paths
path_data = []
for path in all_paths:
# Calculate path strength as minimum edge weight along path
edge_weights = [significant_graph[path[i]][path[i+1]]['weight']
for i in range(len(path)-1)]
path_strength = min(edge_weights) if edge_weights else 0
path_data.append({
'path': path,
'strength': path_strength,
'length': len(path),
'avg_edge_weight': sum(edge_weights) / len(edge_weights) if edge_weights else 0
})
# Sort by path strength descending
path_data.sort(key=lambda x: x['strength'], reverse=True)
return path_data
def identify_process_variants(self) -> List[Dict]:
"""
Identify variants of the same basic process.
Returns:
List of process variant clusters
"""
if len(self.process_graph) < 2:
return []
# Extract features for clustering
paths = self.discover_main_process_paths()
if not paths:
return []
# Create feature vectors from paths
all_activities = sorted(list(self.process_graph.nodes()))
activity_indices = {act: i for i, act in enumerate(all_activities)}
# Create feature vectors (activity presence and position)
feature_vectors = []
for path_data in paths:
path = path_data['path']
vector = np.zeros(len(all_activities) * 2)
# Mark presence and relative position of activities
for pos, activity in enumerate(path):
idx = activity_indices[activity]
vector[idx] = 1 # presence
vector[idx + len(all_activities)] = pos / len(path) # relative position
feature_vectors.append(vector)
# Cluster paths using DBSCAN
if len(feature_vectors) < 2:
return [{'variant_id': 0, 'paths': paths}]
clustering = DBSCAN(eps=0.3, min_samples=1).fit(feature_vectors)
labels = clustering.labels_
# Group paths by cluster
variants = {}
for i, label in enumerate(labels):
label_str = str(label)
if label_str not in variants:
variants[label_str] = []
variants[label_str].append(paths[i])
# Format result
result = [
{'variant_id': variant_id, 'paths': variant_paths}
for variant_id, variant_paths in variants.items()
]
return result
def get_process_stats(self) -> Dict:
"""
Get statistics about the discovered process.
Returns:
Dictionary with process statistics
"""
if not self.process_graph:
return {"error": "No process data available"}
stats = {
"num_activities": len(self.process_graph.nodes()),
"num_transitions": len(self.process_graph.edges()),
"most_frequent_activities": [],
"most_frequent_transitions": [],
"process_complexity": 0,
"data_sources": set()
}
# Most frequent activities
activities = [(node, data['count'])
for node, data in self.process_graph.nodes(data=True)]
activities.sort(key=lambda x: x[1], reverse=True)
stats["most_frequent_activities"] = activities[:10]
# Most frequent transitions
transitions = [(u, v, data['weight'])
for u, v, data in self.process_graph.edges(data=True)]
transitions.sort(key=lambda x: x[2], reverse=True)
stats["most_frequent_transitions"] = transitions[:10]
        # Process complexity: total number of outgoing transitions per activity,
        # used here as a rough proxy for the Control-Flow Complexity (CFC) metric
        stats["process_complexity"] = sum(self.process_graph.out_degree(n)
                                          for n in self.process_graph.nodes())
# Data sources
for _, data in self.process_graph.nodes(data=True):
if 'sources' in data:
stats["data_sources"].update(data['sources'])
stats["data_sources"] = list(stats["data_sources"])
return stats
def export_process_model(self, format_type: str = 'bpmn') -> Dict:
"""
Export the discovered process in the specified format.
Args:
format_type: Output format ('bpmn', 'petri_net', or 'json')
Returns:
Dictionary with export data and metadata
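        Example ('json' output shape; node/edge values are illustrative):
            engine.export_process_model('json')
            # {'format': 'json',
            #  'process_model': {'nodes': [{'id': 'approve_invoice', 'count': 12}, ...],
            #                    'edges': [{'source': 'receive_invoice',
            #                               'target': 'approve_invoice', 'weight': 9}, ...]}}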
"""
if format_type == 'json':
nodes = [{"id": n, "count": data.get('count', 0)}
for n, data in self.process_graph.nodes(data=True)]
edges = [{"source": u, "target": v, "weight": data.get('weight', 0)}
for u, v, data in self.process_graph.edges(data=True)]
return {
"format": "json",
"process_model": {
"nodes": nodes,
"edges": edges
}
}
elif format_type == 'bpmn':
# Basic BPMN conversion (simplified)
# In a real implementation, this would generate actual BPMN XML
return {
"format": "bpmn",
"process_model": {
"process_id": "discovered_process",
"activities": list(self.process_graph.nodes()),
"flows": [(u, v) for u, v in self.process_graph.edges()],
"gateways": self._identify_potential_gateways()
}
}
elif format_type == 'petri_net':
# Basic Petri net conversion (simplified)
return {
"format": "petri_net",
"process_model": {
"places": self._generate_petri_net_places(),
"transitions": list(self.process_graph.nodes()),
"arcs": self._generate_petri_net_arcs()
}
}
else:
return {"error": f"Unsupported export format: {format_type}"}
def _identify_potential_gateways(self) -> List[Dict]:
"""
Identify potential gateways in the process based on branching.
Returns:
List of potential gateway nodes
"""
gateways = []
for node in self.process_graph.nodes():
in_degree = self.process_graph.in_degree(node)
out_degree = self.process_graph.out_degree(node)
# Potential XOR-split (one input, multiple outputs)
if in_degree == 1 and out_degree > 1:
gateways.append({
"id": f"xor_split_{node}",
"type": "exclusive_gateway",
"direction": "split",
"attached_to": node
})
# Potential XOR-join (multiple inputs, one output)
elif in_degree > 1 and out_degree == 1:
gateways.append({
"id": f"xor_join_{node}",
"type": "exclusive_gateway",
"direction": "join",
"attached_to": node
})
# Potential AND-split/join or complex gateway
elif in_degree > 1 and out_degree > 1:
gateways.append({
"id": f"complex_{node}",
"type": "complex_gateway",
"direction": "mixed",
"attached_to": node
})
return gateways
def _generate_petri_net_places(self) -> List[str]:
"""
Generate places for a Petri net representation.
Returns:
List of place IDs
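        Example (activity names illustrative): an edge approve -> pay yields the
        place 'p_approve_pay'; a start activity 'receive' yields 'p_start_receive'
        and an end activity 'archive' yields 'p_archive_end'.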
"""
places = []
# Generate places between each pair of activities
for u, v in self.process_graph.edges():
places.append(f"p_{u}_{v}")
# Add start and end places
start_nodes = [n for n in self.process_graph.nodes()
if self.process_graph.in_degree(n) == 0]
for node in start_nodes:
places.append(f"p_start_{node}")
end_nodes = [n for n in self.process_graph.nodes()
if self.process_graph.out_degree(n) == 0]
for node in end_nodes:
places.append(f"p_{node}_end")
return places
def _generate_petri_net_arcs(self) -> List[Tuple[str, str]]:
"""
Generate arcs for a Petri net representation.
Returns:
List of (source, target) tuples representing arcs
"""
arcs = []
# Connect transitions through places
for u, v in self.process_graph.edges():
place = f"p_{u}_{v}"
arcs.append((u, place))
arcs.append((place, v))
# Connect start places to initial transitions
start_nodes = [n for n in self.process_graph.nodes()
if self.process_graph.in_degree(n) == 0]
for node in start_nodes:
arcs.append((f"p_start_{node}", node))
# Connect final transitions to end places
end_nodes = [n for n in self.process_graph.nodes()
if self.process_graph.out_degree(n) == 0]
for node in end_nodes:
arcs.append((node, f"p_{node}_end"))
return arcs
# requirements_analysis_module.py
class RequirementsAnalysisModule:
"""
Analyzes business requirements and connects them to processes.
Extracts structured data from natural language requirements.
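    Example (illustrative sketch; the requirement text is assumed):
        ram = RequirementsAnalysisModule()
        req = ram.analyze_text_requirement(
            "The clerk must extract invoice data from the SAP system daily "
            "and update the Payment Date in the finance dashboard."
        )
        req = ram.assess_requirements_automation_potential([req])[0]
        print(req["automation_potential"]["automation_score"])
    Requires a spaCy English model to be installed (en_core_web_md preferred,
    en_core_web_sm as fallback).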
"""
def __init__(self, config: Dict = None):
"""
Initialize the requirements analysis module.
Args:
config: Configuration dictionary
"""
self.config = config or {}
# Load NLP model
        try:
            self.nlp = spacy.load("en_core_web_md")
        except OSError:
            # Fall back to the small model if the medium one is not installed
            # (en_core_web_sm has no word vectors, so similarity scores are
            # less reliable)
            self.nlp = spacy.load("en_core_web_sm")
# Initialize requirements storage
self.requirements = []
# Initialize taxonomy and patterns
self._load_taxonomies()
self._compile_requirement_patterns()
def _load_taxonomies(self) -> None:
"""Load or initialize the business process taxonomy."""
# In production, this would load from a file or database
self.process_taxonomy = {
"financial": [
"invoice processing", "accounts payable", "accounts receivable",
"payment processing", "financial reporting", "expense management"
],
"hr": [
"onboarding", "offboarding", "payroll", "recruitment",
"employee management", "benefits administration", "time tracking"
],
"customer_service": [
"ticket management", "customer support", "inquiry handling",
"complaint resolution", "feedback processing"
],
"operations": [
"inventory management", "supply chain", "logistics",
"order processing", "shipping", "receiving", "quality control"
],
"sales": [
"lead management", "opportunity tracking", "quote generation",
"contract management", "sales reporting", "commission calculation"
],
"it": [
"access management", "incident management", "change management",
"service request", "problem management", "release management"
]
}
# Complexity indicators for requirements
self.complexity_indicators = {
"high": [
"complex", "multiple systems", "integration", "decision tree",
"exception handling", "compliance", "regulatory", "manual review",
"approval workflow", "conditional logic", "business rules"
],
"medium": [
"validation", "verification", "notification", "alert",
"scheduled", "reporting", "dashboard", "data transformation"
],
"low": [
"simple", "straightforward", "data entry", "form filling",
"standard", "single system", "fixed path", "static rules"
]
}
def _compile_requirement_patterns(self) -> None:
"""Compile regex patterns for requirement extraction."""
# Action patterns
self.action_patterns = [
r"(?:need|should|must|will|shall) (?:to )?([a-z]+)",
r"responsible for ([a-z]+ing)",
r"capability to ([a-z]+)",
r"ability to ([a-z]+)"
]
# System patterns
self.system_patterns = [
r"(?:in|from|to|using|within) (?:the )?([A-Za-z0-9]+)(?: system| application| platform| software| tool)?",
r"([A-Za-z0-9]+)(?: system| application| platform| software| tool)",
r"([A-Za-z0-9]+) (?:database|interface|API|server)"
]
# Frequency patterns
self.frequency_patterns = [
r"(daily|weekly|monthly|quarterly|yearly|annually)",
r"every ([0-9]+) (day|week|month|quarter|year)s?",
r"([0-9]+) times per (day|week|month|year)"
]
# Compile all patterns
self.action_regex = [re.compile(pattern) for pattern in self.action_patterns]
self.system_regex = [re.compile(pattern) for pattern in self.system_patterns]
self.frequency_regex = [re.compile(pattern) for pattern in self.frequency_patterns]
def analyze_text_requirement(self, requirement_text: str, source: str = None) -> Dict:
"""
Analyze a natural language requirement and extract structured information.
Args:
requirement_text: The text of the requirement
source: Source of the requirement
Returns:
Dictionary with extracted requirement information
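        Example of the returned structure (keys are real, values illustrative):
            {'id': 'REQ-1',
             'text': '...',
             'source': None,
             'extracted': {'actions': ['extract', 'update'],
                           'systems': ['SAP'],
                           'frequency': 'daily',
                           'business_domain': [('financial', 0.17)],
                           'complexity': 'medium',
                           'data_elements': ['invoice data', 'Date field']},
             'automation_potential': None}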
"""
# Parse with spaCy
doc = self.nlp(requirement_text)
# Basic requirement object
requirement = {
"id": f"REQ-{len(self.requirements) + 1}",
"text": requirement_text,
"source": source,
"extracted": {
"actions": self._extract_actions(doc, requirement_text),
"systems": self._extract_systems(doc, requirement_text),
"frequency": self._extract_frequency(requirement_text),
"business_domain": self._classify_business_domain(doc),
"complexity": self._assess_complexity(doc, requirement_text),
"data_elements": self._extract_data_elements(doc)
},
"automation_potential": None # Will be filled later
}
# Store the requirement
self.requirements.append(requirement)
return requirement
def _extract_actions(self, doc, text: str) -> List[str]:
"""
Extract action verbs from requirement text.
Args:
doc: spaCy processed document
text: Original text
Returns:
List of action verbs
"""
# Method 1: Use spaCy to find verbs
verbs = [token.lemma_ for token in doc if token.pos_ == "VERB"]
# Method 2: Use regex patterns
pattern_matches = []
for pattern in self.action_regex:
matches = pattern.findall(text.lower())
pattern_matches.extend(matches)
# Combine and deduplicate
all_actions = list(set(verbs + pattern_matches))
# Filter out common non-action verbs
stopwords = ["be", "is", "are", "was", "were", "have", "has", "had"]
filtered_actions = [v for v in all_actions if v not in stopwords and len(v) > 2]
return filtered_actions
def _extract_systems(self, doc, text: str) -> List[str]:
"""
Extract system names from requirement text.
Args:
doc: spaCy processed document
text: Original text
Returns:
List of system names
"""
# Method 1: Named Entity Recognition for PRODUCT entities
ner_systems = [ent.text for ent in doc.ents
if ent.label_ in ["PRODUCT", "ORG", "GPE"]]
# Method 2: Pattern matching
pattern_systems = []
for pattern in self.system_regex:
matches = pattern.findall(text)
pattern_systems.extend(matches)
# Combine results
all_systems = list(set(ner_systems + pattern_systems))
# Filter out common false positives
stopwords = ["system", "process", "application", "data", "information", "this", "the"]
filtered_systems = [s for s in all_systems if s.lower() not in stopwords and len(s) > 2]
return filtered_systems
def _extract_frequency(self, text: str) -> Optional[str]:
"""
Extract frequency information from requirement text.
Args:
text: Requirement text
Returns:
Extracted frequency or None
"""
text_lower = text.lower()
# Check all frequency patterns
for pattern in self.frequency_regex:
match = pattern.search(text_lower)
if match:
return match.group(0)
# Check for specific frequency words
frequency_words = ["daily", "weekly", "monthly", "quarterly", "annually", "yearly"]
for word in frequency_words:
if word in text_lower:
return word
return None
def _classify_business_domain(self, doc) -> List[Tuple[str, float]]:
"""
Classify the business domain of the requirement.
Args:
doc: spaCy processed document
Returns:
List of (domain, confidence) tuples
"""
text = doc.text.lower()
domain_scores = {}
# Calculate score for each domain based on keyword matches
for domain, keywords in self.process_taxonomy.items():
domain_score = 0
for keyword in keywords:
if keyword in text:
domain_score += 1
if domain_score > 0:
# Normalize by number of keywords
domain_scores[domain] = domain_score / len(keywords)
# If no direct matches, use semantic similarity
if not domain_scores:
for domain, keywords in self.process_taxonomy.items():
# Calculate average similarity between doc and each keyword
similarities = [doc.similarity(self.nlp(keyword)) for keyword in keywords]
avg_similarity = sum(similarities) / len(similarities) if similarities else 0
if avg_similarity > 0.5: # Threshold for relevance
domain_scores[domain] = avg_similarity
# Sort by score and return
sorted_domains = sorted(domain_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_domains
def _assess_complexity(self, doc, text: str) -> str:
"""
Assess the complexity of the requirement.
Args:
doc: spaCy processed document
text: Original text
Returns:
Complexity level ("high", "medium", or "low")
"""
text_lower = text.lower()
# Count indicators for each complexity level
scores = {level: 0 for level in self.complexity_indicators.keys()}
for level, indicators in self.complexity_indicators.items():
for indicator in indicators:
if indicator in text_lower:
scores[level] += 1
# Check sentence structure complexity
sentence_count = len(list(doc.sents))
avg_tokens_per_sentence = len(doc) / sentence_count if sentence_count > 0 else 0
# Adjust scores based on structural complexity
if avg_tokens_per_sentence > 25:
scores["high"] += 1
elif avg_tokens_per_sentence > 15:
scores["medium"] += 1
# Check for conditional statements (if/then)
if "if" in text_lower and ("then" in text_lower or "else" in text_lower):
scores["high"] += 1
# Determine final complexity
if scores["high"] > 0:
return "high"
elif scores["medium"] > 0:
return "medium"
else:
return "low"
def _extract_data_elements(self, doc) -> List[str]:
"""
Extract data elements from the requirement text.
Args:
doc: spaCy processed document
Returns:
List of data elements
"""
# Find noun chunks that could be data elements
data_elements = []
for chunk in doc.noun_chunks:
# Check if this looks like a data field
if (any(token.pos_ == "NOUN" for token in chunk) and
len(chunk) <= 4 and # Not too long
not any(token.is_stop for token in chunk)): # Not all stopwords
data_elements.append(chunk.text)
# Look for specific data patterns
data_patterns = [
(r"\b[A-Z][a-z]+ ID\b", "ID field"),
(r"\b[A-Z][a-z]+ Number\b", "Number field"),
(r"\b[A-Z][a-z]+ Code\b", "Code field"),
(r"\b[A-Z][a-z]+ Date\b", "Date field"),
(r"\bstatus\b", "Status field")
]
for pattern, field_type in data_patterns:
if re.search(pattern, doc.text):
data_elements.append(field_type)
return list(set(data_elements))
def analyze_requirements_batch(self, requirements: List[Dict]) -> List[Dict]:
"""
Analyze a batch of requirements and find relationships between them.
Args:
requirements: List of requirement dictionaries with 'text' field
Returns:
List of analyzed requirements
"""
# Process each requirement
processed_requirements = []
for req in requirements:
req_text = req.get('text', '')
source = req.get('source', 'batch')
processed = self.analyze_text_requirement(req_text, source)
processed_requirements.append(processed)
# Find relationships between requirements
self._find_requirement_relationships(processed_requirements)
return processed_requirements
def _find_requirement_relationships(self, requirements: List[Dict]) -> None:
"""
Find and add relationships between requirements.
Args:
requirements: List of processed requirements
"""
if len(requirements) < 2:
return
# Extract text from requirements
texts = [req["text"] for req in requirements]
# Create TF-IDF matrix
vectorizer = TfidfVectorizer(stop_words='english')
tfidf_matrix = vectorizer.fit_transform(texts)
# Calculate similarity matrix
similarity_matrix = cosine_similarity(tfidf_matrix)
# Add relationships to requirements
for i, req in enumerate(requirements):
related = []
for j, similarity in enumerate(similarity_matrix[i]):
if i != j and similarity > 0.3: # Threshold for relationship
related.append({
"id": requirements[j]["id"],
"similarity": float(similarity),
"relationship_type": self._determine_relationship_type(req, requirements[j])
})
# Sort by similarity
related.sort(key=lambda x: x["similarity"], reverse=True)
# Add to requirement
req["related_requirements"] = related[:5] # Top 5 related requirements
def _determine_relationship_type(self, req1: Dict, req2: Dict) -> str:
"""
Determine the type of relationship between two requirements.
Args:
req1: First requirement
req2: Second requirement
Returns:
Relationship type string
"""
# Check for system relationships
systems1 = set(req1["extracted"]["systems"])
systems2 = set(req2["extracted"]["systems"])
if systems1.intersection(systems2):
return "same_system"
# Check for business domain relationships
domains1 = [d[0] for d in req1["extracted"]["business_domain"]]
domains2 = [d[0] for d in req2["extracted"]["business_domain"]]
if set(domains1).intersection(set(domains2)):
return "same_domain"
# Check for action relationships
actions1 = set(req1["extracted"]["actions"])
actions2 = set(req2["extracted"]["actions"])
if actions1.intersection(actions2):
return "similar_action"
# Default relationship type
return "related"
def map_requirements_to_processes(self, requirements: List[Dict], process_models: List[Dict]) -> Dict:
"""
Map requirements to process models based on content matching.
Args:
requirements: List of analyzed requirements
process_models: List of process model dictionaries
Returns:
Dictionary mapping process IDs to requirement IDs
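        Example (illustrative process model; only 'id', 'name' and 'description'
        are read from each entry):
            mapping = ram.map_requirements_to_processes(
                requirements,
                [{'id': 'p1', 'name': 'Invoice processing',
                  'description': 'Receive, approve and pay supplier invoices'}]
            )
            mapping['process_to_requirements']['p1']
            # e.g. [{'req_id': 'REQ-1', 'similarity': 0.71}]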
"""
process_to_reqs = {}
req_to_process = {}
for process in process_models:
process_id = process.get("id", "unknown")
process_text = process.get("description", "") + " " + process.get("name", "")
process_doc = self.nlp(process_text)
# Find matching requirements
matching_reqs = []
for req in requirements:
req_text = req["text"]
req_doc = self.nlp(req_text)
# Calculate similarity
similarity = process_doc.similarity(req_doc)
if similarity > 0.6: # Threshold for matching
matching_reqs.append({
"req_id": req["id"],
"similarity": float(similarity)
})
req_to_process[req["id"]] = process_id
# Sort by similarity
matching_reqs.sort(key=lambda x: x["similarity"], reverse=True)
process_to_reqs[process_id] = matching_reqs
return {
"process_to_requirements": process_to_reqs,
"requirement_to_process": req_to_process
}
def evaluate_automation_potential(self, requirement: Dict) -> Dict:
"""
Evaluate the automation potential of a requirement.
Args:
requirement: Analyzed requirement
Returns:
Automation potential assessment
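        Worked example of the scoring rules below (values illustrative):
            a low-complexity requirement with the actions 'extract' and 'update'
            and one named system scores 5 (base) + 2 (low complexity)
            + 2 x 0.5 (automatable actions) + 0.5 (one system) = 8.5,
            which falls in the 'high' category (score >= 7.5).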
"""
# Basic score starts at 5 out of 10
score = 5
# Complexity factor (high complexity decreases score)
complexity = requirement["extracted"]["complexity"]
if complexity == "high":
score -= 2
elif complexity == "low":
score += 2
# Action factor (certain actions are more automatable)
automatable_actions = ["extract", "transfer", "copy", "move", "calculate",
"update", "generate", "validate", "verify", "send",
"notify", "schedule", "retrieve", "check"]
for action in requirement["extracted"]["actions"]:
if action in automatable_actions:
score += 0.5
# System factor (presence of systems increases score)
if requirement["extracted"]["systems"]:
score += len(requirement["extracted"]["systems"]) * 0.5
# Data elements factor (more data elements suggests more structure)
data_elements = requirement["extracted"]["data_elements"]
if data_elements:
score += min(len(data_elements) * 0.3, 2) # Cap at +2
# Cap score between 1-10
score = max(1, min(10, score))
# Determine category
category = "high" if score >= 7.5 else "medium" if score >= 5 else "low"
# Identify automation technology
tech = self._recommend_automation_technology(requirement, score)
return {
"automation_score": round(score, 1),
"automation_category": category,
"recommended_technology": tech,
"rationale": self._generate_automation_rationale(requirement, score, category)
}
def _recommend_automation_technology(self, requirement: Dict, score: float) -> str:
"""
Recommend suitable automation technology.
Args:
requirement: Analyzed requirement
score: Automation score
Returns:
Recommended technology
"""
complexity = requirement["extracted"]["complexity"]
actions = requirement["extracted"]["actions"]
# Decision tree for technology recommendation
if score >= 8:
if any(a in actions for a in ["extract", "scrape", "read"]):
return "RPA with OCR/Document Understanding"
else:
return "Traditional RPA"
elif score >= 5:
if complexity == "high":
return "RPA with Human-in-the-Loop"
elif any(a in actions for a in ["decide", "evaluate", "assess"]):
return "RPA with Decision Automation"
else:
return "Traditional RPA"
else:
if any(a in actions for a in ["review", "approve"]):
return "Workflow Automation"
else:
return "Partial Automation with Human Tasks"
def _generate_automation_rationale(self, requirement: Dict, score: float, category: str) -> str:
"""
Generate explanation for automation assessment.
Args:
requirement: Analyzed requirement
score: Automation score
category: Automation category
Returns:
Rationale text
"""
complexity = requirement["extracted"]["complexity"]
if category == "high":
return (f"This requirement has {complexity} complexity but shows strong automation "
f"potential due to clear structure and defined data elements. "
f"Score of {score}/10 indicates this is a prime automation candidate.")
elif category == "medium":
return (f"This {complexity} complexity requirement has moderate automation potential. "
f"Score of {score}/10 suggests partial automation with some human oversight.")
else:
return (f"The {complexity} complexity and ambiguous nature of this requirement "
f"limits automation potential. Score of {score}/10 indicates this may "
f"require significant human involvement or process redesign.")
def assess_requirements_automation_potential(self, requirements: List[Dict]) -> List[Dict]:
"""
Assess automation potential for a batch of requirements.
Args:
requirements: List of analyzed requirements
Returns:
Requirements with automation assessment added
"""
for req in requirements:
req["automation_potential"] = self.evaluate_automation_potential(req)
return requirements
def generate_requirements_report(self, requirements: List[Dict]) -> Dict:
"""
Generate a summary report of requirements analysis.
Args:
requirements: List of analyzed requirements
Returns:
Report dictionary
"""
# Count by complexity
complexity_counts = {"high": 0, "medium": 0, "low": 0}
for req in requirements:
complexity = req["extracted"]["complexity"]
complexity_counts[complexity] += 1
# Count by automation potential
if all("automation_potential" in req for req in requirements):
automation_counts = {"high": 0, "medium": 0, "low": 0}
for req in requirements:
category = req["automation_potential"]["automation_category"]
automation_counts[category] += 1
else:
automation_counts = None
# Find common systems
all_systems = []
for req in requirements:
all_systems.extend(req["extracted"]["systems"])
system_counts = {}
for system in all_systems:
if system in system_counts:
system_counts[system] += 1
else:
system_counts[system] = 1
# Sort systems by frequency
top_systems = sorted(system_counts.items(), key=lambda x: x[1], reverse=True)[:5]
# Generate report
report = {
"total_requirements": len(requirements),
"complexity_distribution": complexity_counts,
"automation_potential": automation_counts,
"top_systems": top_systems,
"recommendations": self._generate_overall_recommendations(requirements)
}
return report
def _generate_overall_recommendations(self, requirements: List[Dict]) -> List[str]:
"""
Generate overall recommendations based on requirements analysis.
Args:
requirements: List of analyzed requirements
Returns:
List of recommendation strings
"""
recommendations = []
        # Automation assessment is only meaningful when there are requirements
        # and every one of them has been assessed
        automation_available = bool(requirements) and all(
            "automation_potential" in req for req in requirements
        )
if automation_available:
# Count high automation potential requirements
high_potential = [r for r in requirements
if r["automation_potential"]["automation_category"] == "high"]
if len(high_potential) >= len(requirements) * 0.7:
recommendations.append(
"High automation potential across most requirements. "
"Consider an end-to-end automation solution."
)
elif len(high_potential) >= len(requirements) * 0.3:
recommendations.append(
"Significant automation potential in a subset of requirements. "
"Consider a phased automation approach starting with high-potential areas."
)
else:
recommendations.append(
"Limited automation potential in current requirements. "
"Consider process redesign to increase automation potential."
)
# Recommend technologies
tech_counts = {}
for req in requirements:
tech = req["automation_potential"]["recommended_technology"]
tech_counts[tech] = tech_counts.get(tech, 0) + 1
top_tech = max(tech_counts.items(), key=lambda x: x[1])[0]
recommendations.append(f"Primary recommended technology: {top_tech}")
# Requirements quality recommendations
completeness_issues = False
for req in requirements:
if (not req["extracted"]["actions"] or
not req["extracted"]["systems"] or
not req["extracted"]["data_elements"]):
completeness_issues = True
break
if completeness_issues:
recommendations.append(
"Some requirements lack necessary details. "
"Consider refining requirements to specify actions, systems, and data elements."
)
return recommendations
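# Minimal end-to-end usage sketch (assumes pandas, networkx, scikit-learn and a
# spaCy English model are installed; the sample log and requirement texts below
# are illustrative, not taken from real data).
if __name__ == "__main__":
    engine = ProcessDiscoveryEngine({'min_frequency': 0.01, 'time_threshold': 120})
    engine.ingest_log_data(pd.DataFrame({
        'case_id': [1, 1, 1, 2, 2],
        'timestamp': pd.to_datetime([
            '2024-01-01 09:00', '2024-01-01 09:05', '2024-01-01 09:10',
            '2024-01-02 14:00', '2024-01-02 14:07',
        ]),
        'user': ['alice'] * 3 + ['bob'] * 2,
        'action': ['receive_invoice', 'approve_invoice', 'pay_invoice',
                   'receive_invoice', 'approve_invoice'],
    }))
    print(json.dumps(engine.get_process_stats(), indent=2, default=str))
    ram = RequirementsAnalysisModule()
    reqs = ram.analyze_requirements_batch([
        {'text': 'The clerk must extract invoice data from the SAP system daily.'},
        {'text': 'Payments should be validated and approved in the finance dashboard weekly.'},
    ])
    reqs = ram.assess_requirements_automation_potential(reqs)
    print(json.dumps(ram.generate_requirements_report(reqs), indent=2, default=str))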