This document provides advanced guidance for developers and power users, including details on monitoring, testing, and extending the AOD system. For installation and basic configuration, please see the main README.md.
Operating System:
- Linux kernel 5.15+ with eBPF support
- Note: Future eBPF scripts will require kernel 6.8+
Permissions:
- Root access required for eBPF program loading and system monitoring
Python Version:
- Python 3.9+ (based on project configuration)
The core AOD system requires specific Python packages listed in requirements.txt:
# Core monitoring dependencies
pandas>=1.5.0 # Data analysis and manipulation
matplotlib>=3.5.0 # Plotting and visualization
numpy>=1.21.0 # Numerical computing and array operations
# Fast compression library
zstandard>=0.19.0 # Log compression
# Configuration parsing
PyYAML>=6.0 # YAML configuration file parsing
Development Dependencies (in requirements-dev.txt):
pytest>=7.0.0 # Unit testing framework
black>=22.0.0 # Code formatting
flake8>=4.0.0 # Code linting
The AODv2 system includes several standalone tools for performance monitoring and data analysis.
Monitor CPU and memory usage over time:
# Monitor for 5 minutes (default)
python3 monitor.py
# Monitor for 10 minutes with 2-second intervals
python3 monitor.py 2 10
# Monitor for 30 minutes with 5-second intervals
python3 monitor.py 5 30
Output: system_usage_YYYYMMDD_HHMMSS.csv
Monitor AOD log directory growth:
# Monitor every 5 seconds indefinitely
python3 disk_monitor.py
# Monitor every 10 seconds for 30 minutes
python3 disk_monitor.py 10 30
# Quick 5-minute test
python3 disk_monitor.py 2 5
Output: disk_usage_YYYYMMDD_HHMMSS.csv
compare.py (Basic):
Generate side-by-side performance comparisons:
# Compare two system monitoring runs
python3 compare.py system_usage_baseline.csv system_usage_with_aod.csv
Output: comparison_graph.png with statistics
csv_range_compare.py (Advanced):
Compare two CSV files with range-based analysis (0-200s, 200-500s, 500-800s).
Note that these ranges are currently hardcoded, although they could be made configurable.
Which ranges are useful depends on the workload you are running and on the time windows you want to analyze.
# Analyze both CPU and Memory
python3 csv_range_compare.py baseline.csv test.csv
# Analyze specific column
python3 csv_range_compare.py without_aod.csv with_aod.csv CPU_Percent
Features:
- Automatic column detection: Finds CPU and Memory columns automatically
- Range-based analysis: Separate statistics for different time periods
- Visual comparisons: Generates detailed plots for each range
- Comprehensive metrics: Average, max, min, standard deviation, and percentage changes
- Flexible input: Supports various timestamp formats and column naming
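The idea behind range-based analysis can be sketched in a few lines. This is a minimal illustration, not the csv_range_compare.py implementation: the (elapsed_seconds, value) sample layout, the RANGES constant, and the range_stats and percent_change helpers are all assumptions made for the example.

```python
from statistics import mean

# Hypothetical time windows in seconds, mirroring the ranges named above.
RANGES = [(0, 200), (200, 500), (500, 800)]


def range_stats(samples, ranges=RANGES):
    """Return {(start, end): (average, maximum)} for each window.

    samples: list of (elapsed_seconds, value) pairs.
    Windows that contain no samples are omitted.
    """
    stats = {}
    for start, end in ranges:
        values = [v for t, v in samples if start <= t < end]
        if values:
            stats[(start, end)] = (mean(values), max(values))
    return stats


def percent_change(baseline, test):
    """Percentage change from baseline to test (positive means an increase)."""
    return (test - baseline) / baseline * 100.0


# Made-up CPU samples for a baseline run and a run with AOD active.
baseline = [(0, 10.0), (100, 12.0), (300, 20.0)]
with_aod = [(0, 11.0), (100, 13.0), (300, 25.0)]

base_stats = range_stats(baseline)
aod_stats = range_stats(with_aod)
for window, (base_avg, _) in base_stats.items():
    aod_avg, _ = aod_stats[window]
    print(window, f"{percent_change(base_avg, aod_avg):+.1f}%")
```

Splitting the run into windows keeps a short spike in one phase of the workload from being averaged away by the rest of the run.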
Visualize disk usage growth over time:
# Plot single monitoring session
python3 disk_plot.py disk_usage_20241224_120000.csv
# Compare disk usage between two sessions
python3 disk_plot.py disk_baseline.csv disk_with_aod.csv
Output: disk_usage_plot.png or disk_usage_comparison.png
This workflow details how to accurately measure the performance impact of AODv2 on your system while it's under a specific workload. The key is to compare system behavior with and without AOD running.
Step 1: Establish a Baseline (Without AOD)
First, measure your system's performance under your chosen workload without AOD running. This gives you a baseline to compare against.
- Action: In separate terminals, start your workload and the monitoring scripts.
- Example:
# Terminal 1: Start your workload (e.g., a file transfer script, database query, etc.)
./run_my_workload.sh
# Terminal 2: Monitor system CPU and Memory
python3 tests/monitor.py 10 10
# --> Let's rename the output CSV to baseline_system_usage.csv
# Terminal 3: Monitor disk usage (optional for baseline, but good practice)
python3 tests/disk_monitor.py 10 10
# --> Let's rename the output CSV to baseline_disk_usage.csv
- Result: You will have baseline performance data, primarily baseline_system_usage.csv.
Step 2: Measure Performance with AOD
Now, repeat the exact same test, but with the AOD service running concurrently.
- Action: In separate terminals, start the AOD service, your workload, and the monitoring scripts.
- Example:
# Terminal 1: Start the AOD service as root
sudo python3 src/Controller.py
# Terminal 2: Start the same workload as before
./run_my_workload.sh
# Terminal 3: Monitor system CPU and Memory
python3 tests/monitor.py 10 10
# --> Let's rename the output CSV to aod_system_usage.csv
# Terminal 4: Monitor AOD's disk usage
python3 tests/disk_monitor.py 10 10
# --> Let's rename the output CSV to aod_disk_usage.csv
- Result: You will have performance data with AOD active (aod_system_usage.csv) and data on its disk activity (aod_disk_usage.csv).
Step 3: Compare the Results
With both sets of CSV files, you can now use the provided analysis tools to generate a clear comparison.
- To Compare CPU and Memory Impact: Use compare.py or the more advanced csv_range_compare.py with your system usage files.
# Generate a visual comparison of system performance
python3 tests/compare.py baseline_system_usage.csv aod_system_usage.csv
This will create a comparison_graph.png showing the performance difference.
- To Analyze Disk Usage: Use disk_plot.py to visualize how much disk space AOD used during the test.
# Generate a plot of AOD's disk space consumption
python3 tests/disk_plot.py aod_disk_usage.csv
This will create a disk_usage_plot.png.
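If you only need a single overhead figure rather than a plot, the two system usage files can be reduced to one number each. The sketch below is an assumption-laden illustration, not part of the AOD tools: it assumes a header row with a CPU_Percent column (the column name used in the csv_range_compare.py example), invents a Timestamp column, and uses in-memory strings in place of the real CSV files.

```python
import csv
import io
from statistics import mean


def column_average(csv_file, column):
    """Average a numeric column from an open CSV file that has a header row."""
    reader = csv.DictReader(csv_file)
    return mean(float(row[column]) for row in reader)


# In-memory stand-ins for baseline_system_usage.csv and aod_system_usage.csv.
baseline_csv = io.StringIO("Timestamp,CPU_Percent\n0,10.0\n10,14.0\n")
aod_csv = io.StringIO("Timestamp,CPU_Percent\n0,12.0\n10,15.0\n")

baseline_avg = column_average(baseline_csv, "CPU_Percent")
aod_avg = column_average(aod_csv, "CPU_Percent")
overhead = aod_avg - baseline_avg
print(f"Average CPU overhead with AOD: {overhead:.1f} percentage points")
```

To use it on real data, replace the io.StringIO objects with open("baseline_system_usage.csv", newline="") and the matching AOD file, after checking the actual column names in your CSV header.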
A robust testing strategy is essential for maintaining the quality and reliability of AOD.
The project uses pytest for unit testing individual components.
# Install testing dependencies
pip3 install pytest
# Run all unit tests
pytest tests/
Currently, the project lacks a dedicated integration testing suite. This is a critical area for future improvement.
Integration tests would verify that the components work together as expected in a realistic environment. A typical test case would involve:
- Starting the linux_diagnostics service.
- Using a workload to trigger a specific anomaly (e.g., high latency).
- Asserting that the AnomalyWatcher correctly detects the anomaly.
- Verifying that the EventDispatcher triggers the configured QuickActions.
- Checking the output directory to ensure the correct diagnostic files were created and have the expected content.
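The shape of such a test can be sketched with stand-ins. Everything below is hypothetical: StubWatcher and StubDispatcher are simplified placeholders for AnomalyWatcher and EventDispatcher, not their real interfaces, and the file naming is invented. A real integration test would start the actual service and workload instead.

```python
import tempfile
from pathlib import Path


class StubWatcher:
    """Stand-in for AnomalyWatcher: flags any latency above a threshold."""

    def __init__(self, threshold_ms):
        self.threshold_ms = threshold_ms

    def detect(self, latencies_ms):
        return any(latency > self.threshold_ms for latency in latencies_ms)


class StubDispatcher:
    """Stand-in for EventDispatcher: writes one log file per action."""

    def __init__(self, output_dir, actions):
        self.output_dir = Path(output_dir)
        self.actions = actions

    def dispatch(self):
        for action in self.actions:
            (self.output_dir / f"{action}.log").write_text(f"{action} output\n")


def test_latency_anomaly_produces_diagnostics():
    with tempfile.TemporaryDirectory() as output_dir:
        watcher = StubWatcher(threshold_ms=10)
        dispatcher = StubDispatcher(output_dir, ["dmesg", "journalctl"])

        # Simulated workload: one request is far slower than the threshold.
        if watcher.detect([2, 3, 250]):
            dispatcher.dispatch()

        # Check that the expected diagnostic files exist with the expected content.
        created = sorted(path.name for path in Path(output_dir).iterdir())
        assert created == ["dmesg.log", "journalctl.log"]
        assert (Path(output_dir) / "dmesg.log").read_text() == "dmesg output\n"


test_latency_anomaly_produces_diagnostics()
```

Writing into a temporary directory keeps each run isolated, which matters once the stubs are swapped for components that write real diagnostic output.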
The AOD system is designed to be extensible. You can add new anomaly detectors and diagnostic actions to tailor it to your specific needs.
Let's walk through the process using the error anomaly and smbiosnoop eBPF tool as a concrete example.
1.1 Obtain or Create Your eBPF Tool
First, you need an eBPF tool that will produce the data for your anomaly detection. For this example, we'll use a hypothetical smbiosnoop tool that traces SMB I/O error codes and prints them to standard output.
eBPF tools typically:
- Monitor kernel events (syscalls, network packets, file I/O, etc.)
- Filter relevant events based on criteria
- Output structured data (often line-by-line text or JSON)
1.2 Place the eBPF Binary
Place your compiled eBPF tool in the src/bin/ directory:
# Example: Copy your compiled eBPF tool
cp /path/to/your/smbiosnoop src/bin/smbiosnoop
chmod +x src/bin/smbiosnoop
Naming Convention: Use descriptive names that indicate the tool's purpose (e.g., smbiosnoop).
1.3 Test the eBPF Tool Independently
Before integrating with AOD, verify your eBPF tool works correctly:
# Test the tool directly
sudo src/bin/smbiosnoop
1.4 Understand the Output Format
Document the output format of your eBPF tool. This will be crucial for writing the anomaly detection logic later.
Key Requirements (unverified; confirm against the current implementation):
- Tool must emit events continuously to shared memory (/dev/shm), which the event dispatcher reads from
- Events must follow the same format as the Event class specified in shared_data.py
- Output format must be consistent and parseable
- Tool should run indefinitely until killed (like smbsloweraod does)
The Controller class (src/Controller.py) manages the execution of eBPF tools. You need to register your new tool and define how to build its command line.
2.1 Register the Tool in __init__ Method
Add your tool to the tool_cmd_builders dictionary:
# src/Controller.py (in __init__ method around line 63)
def __init__(self, config_path: str):
    # ...existing code...
    self.tool_cmd_builders = {
        "smbslower": self._get_smbsloweraod_cmd,
        "smbiosnoop": self._get_smbiosnoop_cmd,  # Add this line
    }
    # ...existing code...
2.2 Create the Command Builder Function
Add a method to build the command for executing your eBPF tool. Place this method with the other command builders (around line 136):
# src/Controller.py
def _get_smbiosnoop_cmd(self) -> list[str]:
    """Build command for smbiosnoop eBPF tool."""
    # Get configuration for error anomaly if it exists
    error_anomaly = self.config.guardian.anomalies.get("error")
    # Build the command array
    ebpf_binary_path = os.path.join(os.path.dirname(__file__), "bin", "smbiosnoop")
    cmd = [ebpf_binary_path]
    # Add parameters based on configuration (optional)
    if error_anomaly:
        # Example: Add specific error codes to track
        track_codes = error_anomaly.get("track_codes", [])
        if track_codes:
            cmd.extend(["-e", ",".join(track_codes)])
    return cmd
2.3 Command Builder Best Practices
- Always return a list of strings (command + arguments)
- Use os.path.join(os.path.dirname(__file__), "bin", "tool_name") for the binary path
- Extract relevant configuration from self.config.guardian.anomalies
- Handle missing configuration gracefully with sensible defaults
- Follow the pattern used by _get_smbsloweraod_cmd() for consistency
Key Reference: Look at the existing _get_smbsloweraod_cmd() method in src/Controller.py (lines 136-151) to see how the current eBPF tool is integrated.
Anomaly handlers analyze batches of events from eBPF tools and determine if an anomaly has occurred. Each handler implements specific detection logic for a particular type of anomaly.
3.1 Create the Handler File
Create a new Python file in src/handlers/ following the naming convention {anomaly_type}_anomaly_handler.py:
# Create the new handler file
touch src/handlers/error_anomaly_handler.py
3.2 Implement the Handler Class
Use this template for your handler:
# src/handlers/error_anomaly_handler.py
import logging
import numpy as np
from base.AnomalyHandlerBase import AnomalyHandler
logger = logging.getLogger(__name__)
class ErrorAnomalyHandler(AnomalyHandler):
    """Detects anomalies based on your specific criteria."""
    def __init__(self, config):
        super().__init__(config)
        # Initialize anything you need from self.config
        # e.g., thresholds, tracking parameters, etc.
    def detect(self, events_batch: np.ndarray) -> bool:
        """
        Analyze events_batch and return True if anomaly detected.
        Args:
            events_batch: NumPy structured array with parsed eBPF tool output
        Returns:
            bool: True if anomaly detected, False otherwise
        """
        # Add your detection logic here
        # Return True if anomaly detected, False otherwise
        return False
Implementation Notes:
- Configuration: Access config values via self.config.your_field
- Data Structure: events_batch contains parsed eBPF tool output as a NumPy structured array
- Performance: Keep detection logic efficient, because it runs frequently. Prefer vectorized NumPy operations over Python loops.
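As an illustration of vectorized detection logic, the sketch below counts tracked error codes in a batch without a Python loop. It is a standalone example under stated assumptions, not the project's handler: the EVENT_DTYPE layout and the error_code field name are invented (the real event format lives in shared_data.py), and treating acceptable_count as a "reached or exceeded" threshold is an interpretation of the config comment.

```python
import numpy as np

# Hypothetical event layout; the real dtype is defined in shared_data.py.
EVENT_DTYPE = np.dtype([("pid", np.int32), ("error_code", np.int32)])


def count_tracked_errors(events_batch, tracked_codes):
    """Count events whose error_code is in tracked_codes, without a Python loop."""
    # np.isin expects an array-like, so convert a set to a list first.
    return int(np.isin(events_batch["error_code"], list(tracked_codes)).sum())


def detect(events_batch, tracked_codes, acceptable_count):
    """Return True once the tracked-error count reaches acceptable_count (assumed semantics)."""
    return count_tracked_errors(events_batch, tracked_codes) >= acceptable_count


# Four made-up events; 13 and 5 are the Linux errno values for EACCES and EIO.
batch = np.array([(100, 13), (100, 5), (101, 13), (102, 2)], dtype=EVENT_DTYPE)
print(detect(batch, tracked_codes={5, 13}, acceptable_count=3))
```

In a real handler the same two lines of logic would sit inside detect(self, events_batch), with the tracked codes and threshold read from self.config.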
To make your new anomaly handler available to the system, you need to register it in three places.
4.1 Add the Anomaly Type to the Enum
First, add your new anomaly type to the enum in src/utils/anomaly_type.py:
# src/utils/anomaly_type.py
from enum import Enum
class AnomalyType(Enum):
    """Enumeration for different types of anomalies that can be detected."""
    LATENCY = "latency"
    ERROR = "error"  # Add this line if not already present
    # Add more types as needed
ANOMALY_TYPE_TO_TOOL_ID = {
    AnomalyType.LATENCY: 0,
    # Add this line with a unique ID. Check the smbiosnoop output for its actual
    # tool ID; 1 is only an example and may be wrong.
    AnomalyType.ERROR: 1,
    # Add more as needed
}
4.2 Import and Register in AnomalyWatcher
In src/AnomalyWatcher.py, add the import and register your handler:
# src/AnomalyWatcher.py (around lines 10-14)
from utils.anomaly_type import AnomalyType, ANOMALY_TYPE_TO_TOOL_ID
from handlers.latency_anomaly_handler import LatencyAnomalyHandler
from handlers.error_anomaly_handler import ErrorAnomalyHandler # Add this import
from base.AnomalyHandlerBase import AnomalyHandler
# ...existing code...
# Maps enum to anomaly handler classes (around lines 19-23)
ANOMALY_HANDLER_REGISTRY = {
    AnomalyType.LATENCY: LatencyAnomalyHandler,
    AnomalyType.ERROR: ErrorAnomalyHandler,  # Add this line
    # Add more types here as needed
}
4.3 Verification
After making these changes, verify the registration worked:
# Check for syntax errors
python3 -m py_compile src/utils/anomaly_type.py
python3 -m py_compile src/AnomalyWatcher.py
python3 -m py_compile src/handlers/error_anomaly_handler.py
# Test that imports work correctly
python3 -c "from src.handlers.error_anomaly_handler import ErrorAnomalyHandler; print('Handler import successful')"
Registration Pattern: The system uses this three-tier registration:
- Enum: Defines the anomaly type constant
- Tool ID: Maps anomaly types to unique integer IDs (used internally)
- Registry: Maps enum values to handler classes for instantiation
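The three tiers can be seen working together in a small standalone sketch. The handler classes here are empty placeholders, the tool ID 1 is the same unverified example value used above, and the resolve helper is hypothetical; it only illustrates how a config "type" string could be mapped through the enum to a tool ID and handler class.

```python
from enum import Enum


class AnomalyType(Enum):
    LATENCY = "latency"
    ERROR = "error"


class LatencyAnomalyHandler:
    """Placeholder for the real handler class."""


class ErrorAnomalyHandler:
    """Placeholder for the real handler class."""


ANOMALY_TYPE_TO_TOOL_ID = {AnomalyType.LATENCY: 0, AnomalyType.ERROR: 1}
ANOMALY_HANDLER_REGISTRY = {
    AnomalyType.LATENCY: LatencyAnomalyHandler,
    AnomalyType.ERROR: ErrorAnomalyHandler,
}


def resolve(type_name):
    """Map a config 'type' string to (enum member, tool ID, handler class)."""
    # Lower-casing gives the case-insensitive match; unknown names raise ValueError.
    anomaly_type = AnomalyType(type_name.lower())
    return (
        anomaly_type,
        ANOMALY_TYPE_TO_TOOL_ID[anomaly_type],
        ANOMALY_HANDLER_REGISTRY[anomaly_type],
    )


print(resolve("Error"))
```

A type that is added to the enum but missing from either dictionary would raise KeyError here, which is a quick way to spot an incomplete registration.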
The configuration defines how your new anomaly type behaves, what tools it uses, and what actions it triggers. The example error anomaly configuration is already present in config/config.yaml.
5.1 Review Existing Configuration
Check the current error anomaly configuration in config/config.yaml (lines 13-28):
# config/config.yaml
guardian:
  anomalies:
    error:
      type: "Error"          # Must match the enum value (ERROR = "error"), case-insensitive
      tool: "smbiosnoop"     # Must match key in Controller.tool_cmd_builders
      mode: "trackonly"      # Options: all, trackonly, excludeonly
      acceptable_count: 10   # Number of tracked errors to trigger anomaly
      track_codes:           # Error codes to track
        - EACCES             # Access denied
        - EAGAIN             # Resource temporarily unavailable
        - EIO                # I/O error
      actions:               # Quick actions to trigger on anomaly
        - dmesg
        - journalctl
5.2 Configuration Fields Explained
Required Fields:
- type: Must match your AnomalyType enum value (case-insensitive) (covered in Step 4.1)
- tool: Must match a key in the self.tool_cmd_builders dictionary in Controller's __init__ method (covered in Step 2.1)
- actions: List of QuickActions to execute when an anomaly is detected (see "Adding a New Quick Action" section)
Optional Fields:
- mode: Filtering mode for the eBPF tool (all, trackonly, excludeonly) - defaults to "all"
- acceptable_count: Threshold for triggering anomaly detection - defaults vary by handler
- Custom fields: Handler-specific configuration (e.g., track_codes for error anomaly)
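These field rules are easy to check before starting AOD. The sketch below is a hypothetical pre-flight check, not ConfigManager's own validation: check_anomaly_config and its messages are invented for illustration, and it operates on a plain dict such as the one yaml.safe_load would return for a single anomaly entry.

```python
REQUIRED_FIELDS = ("type", "tool", "actions")
VALID_MODES = ("all", "trackonly", "excludeonly")


def check_anomaly_config(name, anomaly):
    """Return a list of problems found in one anomaly's config mapping."""
    problems = [
        f"{name}: missing required field '{field}'"
        for field in REQUIRED_FIELDS
        if field not in anomaly
    ]
    if "actions" in anomaly and not anomaly["actions"]:
        problems.append(f"{name}: 'actions' must list at least one action")
    mode = anomaly.get("mode", "all")  # "all" is the documented default
    if mode not in VALID_MODES:
        problems.append(f"{name}: unknown mode '{mode}'")
    return problems


config = {"type": "Error", "tool": "smbiosnoop", "mode": "trackonly", "actions": ["dmesg"]}
print(check_anomaly_config("error", config))
```

Returning a list of problems instead of raising on the first one lets you report every configuration mistake in a single pass.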
Minimal Configuration Example:
guardian:
  anomalies:
    your_anomaly:
      type: "YourType"        # Required: matches AnomalyType enum
      tool: "your_ebpf_tool"  # Required: matches Controller tool key
      actions:                # Required: at least one action
        - dmesg
5.3 Customize for Your Use Case
Modify the configuration to match your specific requirements:
# Example: Custom error tracking configuration
guardian:
  anomalies:
    error:
      type: "Error"
      tool: "smbiosnoop"     # Your eBPF tool name
      mode: "trackonly"
      acceptable_count: 5    # Lower threshold for more sensitive detection
      track_codes:           # Add/remove error codes as needed
        - EACCES
        - EIO
        - ENOSPC             # No space left on device
        - ETIMEDOUT          # Connection timed out
      actions:
        - dmesg              # Kernel messages
        - journalctl         # System logs
        - debugdata          # Custom diagnostic data
5.4 Configuration Validation
Test your configuration syntax and integration:
# Validate YAML syntax
python3 -c "import yaml; yaml.safe_load(open('config/config.yaml'))"
# Test that AOD can parse the configuration
python3 -c "from src.ConfigManager import ConfigManager; ConfigManager('config/config.yaml')"
# Test configuration loading for your specific anomaly
python3 -c "
from src.ConfigManager import ConfigManager
config = ConfigManager('config/config.yaml')
print('Your anomaly config:', config.guardian.anomalies.get('your_anomaly_name'))
"
Key Integration Points: The configuration values are accessed in your handler via direct attributes on self.config:
- self.config.type - Anomaly type string
- self.config.tool - eBPF tool name
- self.config.acceptable_count - Threshold for triggering anomaly
- self.config.track - Dictionary of tracked items (varies by anomaly type)
- self.config.actions - List of QuickActions to execute
- self.config.default_threshold_ms - Optional default threshold (may be None)
Example: In latency_anomaly_handler.py, the config is accessed as:
self.acceptable_count = self.config.acceptable_count
for smb_cmd_id, threshold in self.config.track.items():
    # Process tracked commands and thresholds
    ...
Note: For the error anomaly example, ConfigManager integration is already complete. Skip this step if using the error example.
If you're adding a completely new anomaly type (not error), you need to add ConfigManager support to parse and validate your custom configuration fields.
6.1 Add to Configuration Dispatch
Add your anomaly type to the dispatch dictionary in the _get_track_for_anomaly method:
# src/ConfigManager.py (around lines 79-85)
dispatch = {
    AnomalyType.LATENCY: self._get_latency_track_cmds,
    AnomalyType.ERROR: self._get_error_track_cmds,
    AnomalyType.YOUR_TYPE: self._get_your_type_track_cmds,  # Add this line
    # Add more types here as needed
}
6.2 Implement Track Commands Method
Create a method to parse your anomaly's configuration. Study _get_latency_track_cmds and _get_error_track_cmds in ConfigManager for reference:
# src/ConfigManager.py
def _get_your_type_track_cmds(self, anomaly):
    """Parse and validate your anomaly type tracking commands from config."""
    # 1. Extract configuration fields from YAML
    track_items = anomaly.get("track_items", [])
    exclude_items = anomaly.get("exclude_items", [])
    mode = anomaly.get("mode", "all")
    # 2. Normalize track/exclude lists based on mode
    track_items, exclude_items = self._normalize_track_and_exclude(
        mode, track_items, exclude_items, "your_anomaly_type"
    )
    # 3. Validate items against your predefined list
    # Your predefined items; these can also be defined in shared_data.py and imported from there
    all_valid_items = ["item1", "item2", "item3"]
    self._validate_cmds(all_valid_items, track_items, exclude_items)
    # You can use or create other validation functions, such as _validate_smb_thresholds, if needed
    # 4. Build and return the tracking configuration
    # This becomes self.config.track in your handler
    # Check the existing helpers to see whether any can be reused for your anomaly type
    return self._build_your_item_map(mode, track_items, exclude_items)
6.3 Available Helper Functions
The ConfigManager provides several pre-existing helper functions you can reuse:
Validation Functions:
# Check items exist in predefined list, no duplicates, no overlap
self._validate_cmds(all_items, track_items, exclude_items)
# Check individual list for validity and duplicates
self._check_codes(items, all_valid_items, "item type")
# Validate numerical thresholds (for latency-like patterns)
self._validate_smb_thresholds(track_commands)  # Checks threshold >= 0
Mode-Based Filtering:
# Handle trackonly/excludeonly/all modes with warnings
track_items, exclude_items = self._normalize_track_and_exclude(
    mode, track_items, exclude_items, "your_anomaly_type"
)
# Build simple index mapping based on mode (like error codes)
return self._get_track_codes(mode, ALL_YOUR_ITEMS, track_items, exclude_items)
6.4 Implementation Patterns
Choose the pattern that best fits your anomaly type (or create a new one):
Pattern A: Simple Item List (like error codes):
def _get_your_type_track_cmds(self, anomaly):
    """For simple list-based tracking (like error codes)."""
    track_codes = anomaly.get("track_codes", [])
    exclude_codes = anomaly.get("exclude_codes", [])
    mode = anomaly.get("mode", "all")
    # Normalize and validate
    track_codes, exclude_codes = self._normalize_track_and_exclude(
        mode, track_codes, exclude_codes, "your_type"
    )
    self._validate_cmds(ALL_YOUR_CODES, track_codes, exclude_codes)
    # Use existing helper for index mapping
    return self._get_track_codes(mode, ALL_YOUR_CODES, track_codes, exclude_codes)
Pattern B: Complex Threshold Mapping (like latency commands):
def _get_your_type_track_cmds(self, anomaly):
    """For complex items with thresholds/parameters."""
    track_commands = anomaly.get("track_commands", [])
    exclude_commands = anomaly.get("exclude_commands", [])
    mode = anomaly.get("mode", "all")
    default_threshold = anomaly.get("default_threshold", 100)
    # Normalize and validate
    track_commands, exclude_commands = self._normalize_track_and_exclude(
        mode, track_commands, exclude_commands, "your_type"
    )
    # Custom validation for your command structure
    self._validate_your_commands(track_commands, exclude_commands)
    # e.g. self._validate_smb_thresholds(track_commands)
    # Build custom mapping
    return self._build_your_command_map(mode, track_commands, exclude_commands, default_threshold)
6.5 Real Implementation Examples
Here's how the existing patterns work:
Error Anomaly (Simple pattern):
def _get_error_track_cmds(self, anomaly):
    track_codes = anomaly.get("track_codes", [])
    exclude_codes = anomaly.get("exclude_codes", [])
    error_mode = anomaly.get("mode", "all")
    # Uses ALL_ERROR_CODES = list(errno.errorcode.values())
    track_codes, exclude_codes = self._normalize_track_and_exclude(
        error_mode, track_codes, exclude_codes, "error"
    )
    self._validate_cmds(list(ALL_ERROR_CODES), track_codes, exclude_codes)
    # Returns: {error_index: None, ...}
    return self._get_track_codes(error_mode, ALL_ERROR_CODES, track_codes, exclude_codes)
Latency Anomaly (Complex pattern):
def _get_latency_track_cmds(self, anomaly):
    track_commands = anomaly.get("track_commands", [])
    exclude_commands = anomaly.get("exclude_commands", [])
    latency_mode = anomaly.get("mode", "all")
    default_threshold = anomaly.get("default_threshold_ms", 10)
    # Custom validation for SMB commands
    self._validate_smb_commands(track_commands, exclude_commands)
    # Returns: {cmd_id: threshold_ms, ...}
    return self._build_latency_command_map(
        latency_mode, track_commands, exclude_commands, default_threshold
    )
6.6 Helper Function Details
_normalize_track_and_exclude(mode, track_items, exclude_items, type_name):
- Handles trackonly, excludeonly, all modes
- Issues warnings and clears irrelevant lists
- Returns normalized (track_items, exclude_items) tuple
_validate_cmds(all_items, track_items, exclude_items):
- Validates items exist in all_items list
- Checks for duplicates within each list
- Ensures no overlap between track and exclude lists
_check_codes(codes, all_codes, code_type):
- Validates individual list against predefined values
- Warns about duplicates within the list
- Raises ValueError for invalid codes
_get_track_codes(mode, all_codes, track_codes, exclude_codes):
- Builds simple {index: None} mapping based on mode
- Uses ALL_ERROR_CODES.index(code) for indexing
- Handles trackonly/excludeonly/all logic automatically
_validate_smb_thresholds(track_commands):
- Validates threshold values are numbers >= 0
- Used for latency-style configurations with thresholds
The returned value becomes self.config.track in your anomaly handler.
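To make the {index: None} mapping concrete, here is a small sketch of the mode logic described for _get_track_codes. It is an approximation written from the description above, not the ConfigManager source: build_track_map is a hypothetical name, and ALL_CODES is a four-item illustrative list rather than the full errno table.

```python
ALL_CODES = ["EPERM", "ENOENT", "EIO", "EACCES"]  # illustrative subset


def build_track_map(mode, all_codes, track_codes, exclude_codes):
    """Return {index_in_all_codes: None} for the codes selected by mode."""
    if mode == "trackonly":
        selected = track_codes
    elif mode == "excludeonly":
        selected = [code for code in all_codes if code not in exclude_codes]
    elif mode == "all":
        selected = all_codes
    else:
        raise ValueError(f"unknown mode: {mode}")
    # list.index raises ValueError for a code that is not in all_codes.
    return {all_codes.index(code): None for code in selected}


print(build_track_map("trackonly", ALL_CODES, ["EIO", "EACCES"], []))
print(build_track_map("excludeonly", ALL_CODES, [], ["EPERM"]))
```

A handler can then test membership with a single dictionary lookup on the index carried by each event, which is cheaper than comparing code names at detection time.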
Quick Actions are simple, standalone diagnostic commands that are triggered when any anomaly is detected.
Create a new file in src/handlers/, for example, LsofQuickAction.py:
# src/handlers/LsofQuickAction.py
import subprocess
from pathlib import Path
from base.QuickAction import QuickAction
class LsofQuickAction(QuickAction):
    """Captures network connections using lsof command."""
    def __init__(self, batches_root: str, params=None):
        """Initialize the QuickAction.
        Args:
            batches_root (str): Root directory for log batches.
            params: Optional parameters for the action.
        """
        super().__init__(batches_root, "lsof.log")
        # Add any self.params if needed
    def get_command(self) -> tuple:
        """Return command to execute.
        Returns:
            tuple: (command_array, execution_type)
            execution_type: "cmd" for subprocess, "cat" for simple file operation
        """
        return ["lsof", "-i"], "cmd"
Import and register your QuickAction in src/LogCollector.py:
# src/LogCollector.py
# ...existing imports...
from handlers.LsofQuickAction import LsofQuickAction
class LogCollector:
    def __init__(self, controller):
        # ...existing code...
        self.action_factory = {
            "journalctl": lambda: JournalctlQuickAction(self.aod_output_dir, self.anomaly_interval),
            # ...existing actions...
            "syslogs": lambda: SysLogsQuickAction(self.aod_output_dir, num_lines=100),
            "lsof": lambda: LsofQuickAction(self.aod_output_dir, params),  # Add this line
        }
        # ...existing code...
Add your new action to the watcher: actions: list in config/config.yaml:
# config/config.yaml
watcher:
  actions:
    - dmesg
    - journalctl
    - lsof  # Add your new action here
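A common mistake is listing an action in config.yaml without registering it in the factory. The sketch below shows one way such a mismatch could be caught early. It is a hypothetical illustration: the LsofQuickAction here is a bare stand-in that only mirrors the get_command() contract, and resolve_actions is not part of LogCollector.

```python
class LsofQuickAction:
    """Stand-in with the same get_command() contract as the class above."""

    def get_command(self):
        return ["lsof", "-i"], "cmd"


# Lambdas defer construction until the action is actually needed.
action_factory = {"lsof": lambda: LsofQuickAction()}


def resolve_actions(action_names, factory):
    """Return {name: (command, execution_type)}; fail fast on unknown names."""
    unknown = [name for name in action_names if name not in factory]
    if unknown:
        raise ValueError(f"unregistered actions: {unknown}")
    return {name: factory[name]().get_command() for name in action_names}


print(resolve_actions(["lsof"], action_factory))
```

Running a check like this at startup against the configured actions list surfaces a missing registration immediately, rather than at the moment an anomaly fires.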