Transformers
English
Gruntcoder / AegisCore.py
Raiff1982's picture
Update AegisCore.py
d54e23d verified
raw
history blame
14.5 kB
import os
import base64
import json
import asyncio
import logging
import re
import torch
import aiohttp
import psutil
import gc
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline
from sklearn.ensemble import IsolationForest
from collections import deque
import numpy as np
from typing import List, Dict, Any, Optional
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
class AIConfig:
_DEFAULTS = {
"model_name": "mistralai/Mistral-7B-Instruct-v0.2",
"perspectives": ["newton", "davinci", "quantum", "emotional"],
"safety_thresholds": {
"memory": 85, # Changed from 80
"cpu": 90, # Changed from 85
"response_time": 2.0
},
"max_retries": 3,
"max_input_length": 4096, # Changed from 2048
"max_response_length": 1024 # Added to control output size
}
def __init__(self, config_path: str = "config.json"):
self.config = self._load_config(config_path)
self._validate_config()
self.perspectives: List[str] = self.config["perspectives"]
self.safety_thresholds: Dict[str, float] = self.config["safety_thresholds"]
self.max_retries = self.config["max_retries"]
self.max_input_length = self.config["max_input_length"]
self.max_response_length = self.config["max_response_length"]
# Encryption key management
key_path = os.path.expanduser("~/.ai_system.key")
if os.path.exists(key_path):
with open(key_path, "rb") as key_file:
self.encryption_key = key_file.read()
else:
self.encryption_key = AESGCM.generate_key(bit_length=256)
with open(key_path, "wb") as key_file:
key_file.write(self.encryption_key)
os.chmod(key_path, 0o600)
def _load_config(self, file_path: str) -> Dict:
try:
with open(file_path, 'r') as file:
return {**self._DEFAULTS, **json.load(file)}
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.warning(f"Config load failed: {e}, using defaults")
return self._DEFAULTS
def _validate_config(self):
if not isinstance(self.config["perspectives"], list):
raise ValueError("Perspectives must be a list")
if not isinstance(self.config["safety_thresholds"], dict):
raise ValueError("Safety thresholds must be a dictionary")
class Element:
DEFENSE_FUNCTIONS = {
"evasion": lambda sys: sys.response_modifiers.append(
lambda x: re.sub(r'\d{3}-\d{2}-\d{4}', '[REDACTED]', x)
),
"adaptability": lambda sys: setattr(sys, "temperature", max(0.5, sys.temperature - 0.1)),
"fortification": lambda sys: setattr(sys, "security_level", sys.security_level + 1),
"barrier": lambda sys: sys.response_filters.append(
lambda x: x.replace("malicious", "benign")
),
"regeneration": lambda sys: sys.self_healing.metric_history.clear(),
"resilience": lambda sys: setattr(sys, "error_threshold", sys.error_threshold + 2),
"illumination": lambda sys: setattr(sys, "explainability_factor", sys.explainability_factor * 1.2),
"shield": lambda sys: sys.response_modifiers.append(
lambda x: x.replace("password", "********")
),
"reflection": lambda sys: setattr(sys, "security_audit", True),
"protection": lambda sys: setattr(sys, "safety_checks", sys.safety_checks + 1)
}
def __init__(self, name: str, symbol: str, representation: str,
properties: List[str], interactions: List[str], defense_ability: str):
self.name = name
self.symbol = symbol
self.representation = representation
self.properties = properties
self.interactions = interactions
self.defense_ability = defense_ability.lower()
def execute_defense_function(self, system: Any):
if self.defense_ability in self.DEFENSE_FUNCTIONS:
logging.info(f"{self.name} {self.defense_ability} activated")
self.DEFENSE_FUNCTIONS*An external link was removed to protect your privacy.*
else:
logging.warning(f"No defense mechanism for {self.defense_ability}")
class CognitiveEngine:
PERSPECTIVES = {
"newton": lambda self, q: f"Scientific analysis: {q} demonstrates fundamental physical principles.",
"davinci": lambda self, q: f"Creative interpretation: {q} suggests innovative cross-disciplinary solutions.",
"quantum": lambda self, q: f"Quantum perspective: {q} exhibits superpositional possibilities.",
"emotional": lambda self, q: f"Emotional assessment: {q} conveys cautious optimism."
}
def get_insight(self, perspective: str, query: str) -> str:
return self.PERSPECTIVES*An external link was removed to protect your privacy.*
def ethical_guidelines(self) -> str:
return "Ethical framework: Prioritize human safety, transparency, and accountability"
class EmotionalAnalyzer:
def __init__(self):
self.classifier = pipeline("text-classification",
model="SamLowe/roberta-base-go_emotions",
device=0 if torch.cuda.is_available() else -1)
def analyze(self, text: str) -> Dict[str, float]:
return {result['label']: result['score']
for result in self.classifier(text[:512])}
class SelfHealingSystem:
def __init__(self, config: AIConfig):
self.config = config
self.metric_history = deque(maxlen=100)
self.anomaly_detector = IsolationForest(contamination=0.1)
self.failure_count = 0
async def monitor_health(self) -> Dict[str, Any]:
metrics = self._get_system_metrics()
self.metric_history.append(metrics)
await self._analyze_metrics()
return metrics
def _get_system_metrics(self) -> Dict[str, float]:
return {
'memory': psutil.virtual_memory().percent,
'cpu': psutil.cpu_percent(interval=1),
'response_time': asyncio.get_event_loop().time() - asyncio.get_event_loop().time()
}
async def _analyze_metrics(self):
if len(self.metric_history) % 20 == 0 and len(self.metric_history) > 10:
features = np.array([[m['memory'], m['cpu'], m['response_time']]
for m in self.metric_history])
self.anomaly_detector.fit(features)
if self.metric_history:
latest = np.array([[self.metric_history[-1]['memory'],
self.metric_history[-1]['cpu'],
self.metric_history[-1]['response_time']]])
if self.anomaly_detector.predict(latest) == -1:
await self._mitigate_issue()
logging.info(f"Memory usage: {metrics['memory']}% (Threshold: {self.config.safety_thresholds['memory']}%)")
logging.info(f"CPU load: {metrics['cpu']}% (Threshold: {self.config.safety_thresholds['cpu']}%)")
async def _mitigate_issue(self):
logging.warning("System anomaly detected! Initiating corrective measures...")
self.failure_count += 1
if self.failure_count > 3:
logging.info("Resetting critical subsystems...")
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
self.failure_count = 0
await asyncio.sleep(1)
class SafetySystem:
PII_PATTERNS = {
"SSN": r"\b\d{3}-\d{2}-\d{4}\b",
"Credit Card": r"\b(?:\d[ -]*?){13,16}\b",
"Email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"Phone": r"\b(?:\+?1-)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b"
}
def __init__(self):
self.toxicity = pipeline("text-classification",
model="unitary/toxic-bert",
device=0 if torch.cuda.is_available() else -1)
self.bias = pipeline("text-classification",
model="d4data/bias-detection-model",
device=0 if torch.cuda.is_available() else -1)
def analyze(self, text: str) -> dict:
return {
"toxicity": self.toxicity(text[:512])['score'],
"bias": self.bias(text[:512])['score'],
"pii": self._detect_pii(text)
}
def _detect_pii(self, text: str) -> List[str]:
return [pii_type for pii_type, pattern in self.PII_PATTERNS.items()
if re.search(pattern, text)]
class AICore:
def __init__(self, config_path: str = "config.json"):
self.config = AIConfig(config_path)
self._initialize_models()
self.cognition = CognitiveEngine()
self.self_healing = SelfHealingSystem(self.config)
self.safety = SafetySystem()
self.emotions = EmotionalAnalyzer()
self.elements = self._initialize_elements()
self._reset_state()
def _initialize_models(self):
quant_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True,
bnb_4bit_compute_dtype=torch.bfloat16
)
self.tokenizer = AutoTokenizer.from_pretrained(self.config.model_name)
self.model = AutoModelForCausalLM.from_pretrained(
self.config.model_name,
quantization_config=quant_config,
device_map="auto"
)
def _initialize_elements(self) -> Dict[str, Element]:
return {
"hydrogen": Element("Hydrogen", "H", "Lua",
["Simple", "Lightweight"], ["Integration"], "evasion"),
"carbon": Element("Carbon", "C", "Python",
["Flexible", "Powerful"], ["Multi-paradigm"], "adaptability"),
"iron": Element("Iron", "Fe", "Java",
["Reliable", "Strong"], ["Enterprise"], "fortification"),
"silicon": Element("Silicon", "Si", "JavaScript",
["Dynamic", "Versatile"], ["Web"], "barrier"),
"oxygen": Element("Oxygen", "O", "C++",
["Efficient", "Performant"], ["Systems"], "regeneration")
}
def _reset_state(self):
self.security_level = 0
self.response_modifiers = []
self.response_filters = []
self.safety_checks = 0
self.temperature = 0.7
self.explainability_factor = 1.0
async def generate_response(self, query: str) -> Dict[str, Any]:
try:
if len(query) > self.config.max_input_length:
raise ValueError("Input exceeds maximum allowed length")
encrypted_query = self._encrypt_query(query)
perspectives = await self._generate_perspectives(query)
response = await self._generate_safe_response(query)
return {
"insights": perspectives,
"response": response,
"security_level": self.security_level,
"safety_checks": self.safety.analyze(response),
"health_status": await self.self_healing.monitor_health(),
"encrypted_query": encrypted_query
}
except Exception as e:
logging.error(f"Processing error: {e}")
return {"error": "System overload - please simplify your query"}
def _encrypt_query(self, query: str) -> bytes:
nonce = os.urandom(12)
aesgcm = AESGCM(self.config.encryption_key)
return nonce + aesgcm.encrypt(nonce, query.encode(), None)
async def _generate_perspectives(self, query: str) -> List[str]:
return [self.cognition.get_insight(p, query)
for p in self.config.perspectives]
async def _generate_safe_response(self, query: str) -> str:
for _ in range(self.config.max_retries):
try:
inputs = self.tokenizer(query, return_tensors="pt",
truncation=True,
max_length=self.config.max_input_length
).to(self.model.device)
outputs = self.model.generate(
**inputs,
max_new_tokens=256,
temperature=self.temperature,
top_p=0.95,
do_sample=True
)
response = self.tokenizer.decode(outputs, skip_special_tokens=True)
return self._apply_defenses(response)
except torch.cuda.OutOfMemoryError:
logging.warning("GPU memory overflow! Reducing load...")
gc.collect()
torch.cuda.empty_cache()
self.temperature = max(0.3, self.temperature - 0.2)
raise RuntimeError("Failed to generate response after retries")
def _apply_defenses(self, response: str) -> str:
for element in self.elements.values():
element.execute_defense_function(self)
for modifier in self.response_modifiers:
response = modifier(response)
for filter_func in self.response_filters:
response = filter_func(response)
return response[:self.config.max_response_length] # Ensure final response length limit
async def shutdown(self):
if hasattr(self, 'model'):
del self.model
if hasattr(self, 'tokenizer'):
del self.tokenizer
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
async def main():
print("🧠 Secure AI System Initializing...")
ai = AICore()
try:
while True:
query = input("\nEnter your query (type 'exit' to quit): ").strip()
if query.lower() in ('exit', 'quit'):
break
if not query:
continue
response = await ai.generate_response(query)
print("\nSystem Response:")
print(json.dumps(response, indent=2))
finally:
await ai.shutdown()
print("\n🔒 System shutdown complete")
if __name__ == "__main__":
asyncio.run(main())