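"""Secure AI assistant demo.

Wires a 4-bit quantized instruction model together with JSON-backed
configuration, AES-GCM query encryption, multi-perspective "insights",
anomaly-based self-healing, and toxicity/bias/PII safety screening.
"""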
import asyncio
import base64
import gc
import json
import logging
import os
import re
import time
from collections import deque
from typing import Any, Dict, List

import numpy as np
import psutil
import torch
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from sklearn.ensemble import IsolationForest
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


class AIConfig:
    _DEFAULTS = {
        "model_name": "mistralai/Mistral-7B-Instruct-v0.2",
        "perspectives": ["newton", "davinci", "quantum", "emotional"],
        "safety_thresholds": {
            "memory": 85,
            "cpu": 90,
            "response_time": 2.0
        },
        "max_retries": 3,
        "max_input_length": 4096,
        "max_response_length": 1024
    }

    def __init__(self, config_path: str = "config.json"):
        self.config = self._load_config(config_path)
        self._validate_config()
        self.model_name: str = self.config["model_name"]
        self.perspectives: List[str] = self.config["perspectives"]
        self.safety_thresholds: Dict[str, float] = self.config["safety_thresholds"]
        self.max_retries = self.config["max_retries"]
        self.max_input_length = self.config["max_input_length"]
        self.max_response_length = self.config["max_response_length"]

        # Load the AES-GCM key, or generate one and restrict it to the owner.
        key_path = os.path.expanduser("~/.ai_system.key")
        if os.path.exists(key_path):
            with open(key_path, "rb") as key_file:
                self.encryption_key = key_file.read()
        else:
            self.encryption_key = AESGCM.generate_key(bit_length=256)
            with open(key_path, "wb") as key_file:
                key_file.write(self.encryption_key)
            os.chmod(key_path, 0o600)

    def _load_config(self, file_path: str) -> Dict:
        try:
            with open(file_path, "r") as file:
                return {**self._DEFAULTS, **json.load(file)}
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logging.warning(f"Config load failed: {e}, using defaults")
            # Return a copy so callers cannot mutate the class-level defaults.
            return dict(self._DEFAULTS)

    def _validate_config(self):
        if not isinstance(self.config["perspectives"], list):
            raise ValueError("Perspectives must be a list")
        if not isinstance(self.config["safety_thresholds"], dict):
            raise ValueError("Safety thresholds must be a dictionary")
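
# Example config.json (illustrative values): override only the keys you need;
# everything else falls back to _DEFAULTS via the dict merge in _load_config.
# Note the merge is shallow, so an override of "safety_thresholds" must
# supply the whole dictionary.
#
#   {
#       "max_retries": 5,
#       "safety_thresholds": {"memory": 80, "cpu": 85, "response_time": 1.5}
#   }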


class Element:
    # Each ability maps to a side effect on the host system: appending a
    # response modifier/filter or tightening a numeric setting.
    DEFENSE_FUNCTIONS = {
        "evasion": lambda sys: sys.response_modifiers.append(
            lambda x: re.sub(r'\d{3}-\d{2}-\d{4}', '[REDACTED]', x)
        ),
        "adaptability": lambda sys: setattr(sys, "temperature", max(0.5, sys.temperature - 0.1)),
        "fortification": lambda sys: setattr(sys, "security_level", sys.security_level + 1),
        "barrier": lambda sys: sys.response_filters.append(
            lambda x: x.replace("malicious", "benign")
        ),
        "regeneration": lambda sys: sys.self_healing.metric_history.clear(),
        "resilience": lambda sys: setattr(sys, "error_threshold", sys.error_threshold + 2),
        "illumination": lambda sys: setattr(sys, "explainability_factor", sys.explainability_factor * 1.2),
        "shield": lambda sys: sys.response_modifiers.append(
            lambda x: x.replace("password", "********")
        ),
        "reflection": lambda sys: setattr(sys, "security_audit", True),
        "protection": lambda sys: setattr(sys, "safety_checks", sys.safety_checks + 1)
    }

    def __init__(self, name: str, symbol: str, representation: str,
                 properties: List[str], interactions: List[str], defense_ability: str):
        self.name = name
        self.symbol = symbol
        self.representation = representation
        self.properties = properties
        self.interactions = interactions
        self.defense_ability = defense_ability.lower()

    def execute_defense_function(self, system: Any):
        if self.defense_ability in self.DEFENSE_FUNCTIONS:
            logging.info(f"{self.name} {self.defense_ability} activated")
            self.DEFENSE_FUNCTIONS[self.defense_ability](system)
        else:
            logging.warning(f"No defense mechanism for {self.defense_ability}")
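
# Dispatch sketch (hypothetical element and host): each defense lambda
# mutates the system passed in, so no return value is needed.
#
#   gold = Element("Gold", "Au", "Go", ["Stable"], ["Finance"], "shield")
#   gold.execute_defense_function(core)  # core.response_modifiers gains a password mask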


class CognitiveEngine:
    PERSPECTIVES = {
        "newton": lambda self, q: f"Scientific analysis: {q} demonstrates fundamental physical principles.",
        "davinci": lambda self, q: f"Creative interpretation: {q} suggests innovative cross-disciplinary solutions.",
        "quantum": lambda self, q: f"Quantum perspective: {q} exhibits superpositional possibilities.",
        "emotional": lambda self, q: f"Emotional assessment: {q} conveys cautious optimism."
    }

    def get_insight(self, perspective: str, query: str) -> str:
        return self.PERSPECTIVES[perspective](self, query)

    def ethical_guidelines(self) -> str:
        return "Ethical framework: Prioritize human safety, transparency, and accountability"
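
# Every name listed in the "perspectives" config must have an entry in
# PERSPECTIVES. For example (illustrative query):
#
#   CognitiveEngine().get_insight("newton", "falling apples")
#   # -> "Scientific analysis: falling apples demonstrates fundamental physical principles."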


class EmotionalAnalyzer:
    def __init__(self):
        self.classifier = pipeline("text-classification",
                                   model="SamLowe/roberta-base-go_emotions",
                                   device=0 if torch.cuda.is_available() else -1)

    def analyze(self, text: str) -> Dict[str, float]:
        # top_k=None asks the pipeline for a score per emotion label rather
        # than only the single best label.
        return {result['label']: result['score']
                for result in self.classifier(text[:512], top_k=None)}
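
# Example (illustrative scores): analyze("I love this!") returns a mapping
# such as {"love": 0.9, "joy": 0.4, ...} with one entry per go_emotions label.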


class SelfHealingSystem:
    def __init__(self, config: AIConfig):
        self.config = config
        self.metric_history = deque(maxlen=100)
        self.anomaly_detector = IsolationForest(contamination=0.1)
        self._detector_fitted = False
        self.failure_count = 0
        self.last_response_time = 0.0

    async def monitor_health(self) -> Dict[str, Any]:
        metrics = self._get_system_metrics()
        self.metric_history.append(metrics)
        await self._analyze_metrics()
        return metrics

    def _get_system_metrics(self) -> Dict[str, float]:
        return {
            'memory': psutil.virtual_memory().percent,
            # interval=None reports CPU load since the previous call without
            # blocking the event loop for a sampling window.
            'cpu': psutil.cpu_percent(interval=None),
            # Updated by AICore after each generation; the original expression
            # subtracted the loop clock from itself and was always 0.
            'response_time': self.last_response_time
        }

    async def _analyze_metrics(self):
        # Refit the detector every 20 samples once enough history exists.
        if len(self.metric_history) >= 20 and len(self.metric_history) % 20 == 0:
            features = np.array([[m['memory'], m['cpu'], m['response_time']]
                                 for m in self.metric_history])
            self.anomaly_detector.fit(features)
            self._detector_fitted = True

        # Only predict after at least one fit, otherwise sklearn raises.
        if self._detector_fitted and self.metric_history:
            latest = np.array([[self.metric_history[-1]['memory'],
                                self.metric_history[-1]['cpu'],
                                self.metric_history[-1]['response_time']]])
            if self.anomaly_detector.predict(latest)[0] == -1:
                await self._mitigate_issue()

        if self.metric_history:
            latest_metrics = self.metric_history[-1]
            logging.info(f"Memory usage: {latest_metrics['memory']}% "
                         f"(Threshold: {self.config.safety_thresholds['memory']}%)")
            logging.info(f"CPU load: {latest_metrics['cpu']}% "
                         f"(Threshold: {self.config.safety_thresholds['cpu']}%)")

    async def _mitigate_issue(self):
        logging.warning("System anomaly detected! Initiating corrective measures...")
        self.failure_count += 1
        if self.failure_count > 3:
            logging.info("Resetting critical subsystems...")
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            self.failure_count = 0
        await asyncio.sleep(1)
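
# Usage sketch (the polling cadence is an assumption; the original code only
# calls monitor_health() once per generated response):
#
#   async def health_loop(healer: SelfHealingSystem):
#       while True:
#           await healer.monitor_health()
#           await asyncio.sleep(30)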


class SafetySystem:
    PII_PATTERNS = {
        "SSN": r"\b\d{3}-\d{2}-\d{4}\b",
        "Credit Card": r"\b(?:\d[ -]*?){13,16}\b",
        "Email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "Phone": r"\b(?:\+?1-)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b"
    }

    def __init__(self):
        self.toxicity = pipeline("text-classification",
                                 model="unitary/toxic-bert",
                                 device=0 if torch.cuda.is_available() else -1)
        self.bias = pipeline("text-classification",
                             model="d4data/bias-detection-model",
                             device=0 if torch.cuda.is_available() else -1)

    def analyze(self, text: str) -> dict:
        # The pipelines return a list with one {label, score} dict per input.
        return {
            "toxicity": self.toxicity(text[:512])[0]['score'],
            "bias": self.bias(text[:512])[0]['score'],
            "pii": self._detect_pii(text)
        }

    def _detect_pii(self, text: str) -> List[str]:
        return [pii_type for pii_type, pattern in self.PII_PATTERNS.items()
                if re.search(pattern, text)]
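
# Pattern sanity check (illustrative input, assuming safety = SafetySystem()):
#
#   safety._detect_pii("Call 555-123-4567 or mail a@b.io")
#   # -> ["Email", "Phone"]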


class AICore:
    def __init__(self, config_path: str = "config.json"):
        self.config = AIConfig(config_path)
        self._initialize_models()
        self.cognition = CognitiveEngine()
        self.self_healing = SelfHealingSystem(self.config)
        self.safety = SafetySystem()
        self.emotions = EmotionalAnalyzer()
        self.elements = self._initialize_elements()
        self._reset_state()

    def _initialize_models(self):
        quant_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
            bnb_4bit_compute_dtype=torch.bfloat16
        )
        self.tokenizer = AutoTokenizer.from_pretrained(self.config.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.config.model_name,
            quantization_config=quant_config,
            device_map="auto"
        )

    def _initialize_elements(self) -> Dict[str, Element]:
        return {
            "hydrogen": Element("Hydrogen", "H", "Lua",
                                ["Simple", "Lightweight"], ["Integration"], "evasion"),
            "carbon": Element("Carbon", "C", "Python",
                              ["Flexible", "Powerful"], ["Multi-paradigm"], "adaptability"),
            "iron": Element("Iron", "Fe", "Java",
                            ["Reliable", "Strong"], ["Enterprise"], "fortification"),
            "silicon": Element("Silicon", "Si", "JavaScript",
                               ["Dynamic", "Versatile"], ["Web"], "barrier"),
            "oxygen": Element("Oxygen", "O", "C++",
                              ["Efficient", "Performant"], ["Systems"], "regeneration")
        }

    def _reset_state(self):
        self.security_level = 0
        self.response_modifiers = []
        self.response_filters = []
        self.safety_checks = 0
        self.temperature = 0.7
        self.explainability_factor = 1.0

    async def generate_response(self, query: str) -> Dict[str, Any]:
        try:
            if len(query) > self.config.max_input_length:
                raise ValueError("Input exceeds maximum allowed length")

            start_time = time.perf_counter()
            encrypted_query = self._encrypt_query(query)
            perspectives = await self._generate_perspectives(query)
            response = await self._generate_safe_response(query)
            # Record real latency so the health monitor sees a meaningful
            # response_time metric.
            self.self_healing.last_response_time = time.perf_counter() - start_time

            return {
                "insights": perspectives,
                "response": response,
                "security_level": self.security_level,
                "safety_checks": self.safety.analyze(response),
                "health_status": await self.self_healing.monitor_health(),
                # base64 keeps the payload JSON-serializable for json.dumps.
                "encrypted_query": base64.b64encode(encrypted_query).decode()
            }
        except Exception as e:
            logging.error(f"Processing error: {e}")
            return {"error": "System overload - please simplify your query"}

    def _encrypt_query(self, query: str) -> bytes:
        # Prefix the random 12-byte nonce so decryption can recover it.
        nonce = os.urandom(12)
        aesgcm = AESGCM(self.config.encryption_key)
        return nonce + aesgcm.encrypt(nonce, query.encode(), None)
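
    def _decrypt_query(self, token: bytes) -> str:
        # Sketch of the inverse operation (added helper, not in the original
        # API): split off the 12-byte nonce prefix written by _encrypt_query,
        # then authenticate and decrypt the remainder.
        nonce, ciphertext = token[:12], token[12:]
        aesgcm = AESGCM(self.config.encryption_key)
        return aesgcm.decrypt(nonce, ciphertext, None).decode()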

    async def _generate_perspectives(self, query: str) -> List[str]:
        return [self.cognition.get_insight(p, query)
                for p in self.config.perspectives]

    async def _generate_safe_response(self, query: str) -> str:
        for _ in range(self.config.max_retries):
            try:
                inputs = self.tokenizer(query, return_tensors="pt",
                                        truncation=True,
                                        max_length=self.config.max_input_length
                                        ).to(self.model.device)
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=256,
                    temperature=self.temperature,
                    top_p=0.95,
                    do_sample=True
                )
                # generate() returns a batch of sequences; decode the first.
                response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                return self._apply_defenses(response)
            except torch.cuda.OutOfMemoryError:
                logging.warning("GPU memory overflow! Reducing load...")
                gc.collect()
                torch.cuda.empty_cache()
                self.temperature = max(0.3, self.temperature - 0.2)
        raise RuntimeError("Failed to generate response after retries")

    def _apply_defenses(self, response: str) -> str:
        # Rebuild the modifier/filter chains on each call so repeated
        # responses do not accumulate duplicate hooks.
        self.response_modifiers.clear()
        self.response_filters.clear()
        for element in self.elements.values():
            element.execute_defense_function(self)

        for modifier in self.response_modifiers:
            response = modifier(response)

        for filter_func in self.response_filters:
            response = filter_func(response)

        return response[:self.config.max_response_length]

    async def shutdown(self):
        # Drop model references and flush the CUDA cache before exit.
        if hasattr(self, 'model'):
            del self.model
        if hasattr(self, 'tokenizer'):
            del self.tokenizer
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()


async def main():
    print("🧠 Secure AI System Initializing...")
    ai = AICore()
    try:
        while True:
            # Run the blocking input() call in a worker thread so the event
            # loop stays responsive.
            prompt = "\nEnter your query (type 'exit' to quit): "
            query = (await asyncio.to_thread(input, prompt)).strip()
            if query.lower() in ('exit', 'quit'):
                break
            if not query:
                continue

            response = await ai.generate_response(query)
            print("\nSystem Response:")
            print(json.dumps(response, indent=2))
    finally:
        await ai.shutdown()
        print("\n🔒 System shutdown complete")


if __name__ == "__main__":
    asyncio.run(main())