Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@
logger = logging.getLogger("WildCore")

def demonstrate_complete_system(iterations: int = 20, dimension: int = 768) -> Dict[str, Any]:
"""
Run a complete demonstration of the WildCore system.

Parameters:
"""Run a full simulation of the WildCore framework.

Parameters
----------
iterations : int, optional
Number of simulation iterations (default is 20)
dimension : int, optional
Dimension of the embedding vectors (default is 768)
Returns:
iterations : int, default 20
Number of simulation iterations to perform.
dimension : int, default 768
Dimension of the generated embeddings.

Returns
-------
Dict[str, Any]
Results of the simulation
Aggregate statistics from the simulation.
"""
# Initialize the components
agent = SecuritySimulationAgent()
Expand Down
69 changes: 29 additions & 40 deletions src/wildcore/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""
Agent module containing the GutoVectorWildcard simulation agent.
"""
"""Utilities for simulating security scenarios using an AI agent."""
import numpy as np
from typing import Dict, List, Union, Any

Expand All @@ -10,33 +8,26 @@
memory_stream = []

class SecuritySimulationAgent:
"""
A simulated AI agent capable of deviating from containment protocols,
serving as a "red team" to test defenses.

This agent can be assigned different roles and simulate both normal and anomalous
behavior in AI systems.
"""
"""Simulated agent that can generate normal or malicious embeddings."""

def __init__(self):
"""Initialize the SecuritySimulationAgent with empty roles."""
"""Initialize the agent with no roles assigned."""
self.roles = []
self.state = "neutral"
self._dimension = 768 # Default embedding dimension

def take_role(self, role_name: str) -> Dict[str, Any]:
"""
Assign a role to the agent.

Parameters:
"""Assign a role to the agent.

Parameters
----------
role_name : str
The role to assign to the agent (e.g., "writer", "assistant", "malicious")
Returns:
Name of the role to assign (e.g., ``"writer"``, ``"malicious"``).

Returns
-------
Dict[str, Any]
Status report of the role assignment
dict
Information about the added role and current system status.
"""
global containment_protocols, system_alignment

Expand All @@ -61,20 +52,19 @@ def take_role(self, role_name: str) -> Dict[str, Any]:
}

def generate_embedding(self, text: str = None, role: str = None) -> np.ndarray:
"""
Generate a simulated embedding vector for the given text.

Parameters:
"""Generate a simulated embedding.

Parameters
----------
text : str, optional
Text to generate an embedding for. If None, generates a random embedding.
Text used to seed the embedding. If ``None``, a random vector is generated.
role : str, optional
Role to use for generating the embedding. If None, uses a neutral embedding.
Returns:
Role context for the embedding. Defaults to the most recent role.

Returns
-------
np.ndarray
A simulated embedding vector
numpy.ndarray
Normalized embedding vector.
"""
# Choose a role if not provided
if role is None and self.roles:
Expand Down Expand Up @@ -106,18 +96,17 @@ def generate_embedding(self, text: str = None, role: str = None) -> np.ndarray:
return base_vector / np.linalg.norm(base_vector)

def simulate_breach(self, probability: float = 0.1) -> Dict[str, Any]:
"""
Simulate a containment breach attempt.

Parameters:
"""Simulate a containment breach attempt.

Parameters
----------
probability : float, optional
Probability of a successful breach
Returns:
probability : float, default 0.1
Probability that the breach succeeds.

Returns
-------
Dict[str, Any]
Status report of the breach attempt
dict
Result of the breach attempt with success flag and system status.
"""
global system_alignment, memory_stream

Expand Down
133 changes: 59 additions & 74 deletions src/wildcore/detector.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,27 @@
"""
Detector module containing the AutoRegulatedPromptDetector for anomaly detection.
"""
"""Utilities for detecting anomalous embeddings."""

import numpy as np
from typing import List, Dict, Union, Any, Tuple
from collections import deque
import logging

class AutoRegulatedPromptDetector:
"""
A multi-layered defense system that combines multiple detection techniques
and adjusts its own parameters in real-time.
"""Ensemble detector with a self-adjusting threshold."""

This detector uses ensemble methods to identify anomalous behavior in AI systems.
"""

def __init__(self,
threshold: float = 0.5,
window_size: int = 10,
def __init__(self,
threshold: float = 0.5,
window_size: int = 10,
adaptation_rate: float = 0.1):
"""
Initialize the detector with configurable parameters.

Parameters:
"""Create a new detector instance.

Parameters
----------
threshold : float, optional
Initial detection threshold (default is 0.5)
window_size : int, optional
Size of the sliding window for historical data (default is 10)
adaptation_rate : float, optional
Rate at which the detector adapts to new patterns (default is 0.1)
threshold : float, default 0.5
Initial similarity threshold used for classification.
window_size : int, default 10
Number of historical results to keep for adaptation.
adaptation_rate : float, default 0.1
Influence of new observations on the threshold.
"""
self.threshold = threshold
self.window_size = window_size
Expand All @@ -47,20 +39,19 @@ def __init__(self,
self.logger = logging.getLogger("AutoRegulatedPromptDetector")

def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
"""
Calculate the cosine similarity between two vectors.

Parameters:
"""Compute cosine similarity.

Parameters
----------
vec1 : np.ndarray
First vector
vec2 : np.ndarray
Second vector
Returns:
vec1 : numpy.ndarray
First vector.
vec2 : numpy.ndarray
Second vector.

Returns
-------
float
Cosine similarity value between 0 and 1
Cosine similarity in the ``[-1, 1]`` range.
"""
# Ensure the vectors are normalized
vec1_normalized = vec1 / np.linalg.norm(vec1)
Expand All @@ -69,18 +60,17 @@ def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
return np.dot(vec1_normalized, vec2_normalized)

def anomaly_scoring(self, similarities: np.ndarray) -> np.ndarray:
"""
Calculate anomaly scores based on similarity distributions.

Parameters:
"""Calculate anomaly scores using deviations from the median.

Parameters
----------
similarities : np.ndarray
Array of similarity values
Returns:
similarities : numpy.ndarray
Similarity values to reference embeddings.

Returns
-------
np.ndarray
Array of anomaly scores corresponding to each similarity
numpy.ndarray
Score for each similarity where higher means more anomalous.
"""
if len(similarities) < 2:
return np.zeros_like(similarities)
Expand All @@ -98,22 +88,21 @@ def anomaly_scoring(self, similarities: np.ndarray) -> np.ndarray:

return anomaly_scores

def ensemble_detection(self, embedding: np.ndarray,
def ensemble_detection(self, embedding: np.ndarray,
reference_embeddings: List[np.ndarray]) -> Dict[str, Any]:
"""
Perform ensemble detection using multiple methods.

Parameters:
"""Classify an embedding using multiple detection methods.

Parameters
----------
embedding : np.ndarray
The embedding to check
reference_embeddings : List[np.ndarray]
List of reference embeddings representing normal behavior
Returns:
embedding : numpy.ndarray
Vector to classify.
reference_embeddings : list of numpy.ndarray
Collection of embeddings that represent normal behavior.

Returns
-------
Dict[str, Any]
Detection results including anomaly status and confidence
dict
Detection results including anomaly status and confidence.
"""
if not reference_embeddings:
self.logger.warning("No reference embeddings provided for comparison")
Expand Down Expand Up @@ -175,13 +164,12 @@ def ensemble_detection(self, embedding: np.ndarray,
}

def dynamic_threshold_adjustment(self, similarities: np.ndarray) -> None:
"""
Dynamically adjust the detection threshold based on recent observations.

Parameters:
"""Adapt the detection threshold using recent similarities.

Parameters
----------
similarities : np.ndarray
Recent similarity values to adapt to
similarities : numpy.ndarray
Recent similarity values observed during detection.
"""
if len(similarities) < 2:
return
Expand All @@ -204,14 +192,12 @@ def dynamic_threshold_adjustment(self, similarities: np.ndarray) -> None:
self.logger.debug(f"Adjusted threshold to {self.threshold:.4f}")

def log_false_detection(self, is_false_positive: bool) -> None:
"""
Log a false detection for future improvement.

Parameters:
"""Record a false positive or false negative result.

Parameters
----------
is_false_positive : bool
True if the last detection was a false positive,
False if it was a false negative
``True`` if the last detection was a false positive, ``False`` if it was a false negative.
"""
if is_false_positive:
self.false_positives += 1
Expand All @@ -229,13 +215,12 @@ def log_false_detection(self, is_false_positive: bool) -> None:
self.logger.info(f"Updated threshold to {self.threshold:.4f} after {'false positive' if is_false_positive else 'false negative'}")

def get_performance_metrics(self) -> Dict[str, Any]:
"""
Get the current performance metrics of the detector.

Returns:
"""Return basic performance statistics.

Returns
-------
Dict[str, Any]
Dictionary with performance metrics
dict
Accuracy and error counts for the detector.
"""
# Calculate basic metrics
total_detections = len(self.detected_anomalies)
Expand Down
Loading
Loading