From 976dcc08a0a55c6c429b3792e1925eabaf8ad6d6 Mon Sep 17 00:00:00 2001 From: Felix Delattre Date: Fri, 28 Nov 2025 13:27:58 +0100 Subject: [PATCH 1/4] Added load testing script structure skeleton. --- eoapi-cli | 8 +++ scripts/load.sh | 168 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+) create mode 100755 scripts/load.sh diff --git a/eoapi-cli b/eoapi-cli index 3c91115e..c53614e1 100755 --- a/eoapi-cli +++ b/eoapi-cli @@ -16,6 +16,7 @@ readonly COMMANDS=( "cluster" "deployment" "test" + "load" "ingest" "docs" ) @@ -38,6 +39,7 @@ COMMANDS: cluster Manage local Kubernetes clusters for development deployment Deploy and manage eoAPI instances test Run tests (helm, integration, autoscaling) + load Run load testing scenarios ingest Load sample data into eoAPI services docs Generate and serve documentation @@ -59,6 +61,9 @@ EXAMPLES: # Run autoscaling tests only eoapi-cli test autoscaling + # Run load tests + eoapi-cli load all + # Ingest sample data eoapi-cli ingest sample-data @@ -99,6 +104,9 @@ get_command_script() { test) echo "${SCRIPTS_DIR}/test.sh" ;; + load) + echo "${SCRIPTS_DIR}/load.sh" + ;; ingest) echo "${SCRIPTS_DIR}/ingest.sh" ;; diff --git a/scripts/load.sh b/scripts/load.sh new file mode 100755 index 00000000..21066529 --- /dev/null +++ b/scripts/load.sh @@ -0,0 +1,168 @@ +#!/usr/bin/env bash + +# eoAPI Scripts - Load Testing Management +# Run various load testing scenarios for eoAPI + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +source "${SCRIPT_DIR}/lib/common.sh" + +NAMESPACE="${NAMESPACE:-eoapi}" +RELEASE_NAME="${RELEASE_NAME:-eoapi}" + +show_help() { + cat < [ARGS] + +COMMANDS: + baseline Low load, verify monitoring works + services Test each service individually + mixed Realistic scenario + stress Find breaking points + soak Long-running stability + chaos Kill pods during load, test resilience + all Run all load tests + +OPTIONS: + -h, --help Show this help message + -d, --debug Enable debug mode + -n, --namespace Set Kubernetes namespace + --release NAME Helm release name (default: ${RELEASE_NAME}) + +EXAMPLES: + # Run baseline load test + $(basename "$0") baseline + + # Test individual services + $(basename "$0") services --debug + + # Run all load tests + $(basename "$0") all +EOF +} + +load_baseline() { + log_info "Running baseline load test..." + # TODO: Implement baseline load testing +} + +load_services() { + log_info "Running service-specific load tests..." + # TODO: Implement individual service testing +} + +load_mixed() { + log_info "Running mixed load test scenario..." + # TODO: Implement realistic mixed scenario +} + +load_stress() { + log_info "Running stress test to find breaking points..." + # TODO: Implement stress testing +} + +load_soak() { + log_info "Running soak test for stability..." + # TODO: Implement long-running stability test +} + +load_chaos() { + log_info "Running chaos testing with pod failures..." + # TODO: Implement chaos testing +} + +load_all() { + local failed=0 + + log_info "Running all load tests..." 
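+    # Run every suite even if one fails; failures are tallied for the summary below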
+ + load_baseline || ((failed++)) + load_services || ((failed++)) + load_mixed || ((failed++)) + load_stress || ((failed++)) + load_soak || ((failed++)) + load_chaos || ((failed++)) + + if [[ $failed -eq 0 ]]; then + log_success "All load tests passed" + return 0 + else + log_error "$failed load test suites failed" + return 1 + fi +} + +main() { + local command="" + + # Parse options + while [[ $# -gt 0 ]]; do + case $1 in + -h|--help) + show_help + exit 0 + ;; + -d|--debug) + export DEBUG_MODE=true + shift + ;; + -n|--namespace) + NAMESPACE="$2" + shift 2 + ;; + --release) + RELEASE_NAME="$2" + shift 2 + ;; + baseline|services|mixed|stress|soak|chaos|all) + command="$1" + shift + break + ;; + *) + log_error "Unknown option: $1" + show_help + exit 1 + ;; + esac + done + + [[ -z "$command" ]] && command="all" + + case "$command" in + baseline) + load_baseline + ;; + services) + load_services + ;; + mixed) + load_mixed + ;; + stress) + load_stress + ;; + soak) + load_soak + ;; + chaos) + load_chaos + ;; + all) + load_all + ;; + *) + log_error "Unknown command: $command" + exit 1 + ;; + esac +} + +if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then + main "$@" +fi From b963ad07f6e648450845a31262ebd7e22c748b45 Mon Sep 17 00:00:00 2001 From: Felix Delattre Date: Fri, 28 Nov 2025 13:58:17 +0100 Subject: [PATCH 2/4] Baseline and autoscaling load tests. --- .github/workflows/ci.yml | 3 - eoapi-cli | 6 +- scripts/load.sh | 155 ++++++++++++++++++++++++++++++++++++++- scripts/test.sh | 18 +---- 4 files changed, 158 insertions(+), 24 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0dda922d..cc3ce1e8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -68,9 +68,6 @@ jobs: - name: Run notification tests run: ./eoapi-cli test notification - - name: Run autoscaling tests - run: ./eoapi-cli test autoscaling - - name: Debug failed deployment if: failure() run: ./eoapi-cli deployment debug diff --git a/eoapi-cli b/eoapi-cli index c53614e1..5d1ac594 100755 --- a/eoapi-cli +++ b/eoapi-cli @@ -58,12 +58,12 @@ EXAMPLES: # Run integration tests only eoapi-cli test integration - # Run autoscaling tests only - eoapi-cli test autoscaling - # Run load tests eoapi-cli load all + # Run autoscaling load tests only + eoapi-cli load autoscaling + # Ingest sample data eoapi-cli ingest sample-data diff --git a/scripts/load.sh b/scripts/load.sh index 21066529..3952dabb 100755 --- a/scripts/load.sh +++ b/scripts/load.sh @@ -22,6 +22,7 @@ USAGE: COMMANDS: baseline Low load, verify monitoring works services Test each service individually + autoscaling Test HPA scaling under load mixed Realistic scenario stress Find breaking points soak Long-running stability @@ -41,14 +42,97 @@ EXAMPLES: # Test individual services $(basename "$0") services --debug + # Test autoscaling behavior + $(basename "$0") autoscaling --debug + # Run all load tests $(basename "$0") all EOF } +get_base_url() { + # Try localhost first (most common in local dev) + if curl -s -f -m 3 "http://localhost/stac" >/dev/null 2>&1; then + echo "http://localhost" + return 0 + fi + + # Try ingress if configured + local host + host=$(kubectl get ingress -n "$NAMESPACE" -o jsonpath='{.items[0].spec.rules[0].host}' 2>/dev/null || echo "") + if [[ -n "$host" ]] && curl -s -f -m 3 "http://$host/stac" >/dev/null 2>&1; then + echo "http://$host" + return 0 + fi + + return 1 +} + +test_endpoint() { + local url="$1" + local duration="${2:-30}" + local concurrency="${3:-2}" + + if ! 
command_exists hey; then + log_error "hey not found. Install with: go install github.com/rakyll/hey@latest" + return 1 + fi + + log_info "Testing $url (${duration}s, ${concurrency}c)" + hey -z "${duration}s" -c "$concurrency" "$url" 2>/dev/null | grep -E "(Total:|Requests/sec:|Average:|Status code)" +} + +monitor_during_test() { + local duration="$1" + log_info "Monitor with: watch kubectl get pods -n $NAMESPACE" + sleep "$duration" & + local sleep_pid=$! + + # Show initial state + kubectl get hpa -n "$NAMESPACE" 2>/dev/null | head -2 || true + + wait $sleep_pid +} + load_baseline() { log_info "Running baseline load test..." - # TODO: Implement baseline load testing + + validate_cluster || return 1 + validate_namespace "$NAMESPACE" || return 1 + + local base_url + if ! base_url=$(get_base_url); then + log_error "Cannot reach eoAPI endpoints" + return 1 + fi + log_info "Using base URL: $base_url" + + # Wait for deployments + for service in stac raster vector; do + kubectl wait --for=condition=Available deployment/"${RELEASE_NAME}-${service}" -n "$NAMESPACE" --timeout=60s 2>/dev/null || \ + log_warn "Service $service may not be ready" + done + + log_info "Running light load tests..." + log_info "Monitor pods: kubectl get pods -n $NAMESPACE -w" + + # STAC collections (30s, 2 concurrent) + test_endpoint "$base_url/stac/collections" & + monitor_during_test 30 + wait + + # STAC search (60s, 3 concurrent) + if command_exists curl && command_exists hey; then + log_info "Testing STAC search (60s, 3c)" + hey -z 60s -c 3 -m POST -H "Content-Type: application/json" -d '{"limit":10}' "$base_url/stac/search" 2>/dev/null | \ + grep -E "(Total:|Requests/sec:|Average:|Status code)" + fi + + # Health checks + test_endpoint "$base_url/raster/healthz" + test_endpoint "$base_url/vector/healthz" + + log_success "Baseline load test completed" } load_services() { @@ -56,6 +140,69 @@ load_services() { # TODO: Implement individual service testing } +load_autoscaling() { + log_info "Running autoscaling tests..." + + validate_cluster || return 1 + validate_namespace "$NAMESPACE" || return 1 + + # Check HPA exists + if ! kubectl get hpa -n "$NAMESPACE" >/dev/null 2>&1 || [[ $(kubectl get hpa -n "$NAMESPACE" --no-headers 2>/dev/null | wc -l) -eq 0 ]]; then + log_error "No HPA resources found. Deploy with autoscaling enabled." + return 1 + fi + + # Check metrics server + if ! kubectl get deployment -A | grep -q metrics-server; then + log_error "metrics-server required for autoscaling tests" + return 1 + fi + + local base_url + if ! base_url=$(get_base_url); then + log_error "Cannot reach eoAPI endpoints" + return 1 + fi + log_info "Using base URL: $base_url" + + # Wait for services + for service in stac raster vector; do + kubectl wait --for=condition=Available deployment/"${RELEASE_NAME}-${service}" -n "$NAMESPACE" --timeout=90s || return 1 + done + + log_info "Current HPA status:" + kubectl get hpa -n "$NAMESPACE" + + log_info "Generating sustained load to trigger autoscaling..." + + # Generate load that should trigger HPA (10 min, 15 concurrent) + if command_exists hey; then + log_info "Starting sustained load test (10 minutes)..." + hey -z 600s -c 15 "$base_url/stac/search" -m POST \ + -H "Content-Type: application/json" -d '{"limit":100}' & + local load_pid=$! + + # Monitor HPA changes every 30s + log_info "Monitoring HPA scaling..." 
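+        # 20 checks x 30s = 10 minutes, matching the hey -z 600s run above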
+ for i in {1..20}; do + sleep 30 + log_info "HPA status after ${i}x30s:" + kubectl get hpa -n "$NAMESPACE" --no-headers | awk '{print $1 ": " $6 "/" $7 " replicas, CPU: " $3}' + done + + # Stop load test + kill $load_pid 2>/dev/null || true + wait $load_pid 2>/dev/null || true + + log_info "Final HPA status:" + kubectl get hpa -n "$NAMESPACE" + log_success "Autoscaling test completed" + else + log_error "hey required for autoscaling tests" + return 1 + fi +} + load_mixed() { log_info "Running mixed load test scenario..." # TODO: Implement realistic mixed scenario @@ -83,6 +230,7 @@ load_all() { load_baseline || ((failed++)) load_services || ((failed++)) + load_autoscaling || ((failed++)) load_mixed || ((failed++)) load_stress || ((failed++)) load_soak || ((failed++)) @@ -119,7 +267,7 @@ main() { RELEASE_NAME="$2" shift 2 ;; - baseline|services|mixed|stress|soak|chaos|all) + baseline|services|autoscaling|mixed|stress|soak|chaos|all) command="$1" shift break @@ -141,6 +289,9 @@ main() { services) load_services ;; + autoscaling) + load_autoscaling + ;; mixed) load_mixed ;; diff --git a/scripts/test.sh b/scripts/test.sh index 8fb3e10e..c5a973cd 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -27,7 +27,6 @@ COMMANDS: unit Run Helm unit tests integration Run integration tests with pytest notification Run notification tests with database access - autoscaling Run autoscaling tests with pytest all Run all tests OPTIONS: @@ -50,9 +49,6 @@ EXAMPLES: # Run integration tests with debug $(basename "$0") integration --debug - # Run autoscaling tests with debug - $(basename "$0") autoscaling --debug - # Run all tests $(basename "$0") all EOF @@ -123,13 +119,7 @@ test_integration() { "${SCRIPT_DIR}/test/integration.sh" "$pytest_args" } -test_autoscaling() { - local pytest_args="${1:-}" - export NAMESPACE="$NAMESPACE" - export RELEASE_NAME="$RELEASE_NAME" - export DEBUG_MODE="$DEBUG_MODE" - "${SCRIPT_DIR}/test/autoscaling.sh" "$pytest_args" -} + test_notification() { local pytest_args="${1:-}" @@ -150,7 +140,6 @@ test_all() { if validate_cluster 2>/dev/null; then test_integration || ((failed++)) - test_autoscaling || ((failed++)) test_notification || ((failed++)) else log_warn "Skipping integration tests - no cluster connection" @@ -192,7 +181,7 @@ main() { pytest_args="$2" shift 2 ;; - schema|lint|unit|notification|integration|autoscaling|all) + schema|lint|unit|notification|integration|all) command="$1" shift break @@ -223,9 +212,6 @@ main() { notification) test_notification "$pytest_args" ;; - autoscaling) - test_autoscaling "$pytest_args" - ;; all) test_all ;; From f0c4b03f65c63aef1ecaf0358119e9087c514191 Mon Sep 17 00:00:00 2001 From: Felix Delattre Date: Fri, 28 Nov 2025 14:24:45 +0100 Subject: [PATCH 3/4] Changed structure slightly. --- scripts/load.sh | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/scripts/load.sh b/scripts/load.sh index 3952dabb..a9706142 100755 --- a/scripts/load.sh +++ b/scripts/load.sh @@ -21,11 +21,9 @@ USAGE: COMMANDS: baseline Low load, verify monitoring works - services Test each service individually autoscaling Test HPA scaling under load - mixed Realistic scenario + normal Realistic scenario stress Find breaking points - soak Long-running stability chaos Kill pods during load, test resilience all Run all load tests @@ -203,8 +201,8 @@ load_autoscaling() { fi } -load_mixed() { - log_info "Running mixed load test scenario..." +load_normal() { + log_info "Running normal load test scenario..." 
# TODO: Implement realistic mixed scenario } @@ -213,11 +211,6 @@ load_stress() { # TODO: Implement stress testing } -load_soak() { - log_info "Running soak test for stability..." - # TODO: Implement long-running stability test -} - load_chaos() { log_info "Running chaos testing with pod failures..." # TODO: Implement chaos testing @@ -231,9 +224,8 @@ load_all() { load_baseline || ((failed++)) load_services || ((failed++)) load_autoscaling || ((failed++)) - load_mixed || ((failed++)) + load_normal || ((failed++)) load_stress || ((failed++)) - load_soak || ((failed++)) load_chaos || ((failed++)) if [[ $failed -eq 0 ]]; then @@ -267,7 +259,7 @@ main() { RELEASE_NAME="$2" shift 2 ;; - baseline|services|autoscaling|mixed|stress|soak|chaos|all) + baseline|services|autoscaling|normal|stress|chaos|all) command="$1" shift break @@ -292,15 +284,12 @@ main() { autoscaling) load_autoscaling ;; - mixed) + normal) load_mixed ;; stress) load_stress ;; - soak) - load_soak - ;; chaos) load_chaos ;; From 04e7a753c85e0d3ab6000f7b6a48322f57382ce4 Mon Sep 17 00:00:00 2001 From: Felix Delattre Date: Fri, 28 Nov 2025 14:55:25 +0100 Subject: [PATCH 4/4] Added normal, stress and chaos test. --- CHANGELOG.md | 1 + scripts/lib/common.sh | 101 ++++++++- scripts/load.sh | 204 +++++++++++------- scripts/test/autoscaling.sh | 163 --------------- scripts/test/integration.sh | 9 +- tests/load/README.md | 227 ++++++++++++++++++++ tests/load/load_tester.py | 405 ++++++++++++++++++++++++++++++++++++ tests/load/test_chaos.py | 260 +++++++++++++++++++++++ tests/load/test_load.py | 160 ++++++++++++++ tests/load/test_normal.py | 220 ++++++++++++++++++++ tests/load/test_stress.py | 180 ++++++++++++++++ tests/requirements.txt | 1 + 12 files changed, 1683 insertions(+), 248 deletions(-) delete mode 100755 scripts/test/autoscaling.sh create mode 100644 tests/load/README.md create mode 100644 tests/load/load_tester.py create mode 100644 tests/load/test_chaos.py create mode 100644 tests/load/test_load.py create mode 100644 tests/load/test_normal.py create mode 100644 tests/load/test_stress.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 020be593..9247986c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added documentation for cloud-native bucket access [#364](https://github.com/developmentseed/eoapi-k8s/pull/364) - Removed unused testing variable and unused deploy script [#369](https://github.com/developmentseed/eoapi-k8s/pull/369) +- Added load testing scripts [#373](https://github.com/developmentseed/eoapi-k8s/pull/373) ### Added diff --git a/scripts/lib/common.sh b/scripts/lib/common.sh index 1e4e5a19..95cc12ee 100755 --- a/scripts/lib/common.sh +++ b/scripts/lib/common.sh @@ -270,11 +270,108 @@ cleanup_on_exit() { trap cleanup_on_exit EXIT +get_base_url() { + local namespace="${1:-eoapi}" + + # Try localhost first (most common in local dev) + if curl -s -f -m 3 "http://localhost/stac" >/dev/null 2>&1; then + echo "http://localhost" + return 0 + fi + + # Try ingress if configured + local host + host=$(kubectl get ingress -n "$namespace" -o jsonpath='{.items[0].spec.rules[0].host}' 2>/dev/null || echo "") + if [[ -n "$host" ]] && curl -s -f -m 3 "http://$host/stac" >/dev/null 2>&1; then + echo "http://$host" + return 0 + fi + + return 1 +} + # Export functions +validate_autoscaling_environment() { + local namespace="$1" + + validate_cluster || return 1 + validate_namespace "$namespace" || return 1 + + # Check HPA exists + if ! 
kubectl get hpa -n "$namespace" >/dev/null 2>&1 || [[ $(kubectl get hpa -n "$namespace" --no-headers 2>/dev/null | wc -l) -eq 0 ]]; then + log_error "No HPA resources found. Deploy with autoscaling enabled." + return 1 + fi + + # Check metrics server + if ! kubectl get deployment -A | grep -q metrics-server; then + log_error "metrics-server required for autoscaling tests" + return 1 + fi + + return 0 +} + export -f log_info log_success log_warn log_error log_debug export -f command_exists validate_tools check_requirements validate_cluster -export -f is_ci validate_namespace +export -f is_ci validate_namespace get_base_url export -f detect_release_name detect_namespace -export -f wait_for_pods validate_eoapi_deployment +export -f wait_for_pods validate_eoapi_deployment validate_autoscaling_environment export -f preflight_deploy preflight_ingest preflight_test +# Python dependency management +validate_python_environment() { + if ! command_exists python3; then + log_error "python3 is required but not found" + log_info "Install python3 to continue" + return 1 + fi + + log_debug "Python3 environment validated" + return 0 +} + +install_python_requirements() { + local requirements_file="$1" + local project_root="${2:-}" + + # Resolve the full path to requirements file + local full_path="$requirements_file" + if [[ -n "$project_root" ]]; then + full_path="$project_root/$requirements_file" + fi + + if [[ ! -f "$full_path" ]]; then + log_error "Requirements file not found: $full_path" + return 1 + fi + + log_info "Installing Python test dependencies from $requirements_file..." + + if python3 -m pip install --user -r "$full_path" >/dev/null 2>&1; then + log_debug "Python requirements installed successfully" + return 0 + else + log_warn "Could not install test dependencies automatically" + log_info "Try manually: pip install -r $requirements_file" + return 1 + fi +} + +validate_python_with_requirements() { + local requirements_file="${1:-}" + local project_root="${2:-}" + + validate_python_environment || return 1 + + if [[ -n "$requirements_file" ]]; then + install_python_requirements "$requirements_file" "$project_root" || { + log_warn "Python requirements installation failed, but continuing..." 
+            return 0  # Don't fail the entire operation
+        }
+    fi
+
+    return 0
+}
+
+export -f validate_python_environment install_python_requirements validate_python_with_requirements
 export -f show_standard_options
diff --git a/scripts/load.sh b/scripts/load.sh
index a9706142..26026f9a 100755
--- a/scripts/load.sh
+++ b/scripts/load.sh
@@ -21,7 +21,7 @@ USAGE:
 
 COMMANDS:
     baseline       Low load, verify monitoring works
-    autoscaling    Test HPA scaling under load
+    autoscaling    Test HPA scaling under load (pytest)
     normal         Realistic scenario
     stress         Find breaking points
     chaos          Kill pods during load, test resilience
@@ -37,33 +37,33 @@ EXAMPLES:
     # Run baseline load test
    $(basename "$0") baseline
 
-    # Test individual services
-    $(basename "$0") services --debug
-
     # Test autoscaling behavior
     $(basename "$0") autoscaling --debug
 
+    # Find breaking points
+    $(basename "$0") stress --debug
+
     # Run all load tests
     $(basename "$0") all
 EOF
 }
 
-get_base_url() {
-    # Try localhost first (most common in local dev)
-    if curl -s -f -m 3 "http://localhost/stac" >/dev/null 2>&1; then
-        echo "http://localhost"
-        return 0
-    fi
 
-    # Try ingress if configured
-    local host
-    host=$(kubectl get ingress -n "$NAMESPACE" -o jsonpath='{.items[0].spec.rules[0].host}' 2>/dev/null || echo "")
-    if [[ -n "$host" ]] && curl -s -f -m 3 "http://$host/stac" >/dev/null 2>&1; then
-        echo "http://$host"
-        return 0
-    fi
+wait_for_services() {
+    local base_url="$1"
+
+    # Wait for deployments to be available
+    for service in stac raster vector; do
+        kubectl wait --for=condition=Available deployment/"${RELEASE_NAME}-${service}" -n "$NAMESPACE" --timeout=60s 2>/dev/null || \
+            log_warn "Service $service may not be ready"
+    done
 
-    return 1
+    # Test basic connectivity
+    for endpoint in "$base_url/stac" "$base_url/raster/healthz" "$base_url/vector/healthz"; do
+        if ! curl -s -f -m 5 "$endpoint" >/dev/null 2>&1; then
+            log_warn "Endpoint not responding: $endpoint"
+        fi
+    done
 }
 
 test_endpoint() {
@@ -99,17 +99,13 @@ load_baseline() {
     validate_namespace "$NAMESPACE" || return 1
 
     local base_url
-    if ! base_url=$(get_base_url); then
+    if ! base_url=$(get_base_url "$NAMESPACE"); then
         log_error "Cannot reach eoAPI endpoints"
         return 1
     fi
     log_info "Using base URL: $base_url"
 
-    # Wait for deployments
-    for service in stac raster vector; do
-        kubectl wait --for=condition=Available deployment/"${RELEASE_NAME}-${service}" -n "$NAMESPACE" --timeout=60s 2>/dev/null || \
-            log_warn "Service $service may not be ready"
-    done
+    wait_for_services "$base_url"
 
     log_info "Running light load tests..."
     log_info "Monitor pods: kubectl get pods -n $NAMESPACE -w"
@@ -141,79 +137,135 @@ load_services() {
 load_autoscaling() {
     log_info "Running autoscaling tests..."
 
-    validate_cluster || return 1
-    validate_namespace "$NAMESPACE" || return 1
+    validate_autoscaling_environment "$NAMESPACE" || return 1
 
-    # Check HPA exists
-    if ! kubectl get hpa -n "$NAMESPACE" >/dev/null 2>&1 || [[ $(kubectl get hpa -n "$NAMESPACE" --no-headers 2>/dev/null | wc -l) -eq 0 ]]; then
-        log_error "No HPA resources found. Deploy with autoscaling enabled."
-        return 1
-    fi
+    validate_python_with_requirements "tests/requirements.txt" "${SCRIPT_DIR}/.." || return 1
 
-    # Check metrics server
-    if ! 
kubectl get deployment -A | grep -q metrics-server; then - log_error "metrics-server required for autoscaling tests" + # Wait for deployments + for service in stac raster vector; do + kubectl wait --for=condition=Available deployment/"${RELEASE_NAME}-${service}" -n "$NAMESPACE" --timeout=90s || return 1 + done + + # Get ingress host + local ingress_host + ingress_host=$(kubectl get ingress -n "$NAMESPACE" -o jsonpath='{.items[0].spec.rules[0].host}' 2>/dev/null || echo "localhost") + + # Set environment for Python tests + export STAC_ENDPOINT="http://$ingress_host/stac" + export RASTER_ENDPOINT="http://$ingress_host/raster" + export VECTOR_ENDPOINT="http://$ingress_host/vector" + + log_info "Running Python autoscaling tests..." + cd "${SCRIPT_DIR}/.." + + local cmd="python3 -m pytest tests/autoscaling" + [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd -v --tb=short" + + if eval "$cmd"; then + log_success "Autoscaling tests passed" + else + log_error "Autoscaling tests failed" return 1 fi +} + +load_normal() { + log_info "Running normal load test scenario..." + + validate_cluster || return 1 + validate_namespace "$NAMESPACE" || return 1 + validate_python_with_requirements "tests/requirements.txt" "${SCRIPT_DIR}/.." || return 1 local base_url - if ! base_url=$(get_base_url); then + if ! base_url=$(get_base_url "$NAMESPACE"); then log_error "Cannot reach eoAPI endpoints" return 1 fi - log_info "Using base URL: $base_url" - # Wait for services - for service in stac raster vector; do - kubectl wait --for=condition=Available deployment/"${RELEASE_NAME}-${service}" -n "$NAMESPACE" --timeout=90s || return 1 - done + wait_for_services "$base_url" + + log_info "Running Python normal load test..." + cd "${SCRIPT_DIR}/.." - log_info "Current HPA status:" - kubectl get hpa -n "$NAMESPACE" - - log_info "Generating sustained load to trigger autoscaling..." - - # Generate load that should trigger HPA (10 min, 15 concurrent) - if command_exists hey; then - log_info "Starting sustained load test (10 minutes)..." - hey -z 600s -c 15 "$base_url/stac/search" -m POST \ - -H "Content-Type: application/json" -d '{"limit":100}' & - local load_pid=$! - - # Monitor HPA changes every 30s - log_info "Monitoring HPA scaling..." - for i in {1..20}; do - sleep 30 - log_info "HPA status after ${i}x30s:" - kubectl get hpa -n "$NAMESPACE" --no-headers | awk '{print $1 ": " $6 "/" $7 " replicas, CPU: " $3}' - done - - # Stop load test - kill $load_pid 2>/dev/null || true - wait $load_pid 2>/dev/null || true - - log_info "Final HPA status:" - kubectl get hpa -n "$NAMESPACE" - log_success "Autoscaling test completed" + local cmd="python3 -m tests.load.load_tester normal --base-url $base_url" + [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd --duration 30 --users 5" + + log_debug "Running: $cmd" + + if eval "$cmd"; then + log_success "Normal load test completed" else - log_error "hey required for autoscaling tests" + log_error "Normal load test failed" return 1 fi } -load_normal() { - log_info "Running normal load test scenario..." - # TODO: Implement realistic mixed scenario -} - load_stress() { log_info "Running stress test to find breaking points..." - # TODO: Implement stress testing + + validate_cluster || return 1 + validate_namespace "$NAMESPACE" || return 1 + + validate_python_with_requirements "tests/requirements.txt" "${SCRIPT_DIR}/.." || return 1 + + local base_url + if ! 
base_url=$(get_base_url "$NAMESPACE"); then + log_error "Cannot reach eoAPI endpoints" + return 1 + fi + + wait_for_services "$base_url" + + log_info "Running Python stress test module..." + cd "${SCRIPT_DIR}/.." + + local cmd="python3 -m tests.load.load_tester --base-url $base_url" + [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd --test-duration 5 --max-workers 20" + + log_debug "Running: $cmd" + + if eval "$cmd"; then + log_success "Stress test completed" + else + log_error "Stress test failed" + return 1 + fi } load_chaos() { log_info "Running chaos testing with pod failures..." - # TODO: Implement chaos testing + + validate_cluster || return 1 + validate_namespace "$NAMESPACE" || return 1 + validate_python_with_requirements "tests/requirements.txt" "${SCRIPT_DIR}/.." || return 1 + + if ! command_exists kubectl; then + log_error "kubectl required for chaos testing" + return 1 + fi + + local base_url + if ! base_url=$(get_base_url "$NAMESPACE"); then + log_error "Cannot reach eoAPI endpoints" + return 1 + fi + + wait_for_services "$base_url" + + log_info "Running Python chaos test..." + cd "${SCRIPT_DIR}/.." + + local cmd="python3 -m tests.load.load_tester chaos --base-url $base_url --namespace $NAMESPACE" + [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd --duration 60 --kill-interval 30" + + log_debug "Running: $cmd" + + if eval "$cmd"; then + log_success "Chaos test completed" + else + log_error "Chaos test failed" + return 1 + fi } load_all() { @@ -285,7 +337,7 @@ main() { load_autoscaling ;; normal) - load_mixed + load_normal ;; stress) load_stress diff --git a/scripts/test/autoscaling.sh b/scripts/test/autoscaling.sh deleted file mode 100755 index 52d5d4fb..00000000 --- a/scripts/test/autoscaling.sh +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env bash - -# eoAPI Autoscaling Tests Script - -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" - -source "${SCRIPT_DIR}/../lib/common.sh" - -NAMESPACE="${NAMESPACE:-eoapi}" -RELEASE_NAME="${RELEASE_NAME:-eoapi}" - -run_autoscaling_tests() { - local pytest_args="${1:-}" - - log_info "Running autoscaling tests..." - - check_requirements python3 kubectl || return 1 - validate_cluster || return 1 - - log_info "Installing Python test dependencies..." - python3 -m pip install --user -r "${PROJECT_ROOT}/tests/requirements.txt" >/dev/null 2>&1 || { - log_warn "Could not install test dependencies automatically" - log_info "Try manually: pip install -r tests/requirements.txt" - } - - if ! kubectl get deployment -n "$NAMESPACE" -l "app.kubernetes.io/instance=$RELEASE_NAME" &>/dev/null; then - log_error "eoAPI deployment not found (release: $RELEASE_NAME, namespace: $NAMESPACE)" - log_info "Deploy first with: eoapi deployment run" - return 1 - fi - - if ! kubectl get hpa -n "$NAMESPACE" &>/dev/null || [[ $(kubectl get hpa -n "$NAMESPACE" --no-headers 2>/dev/null | wc -l) -eq 0 ]]; then - log_error "No HPA resources found in namespace $NAMESPACE" - log_info "Autoscaling tests require HPA resources. Deploy with autoscaling enabled." - return 1 - fi - - if ! kubectl get deployment metrics-server -n kube-system &>/dev/null; then - log_warn "metrics-server not found in kube-system, checking other namespaces..." - if ! 
kubectl get deployment -A | grep -q metrics-server; then - log_error "metrics-server is not deployed - required for autoscaling tests" - return 1 - fi - fi - - cd "$PROJECT_ROOT" - - export RELEASE_NAME="$RELEASE_NAME" - export NAMESPACE="$NAMESPACE" - - log_info "Setting up test environment for autoscaling tests..." - - local ingress_host - ingress_host=$(kubectl get ingress -n "$NAMESPACE" -o jsonpath='{.items[0].spec.rules[0].host}' 2>/dev/null || echo "localhost") - log_info "Using ingress host: $ingress_host" - - log_info "Verifying services are ready for load testing..." - local service_ready=false - local retries=15 # More retries for autoscaling tests - while [ $retries -gt 0 ]; do - if curl -s -f http://"$ingress_host"/stac >/dev/null 2>&1 && \ - curl -s -f http://"$ingress_host"/raster/healthz >/dev/null 2>&1 && \ - curl -s -f http://"$ingress_host"/vector/healthz >/dev/null 2>&1; then - service_ready=true - log_info "All services are responding correctly" - break - fi - retries=$((retries - 1)) - if [ $retries -gt 0 ]; then - log_debug "Waiting for services to be ready... (retries left: $retries)" - sleep 3 - fi - done - - if [ "$service_ready" = false ]; then - log_error "Services are not ready for autoscaling tests" - return 1 - fi - - log_info "Ensuring all pods are ready for load testing..." - for service in stac raster vector; do - local deployment="${RELEASE_NAME}-${service}" - if ! kubectl wait --for=condition=available deployment/"${deployment}" -n "$NAMESPACE" --timeout=90s 2>/dev/null; then - log_error "Deployment ${deployment} is not ready for autoscaling tests" - return 1 - fi - done - - log_info "Allowing services to stabilize before load testing..." - sleep 10 - - export STAC_ENDPOINT="${STAC_ENDPOINT:-http://$ingress_host/stac}" - export RASTER_ENDPOINT="${RASTER_ENDPOINT:-http://$ingress_host/raster}" - export VECTOR_ENDPOINT="${VECTOR_ENDPOINT:-http://$ingress_host/vector}" - - log_info "Test endpoints configured:" - log_info " STAC: $STAC_ENDPOINT" - log_info " Raster: $RASTER_ENDPOINT" - log_info " Vector: $VECTOR_ENDPOINT" - - log_info "Checking HPA metrics availability..." - local hpa_ready=false - local hpa_retries=5 - while [ $hpa_retries -gt 0 ]; do - if kubectl get hpa -n "$NAMESPACE" -o json | grep -q "currentCPUUtilizationPercentage\|currentMetrics"; then - hpa_ready=true - log_info "HPA metrics are available" - break - fi - hpa_retries=$((hpa_retries - 1)) - if [ $hpa_retries -gt 0 ]; then - log_debug "Waiting for HPA metrics... (retries left: $hpa_retries)" - sleep 5 - fi - done - - if [ "$hpa_ready" = false ]; then - log_warn "HPA metrics may not be fully available - tests might be flaky" - fi - - log_info "Running extended warmup for load testing..." 
- for round in {1..3}; do - log_debug "Warmup round $round/3" - for endpoint in "$STAC_ENDPOINT/collections" "$RASTER_ENDPOINT/healthz" "$VECTOR_ENDPOINT/healthz"; do - for _ in {1..5}; do - curl -s -f "$endpoint" >/dev/null 2>&1 || true - sleep 0.2 - done - done - sleep 2 - done - - log_info "Current HPA status before autoscaling tests:" - kubectl get hpa -n "$NAMESPACE" || true - - local cmd="python3 -m pytest tests/autoscaling" - [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd -v --tb=short" - [[ -n "$pytest_args" ]] && cmd="$cmd $pytest_args" - - log_debug "Running: $cmd" - - if eval "$cmd"; then - log_success "Autoscaling tests passed" - - # Log final HPA status after tests - log_info "Final HPA status after autoscaling tests:" - kubectl get hpa -n "$NAMESPACE" || true - - return 0 - else - log_error "Autoscaling tests failed" - - log_info "HPA status after failed autoscaling tests:" - kubectl get hpa -n "$NAMESPACE" || true - - return 1 - fi -} - -run_autoscaling_tests "$@" diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index f1a2ecb6..d8d41e38 100755 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -17,14 +17,9 @@ run_integration_tests() { log_info "Running integration tests..." - check_requirements python3 kubectl || return 1 + check_requirements kubectl || return 1 validate_cluster || return 1 - - log_info "Installing Python test dependencies..." - python3 -m pip install --user -r "${PROJECT_ROOT}/tests/requirements.txt" >/dev/null 2>&1 || { - log_warn "Could not install test dependencies automatically" - log_info "Try manually: pip install -r tests/requirements.txt" - } + validate_python_with_requirements "tests/requirements.txt" "$PROJECT_ROOT" || return 1 if ! kubectl get deployment -n "$NAMESPACE" -l "app.kubernetes.io/instance=$RELEASE_NAME" &>/dev/null; then log_error "eoAPI deployment not found (release: $RELEASE_NAME, namespace: $NAMESPACE)" diff --git a/tests/load/README.md b/tests/load/README.md new file mode 100644 index 00000000..2db712c9 --- /dev/null +++ b/tests/load/README.md @@ -0,0 +1,227 @@ +# eoAPI Load Testing + +This directory contains load testing utilities and scripts for eoAPI services. + +## Overview + +The load testing framework provides: +- **Stress testing** to find service breaking points +- **Baseline load testing** for performance verification +- **Pytest-compatible tests** for CI/CD integration +- **Configurable parameters** for different testing scenarios + +## Components + +### `load_tester.py` +Core module containing the `LoadTester` class and unified CLI for all test types. + +**Usage:** +```bash +# Run with defaults (localhost, 50 max workers) +python3 -m tests.load.load_tester + +# Custom configuration +python3 -m tests.load.load_tester \ + --base-url http://my-eoapi.com \ + --endpoint /stac/search \ + --max-workers 100 \ + --success-threshold 90.0 \ + --test-duration 15 +``` + +**Parameters:** +- `--base-url`: Base URL for eoAPI services +- `--endpoint`: Specific endpoint to test (default: `/stac/collections`) +- `--max-workers`: Maximum concurrent workers (default: 50) +- `--success-threshold`: Minimum success rate % (default: 95.0) +- `--step-size`: Worker increment step (default: 5) +- `--test-duration`: Duration per concurrency level in seconds (default: 10) +- `--timeout`: Request timeout in seconds (default: 30) +- `--cooldown`: Time between test levels in seconds (default: 2) + +### Test Modules + +#### `test_load.py` +Baseline load tests and shared fixtures for basic functionality verification. 
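+
+A minimal sketch of a test built on these pieces (assuming the `base_url` fixture defined in this module and the `LoadTester` defaults):
+
+```python
+from tests.load.load_tester import LoadTester
+
+
+def test_stac_collections_smoke(base_url: str):
+    # Light smoke check: 2 workers for 3 seconds against /stac/collections
+    tester = LoadTester(base_url, max_workers=5, timeout=10)
+    _, total, rate = tester.test_concurrency_level(
+        f"{base_url}/stac/collections", workers=2, duration=3
+    )
+    assert total > 0 and rate >= 95.0
+```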
+
+**Test Classes:**
+- `TestLoadBaseline`: Light load tests for basic service functionality
+- `TestLoadScalability`: Response time and scalability tests
+- `TestLoadIntegration`: Multi-service integration tests
+
+#### `test_stress.py`
+Stress testing to find breaking points and verify resilience under high load.
+
+**Test Classes:**
+- `TestStressBreakingPoints`: Find service breaking points
+- `TestStressResilience`: Service recovery and sustained load tests
+- `TestStressLimits`: Maximum capacity and error rate tests
+
+#### `test_normal.py`
+Realistic production workload patterns and sustained usage simulation.
+
+**Test Classes:**
+- `TestNormalMixedLoad`: Mixed endpoint realistic traffic patterns
+- `TestNormalSustained`: Long-running moderate load tests
+- `TestNormalUserPatterns`: User session and interaction simulation
+
+#### `test_chaos.py`
+Chaos engineering tests for infrastructure failure resilience.
+
+**Test Classes:**
+- `TestChaosResilience`: Pod failure and recovery tests
+- `TestChaosNetwork`: Network instability and timeout handling
+- `TestChaosResource`: Resource exhaustion and constraint tests
+- `TestChaosRecovery`: Recovery timing and degradation patterns
+
+**Running Load Tests:**
+```bash
+# Run all load tests
+pytest tests/load/
+
+# Run specific test types
+pytest tests/load/test_load.py      # Baseline tests
+pytest tests/load/test_stress.py    # Stress tests
+pytest tests/load/test_normal.py    # Normal load tests
+pytest tests/load/test_chaos.py     # Chaos tests
+
+# Run specific test classes
+pytest tests/load/test_stress.py::TestStressBreakingPoints
+pytest tests/load/test_normal.py::TestNormalMixedLoad
+
+# Skip slow tests
+pytest tests/load/ -m "not slow"
+```
+
+## Integration with Shell Scripts
+
+The load testing is integrated with the main `load.sh` script:
+
+```bash
+# Run stress test via load.sh
+./scripts/load.sh stress --debug
+
+# Run all load tests
+./scripts/load.sh all
+```
+
+The shell script automatically:
+- Installs Python dependencies
+- Sets up environment variables
+- Configures endpoints based on cluster state
+- Runs tests with appropriate parameters
+
+## Configuration
+
+### Environment Variables
+- `STAC_ENDPOINT`: STAC service URL
+- `RASTER_ENDPOINT`: Raster service URL
+- `VECTOR_ENDPOINT`: Vector service URL
+- `DEBUG_MODE`: Enable debug output
+
+### Test Parameters
+Tests can be configured via pytest markers:
+- `@pytest.mark.slow`: Long-running stress tests
+- `@pytest.mark.integration`: Multi-service tests
+
+### Performance Thresholds
+Default success rate thresholds:
+- Health endpoints: 98%
+- API endpoints: 95%
+- Stress tests: 90%
+
+## Best Practices
+
+### Local Development
+```bash
+# Quick smoke test
+python3 -m tests.load.load_tester stress --max-workers 10 --test-duration 5
+
+# Baseline verification
+pytest tests/load/test_load.py::TestLoadBaseline -v
+```
+
+### CI/CD Integration
+```bash
+# Fast load tests for CI
+pytest tests/load/ -m "not slow" --tb=short
+
+# Full load testing
+./eoapi-cli load all --debug
+```
+
+### Production Validation
+```bash
+# Conservative stress test
+python3 -m tests.load.load_tester stress \
+    --max-workers 200 \
+    --success-threshold 95.0 \
+    --test-duration 30 \
+    --cooldown 5
+```
+
+## Monitoring
+
+During load tests, monitor:
+- Pod CPU/Memory usage: `kubectl top pods -n eoapi`
+- Service metrics: `kubectl get hpa -n eoapi`
+- Response times and error rates in test output
+
+## Troubleshooting
+
+### Common Issues
+
+**ImportError: No module named 'tests.load'**
+- Ensure you're running from the 
project root directory
+- Install dependencies: `pip install -r tests/requirements.txt`
+
+**Connection refused errors**
+- Verify services are running: `kubectl get pods -n eoapi`
+- Check endpoints are accessible: `curl http://localhost/stac`
+- Ensure ingress is configured correctly
+
+**Low success rates**
+- Check resource limits and requests in Helm values
+- Verify HPA is configured for autoscaling
+- Monitor pod logs for errors: `kubectl logs -f deployment/eoapi-stac -n eoapi`
+
+### Debug Mode
+Enable debug output for detailed information:
+```bash
+DEBUG_MODE=true python3 -m tests.load.load_tester stress
+./scripts/load.sh stress --debug
+```
+
+## Extending
+
+### Adding New Test Endpoints
+1. Add endpoints to appropriate test modules (`test_load.py`, `test_stress.py`, etc.)
+2. Update `load_tester.py` with endpoint-specific logic if needed
+3. Add endpoint validation to shell scripts
+
+### Custom Load Patterns
+Create new test classes in the appropriate module:
+```python
+# In test_stress.py
+class TestStressCustom:
+    def test_my_stress_scenario(self, base_url: str):
+        # Custom stress testing logic
+        pass
+
+# In test_normal.py
+class TestNormalCustom:
+    def test_my_normal_scenario(self, base_url: str):
+        # Custom normal load testing logic
+        pass
+```
+
+### Integration with Monitoring
+Extend tests to collect metrics:
+```python
+from .load_tester import LoadTester
+
+class MonitoringLoadTester(LoadTester):
+    def collect_metrics(self):
+        # Custom metrics collection
+        pass
+```
diff --git a/tests/load/load_tester.py b/tests/load/load_tester.py
new file mode 100644
index 00000000..d6b9efa1
--- /dev/null
+++ b/tests/load/load_tester.py
@@ -0,0 +1,405 @@
+#!/usr/bin/env python3
+"""
+eoAPI Load Testing Utility
+
+This module provides the core LoadTester class and CLI for all types of
+load testing: stress, normal, and chaos testing.
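+
+Example (typical invocation; "stress" is also the default subcommand):
+
+    python3 -m tests.load.load_tester stress --base-url http://localhost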
+""" + +import argparse +import concurrent.futures +import os +import random +import subprocess +import sys +import time +from typing import Tuple + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + + +class LoadTester: + """Load tester for eoAPI endpoints supporting stress, normal, and chaos testing""" + + def __init__(self, base_url: str, max_workers: int = 50, timeout: int = 30): + self.base_url = base_url.rstrip("/") + self.max_workers = max_workers + self.timeout = timeout + self.session = self._create_session() + + def _create_session(self) -> requests.Session: + """Create a session with retry strategy""" + session = requests.Session() + + # Retry strategy + retry_strategy = Retry( + total=3, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + ) + + adapter = HTTPAdapter(max_retries=retry_strategy) + session.mount("http://", adapter) + session.mount("https://", adapter) + + return session + + def make_request(self, url: str) -> bool: + """Make a single request and return success status""" + try: + response = self.session.get(url, timeout=self.timeout) + return response.status_code == 200 + except Exception: + return False + + def test_concurrency_level( + self, url: str, workers: int, duration: int = 10 + ) -> Tuple[int, int, float]: + """Test a specific concurrency level for a given duration""" + print(f"Testing with {workers} concurrent requests...") + + start_time = time.time() + success_count = 0 + total_requests = 0 + + with concurrent.futures.ThreadPoolExecutor( + max_workers=workers + ) as executor: + futures = [] + + # Submit requests for the specified duration + while time.time() - start_time < duration: + future = executor.submit(self.make_request, url) + futures.append(future) + total_requests += 1 + time.sleep(0.1) # Small delay between request submissions + + # Collect results + for future in concurrent.futures.as_completed(futures): + if future.result(): + success_count += 1 + + success_rate = ( + (success_count / total_requests) * 100 if total_requests > 0 else 0 + ) + print( + f"Workers: {workers}, Success rate: {success_rate:.1f}% ({success_count}/{total_requests})" + ) + + return success_count, total_requests, success_rate + + def find_breaking_point( + self, + endpoint: str = "/stac/collections", + success_threshold: float = 95.0, + step_size: int = 5, + test_duration: int = 10, + cooldown: int = 2, + ) -> int: + """ + Find the breaking point by gradually increasing concurrent load + + Args: + endpoint: API endpoint to test (relative to base_url) + success_threshold: Minimum success rate to maintain + step_size: Increment for number of workers + test_duration: Duration to test each concurrency level + cooldown: Time to wait between tests + + Returns: + Number of workers at breaking point + """ + url = f"{self.base_url}{endpoint}" + print(f"Starting stress test on {url}") + print( + f"Max workers: {self.max_workers}, Success threshold: {success_threshold}%" + ) + + for workers in range(step_size, self.max_workers + 1, step_size): + _, _, success_rate = self.test_concurrency_level( + url, workers, test_duration + ) + + # Stop if success rate drops below threshold + if success_rate < success_threshold: + print( + f"Breaking point found at {workers} concurrent requests (success rate: {success_rate:.1f}%)" + ) + return workers + + # Cool down between test levels + if cooldown > 0: + time.sleep(cooldown) + + print("Stress test completed - no breaking point found") + return self.max_workers + + def 
run_normal_load( + self, + endpoints: list = None, + duration: int = 60, + concurrent_users: int = 10, + ramp_up: int = 30, + ) -> dict: + """ + Run realistic mixed-workload test + + Args: + endpoints: List of endpoints to test + duration: Total test duration + concurrent_users: Peak concurrent users + ramp_up: Time to reach peak load + + Returns: + Dict with results for each endpoint + """ + if endpoints is None: + endpoints = [ + "/stac/collections", + "/raster/healthz", + "/vector/healthz", + ] + + results = {} + print( + f"Starting normal load test ({duration}s, {concurrent_users} users)" + ) + + for endpoint in endpoints: + url = f"{self.base_url}{endpoint}" + print(f"Testing {endpoint}...") + + # Gradual ramp-up + workers = max(1, concurrent_users // len(endpoints)) + success, total, rate = self.test_concurrency_level( + url, workers, duration // len(endpoints) + ) + + results[endpoint] = { + "success_count": success, + "total_requests": total, + "success_rate": rate, + } + + return results + + def run_chaos_test( + self, + namespace: str = "eoapi", + duration: int = 300, + kill_interval: int = 60, + endpoint: str = "/stac/collections", + ) -> dict: + """ + Run chaos test by killing pods during load + + Args: + namespace: Kubernetes namespace + duration: Test duration + kill_interval: Seconds between pod kills + endpoint: Endpoint to test + + Returns: + Test results and pod kill events + """ + url = f"{self.base_url}{endpoint}" + print(f"Starting chaos test on {url} (namespace: {namespace})") + + # Get initial pod list + try: + pods = ( + subprocess.check_output( + [ + "kubectl", + "get", + "pods", + "-n", + namespace, + "-l", + "app.kubernetes.io/component in (stac,raster,vector)", + "-o", + "jsonpath={.items[*].metadata.name}", + ], + text=True, + ) + .strip() + .split() + ) + except subprocess.CalledProcessError: + print("Warning: Could not get pod list, chaos disabled") + pods = [] + + results = {"killed_pods": [], "success_rate": 0} + start_time = time.time() + + # Background load generation + import threading + + load_results = {"success": 0, "total": 0} + + def generate_load(): + while time.time() - start_time < duration: + if self.make_request(url): + load_results["success"] += 1 + load_results["total"] += 1 + time.sleep(0.5) + + # Start load generation + load_thread = threading.Thread(target=generate_load) + load_thread.start() + + # Kill pods periodically + while time.time() - start_time < duration and pods: + time.sleep(kill_interval) + + if pods: + pod_to_kill = random.choice(pods) + print(f"Killing pod: {pod_to_kill}") + try: + subprocess.run( + [ + "kubectl", + "delete", + "pod", + pod_to_kill, + "-n", + namespace, + ], + check=True, + capture_output=True, + ) + results["killed_pods"].append(pod_to_kill) + pods.remove(pod_to_kill) + except subprocess.CalledProcessError as e: + print(f"Failed to kill pod {pod_to_kill}: {e}") + + load_thread.join() + + if load_results["total"] > 0: + results["success_rate"] = ( + load_results["success"] / load_results["total"] + ) * 100 + results.update(load_results) + + print( + f"Chaos test completed: {results['success_rate']:.1f}% success rate, killed {len(results['killed_pods'])} pods" + ) + return results + + +def main(): + """Main entry point for eoAPI load testing CLI""" + parser = argparse.ArgumentParser(description="eoAPI Load Testing CLI") + + # Test type selection + parser.add_argument( + "test_type", + choices=["stress", "normal", "chaos"], + default="stress", + nargs="?", + help="Type of test to run (default: stress)", + ) + 
+ # Common arguments + parser.add_argument( + "--base-url", + default=os.getenv("STAC_ENDPOINT", "http://localhost").replace( + "/stac", "" + ), + help="Base URL for eoAPI (default: from STAC_ENDPOINT env or http://localhost)", + ) + parser.add_argument( + "--timeout", + type=int, + default=30, + help="Request timeout in seconds (default: 30)", + ) + + # Stress test arguments + stress_group = parser.add_argument_group("stress test options") + stress_group.add_argument("--endpoint", default="/stac/collections") + stress_group.add_argument("--max-workers", type=int, default=50) + stress_group.add_argument("--success-threshold", type=float, default=95.0) + stress_group.add_argument("--step-size", type=int, default=5) + stress_group.add_argument("--test-duration", type=int, default=10) + stress_group.add_argument("--cooldown", type=int, default=2) + + # Normal test arguments + normal_group = parser.add_argument_group("normal test options") + normal_group.add_argument( + "--duration", type=int, default=60, help="Test duration (default: 60)" + ) + normal_group.add_argument( + "--users", type=int, default=10, help="Concurrent users (default: 10)" + ) + + # Chaos test arguments + chaos_group = parser.add_argument_group("chaos test options") + chaos_group.add_argument( + "--namespace", + default="eoapi", + help="Kubernetes namespace (default: eoapi)", + ) + chaos_group.add_argument( + "--kill-interval", + type=int, + default=60, + help="Seconds between pod kills (default: 60)", + ) + + args = parser.parse_args() + + try: + tester = LoadTester( + base_url=args.base_url, + max_workers=getattr(args, "max_workers", 50), + timeout=args.timeout, + ) + + if args.test_type == "stress": + result = tester.find_breaking_point( + endpoint=args.endpoint, + success_threshold=args.success_threshold, + step_size=args.step_size, + test_duration=args.test_duration, + cooldown=args.cooldown, + ) + print(f"\nStress test completed. Breaking point: {result} workers") + sys.exit(1 if result < args.max_workers else 0) + + elif args.test_type == "normal": + results = tester.run_normal_load( + duration=args.duration, + concurrent_users=args.users, + ) + avg_success = sum( + r["success_rate"] for r in results.values() + ) / len(results) + print( + f"\nNormal load test completed. Average success rate: {avg_success:.1f}%" + ) + sys.exit(0 if avg_success >= 95 else 1) + + elif args.test_type == "chaos": + results = tester.run_chaos_test( + namespace=args.namespace, + duration=args.duration, + kill_interval=args.kill_interval, + ) + print( + f"\nChaos test completed. Success rate: {results['success_rate']:.1f}%" + ) + sys.exit(0 if results["success_rate"] >= 80 else 1) + + except KeyboardInterrupt: + print(f"\n{args.test_type.title()} test interrupted by user") + sys.exit(2) + except Exception as e: + print(f"{args.test_type.title()} test failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tests/load/test_chaos.py b/tests/load/test_chaos.py new file mode 100644 index 00000000..a2f3490f --- /dev/null +++ b/tests/load/test_chaos.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 +""" +Pytest-based chaos tests for eoAPI services + +This module provides chaos engineering tests to verify service resilience +during infrastructure failures, network issues, and resource constraints. 
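+
+Pod-failure tests require kubectl access to the target cluster and skip
+automatically when kubectl is unavailable.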
+""" + +import os +import subprocess +import time + +import pytest + +from .load_tester import LoadTester + + +@pytest.fixture +def base_url() -> str: + """Get the base URL for eoAPI services""" + stac_endpoint = os.getenv("STAC_ENDPOINT", "http://localhost/stac") + return stac_endpoint.replace("/stac", "") + + +class TestChaosResilience: + """Tests for service resilience during infrastructure chaos""" + + @pytest.mark.slow + def test_pod_failure_resilience(self, base_url: str): + """Test service resilience during pod failures""" + try: + subprocess.run( + ["kubectl", "version", "--client"], + check=True, + capture_output=True, + ) + except (subprocess.CalledProcessError, FileNotFoundError): + pytest.skip("kubectl not available or not in cluster environment") + + tester = LoadTester(base_url, timeout=5) + + results = tester.run_chaos_test( + duration=60, kill_interval=30, endpoint="/stac/collections" + ) + + # Even with chaos, should maintain some service level + assert results["success_rate"] >= 60.0, ( + f"Chaos test failed: {results['success_rate']}% success rate" + ) + + @pytest.mark.slow + def test_multiple_service_failures(self, base_url: str): + """Test resilience when multiple services experience issues""" + try: + subprocess.run( + ["kubectl", "get", "pods"], + check=True, + capture_output=True, + ) + except (subprocess.CalledProcessError, FileNotFoundError): + pytest.skip("kubectl not available") + + tester = LoadTester(base_url, timeout=8) + + # Test different endpoints during chaos + endpoints = ["/stac/collections", "/raster/healthz", "/vector/healthz"] + results = [] + + for endpoint in endpoints: + chaos_results = tester.run_chaos_test( + duration=45, + kill_interval=20, + endpoint=endpoint, + ) + results.append(chaos_results["success_rate"]) + + # At least one service should maintain reasonable uptime + max_success_rate = max(results) + assert max_success_rate >= 70.0, ( + f"All services failed during chaos: max {max_success_rate}%" + ) + + def test_gradual_failure_recovery(self, base_url: str): + """Test service recovery after gradual failure introduction""" + tester = LoadTester(base_url, max_workers=10, timeout=3) + url = f"{base_url}/stac/collections" + + # Phase 1: Normal operation + _, _, normal_rate = tester.test_concurrency_level(url, 3, 10) + + # Phase 2: Introduce failures (simulate with aggressive timeouts) + aggressive_tester = LoadTester(base_url, max_workers=10, timeout=1) + _, _, degraded_rate = aggressive_tester.test_concurrency_level( + url, 5, 15 + ) + + # Phase 3: Recovery (return to normal) + time.sleep(5) # Recovery time + _, _, recovery_rate = tester.test_concurrency_level(url, 3, 10) + + assert normal_rate >= 90.0, "Baseline performance too low" + assert recovery_rate >= 85.0, ( + f"Service didn't recover properly: {recovery_rate}%" + ) + + +class TestChaosNetwork: + """Tests for network-related chaos scenarios""" + + def test_network_instability(self, base_url: str): + """Test behavior under network instability""" + # Simulate network issues with very short timeouts + tester = LoadTester(base_url, max_workers=5, timeout=2) + url = f"{base_url}/stac/collections" + + success, total, rate = tester.test_concurrency_level(url, 3, 10) + + # Should handle some failures gracefully + assert rate >= 50.0, "Complete failure under network instability" + assert total > 0, "No requests made during instability test" + + def test_timeout_cascade_prevention(self, base_url: str): + """Test that timeout issues don't cascade across requests""" + # Use progressively 
shorter timeouts to simulate degradation + timeouts = [5, 3, 1, 2, 4] # Recovery pattern + url = f"{base_url}/stac/collections" + + results = [] + for timeout in timeouts: + tester = LoadTester(base_url, max_workers=3, timeout=timeout) + _, _, rate = tester.test_concurrency_level(url, 2, 5) + results.append(rate) + time.sleep(1) + + # Should show recovery in later phases + recovery_rate = results[-1] + assert recovery_rate >= 80.0, ( + f"No recovery from timeout cascade: {recovery_rate}%" + ) + + def test_concurrent_failure_modes(self, base_url: str): + """Test multiple failure modes occurring simultaneously""" + # Combine short timeouts with high concurrency + tester = LoadTester(base_url, max_workers=5, timeout=10) + + endpoints = ["/stac/collections", "/raster/healthz", "/vector/healthz"] + concurrent_results = [] + + # Test all endpoints simultaneously under stress + for endpoint in endpoints: + url = f"{base_url}{endpoint}" + _, _, rate = tester.test_concurrency_level(url, 4, 12) + concurrent_results.append(rate) + + # At least health endpoints should maintain some reliability + health_rates = [r for i, r in enumerate(concurrent_results) if i > 0] + if health_rates: + max_health_rate = max(health_rates) + assert max_health_rate >= 60.0, ( + f"All health endpoints failed: max {max_health_rate}%" + ) + + +class TestChaosResource: + """Tests for resource constraint chaos scenarios""" + + def test_resource_exhaustion_simulation(self, base_url: str): + """Test behavior when resources are constrained""" + # Simulate resource exhaustion with many concurrent requests + tester = LoadTester(base_url, max_workers=25, timeout=5) + url = f"{base_url}/stac/collections" + + success, total, rate = tester.test_concurrency_level(url, 20, 15) + + # Should gracefully degrade, not completely fail + assert rate >= 30.0, ( + f"Complete failure under resource pressure: {rate}%" + ) + assert total >= 50, "Insufficient load applied for resource test" + + def test_memory_pressure_resilience(self, base_url: str): + """Test resilience under simulated memory pressure""" + # Use many concurrent connections to simulate memory pressure + tester = LoadTester(base_url, max_workers=30, timeout=8) + + # Test with sustained high concurrency + url = ( + f"{base_url}/raster/healthz" # Health endpoint should be resilient + ) + success, total, rate = tester.test_concurrency_level(url, 15, 20) + + # Health endpoints should maintain higher reliability + assert rate >= 50.0, f"Health endpoint failed under pressure: {rate}%" + + def test_connection_pool_exhaustion(self, base_url: str): + """Test behavior when connection pools are exhausted""" + # Create multiple testers to exhaust connection pools + testers = [ + LoadTester(base_url, max_workers=10, timeout=3) for _ in range(3) + ] + + url = f"{base_url}/stac/collections" + results = [] + + # Concurrent tests from multiple testers + for i, tester in enumerate(testers): + _, _, rate = tester.test_concurrency_level(url, 6, 8) + results.append(rate) + + # At least one connection pool should work reasonably + max_rate = max(results) + assert max_rate >= 40.0, f"All connection pools failed: max {max_rate}%" + + +class TestChaosRecovery: + """Tests for service recovery patterns after chaos events""" + + def test_automatic_recovery_timing(self, base_url: str): + """Test automatic service recovery after failures""" + tester = LoadTester(base_url, max_workers=8, timeout=15) + url = f"{base_url}/stac/collections" + + # Phase 1: Induce failures + failure_tester = LoadTester(base_url, 
max_workers=20, timeout=1) + _, _, failure_rate = failure_tester.test_concurrency_level(url, 15, 10) + + # Phase 2: Monitor recovery over time + recovery_times = [5, 10, 15] # Recovery intervals + recovery_rates = [] + + for wait_time in recovery_times: + time.sleep(wait_time) + _, _, rate = tester.test_concurrency_level(url, 3, 5) + recovery_rates.append(rate) + + # Should show progressive recovery + final_rate = recovery_rates[-1] + assert final_rate >= 80.0, f"No recovery after chaos: {final_rate}%" + + def test_service_degradation_levels(self, base_url: str): + """Test graceful degradation under increasing chaos""" + url = f"{base_url}/stac/collections" + + # Progressive degradation test + chaos_levels = [ + (5, 10, 5), # Light chaos + (3, 15, 8), # Medium chaos + (1, 20, 12), # Heavy chaos + ] + + degradation_rates = [] + for timeout, workers, duration in chaos_levels: + tester = LoadTester(base_url, max_workers=25, timeout=timeout) + _, _, rate = tester.test_concurrency_level(url, workers, duration) + degradation_rates.append(rate) + time.sleep(3) # Brief recovery between tests + + # Should show controlled degradation, not cliff-edge failure + assert degradation_rates[0] >= 70.0, "Failed at low chaos level" + assert min(degradation_rates) >= 20.0, "Complete failure under chaos" diff --git a/tests/load/test_load.py b/tests/load/test_load.py new file mode 100644 index 00000000..4a48a714 --- /dev/null +++ b/tests/load/test_load.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +""" +Pytest-based baseline load tests for eoAPI services + +This module provides shared fixtures and baseline functionality tests +that verify basic service performance under light load conditions. +""" + +import os +import time + +import pytest +import requests + +from .load_tester import LoadTester + + +@pytest.fixture +def base_url() -> str: + """Get the base URL for eoAPI services""" + stac_endpoint = os.getenv("STAC_ENDPOINT", "http://localhost/stac") + return stac_endpoint.replace("/stac", "") + + +@pytest.fixture +def load_tester(base_url: str) -> LoadTester: + """Create a LoadTester instance""" + return LoadTester(base_url=base_url, max_workers=20, timeout=10) + + +class TestLoadBaseline: + """Basic load tests to verify service functionality under light load""" + + def test_stac_collections_light_load(self, base_url: str): + """Test STAC collections endpoint with light concurrent load""" + url = f"{base_url}/stac/collections" + + # Test with 3 concurrent requests for 5 seconds + tester = LoadTester(base_url, max_workers=10, timeout=10) + success_count, total_requests, success_rate = ( + tester.test_concurrency_level(url, workers=3, duration=5) + ) + + assert success_rate >= 95.0, f"Success rate {success_rate}% below 95%" + assert total_requests > 0, "No requests were made" + assert success_count > 0, "No successful requests" + + def test_raster_health_light_load(self, base_url: str): + """Test raster health endpoint with light concurrent load""" + url = f"{base_url}/raster/healthz" + + tester = LoadTester(base_url, max_workers=10, timeout=10) + success_count, total_requests, success_rate = ( + tester.test_concurrency_level(url, workers=2, duration=3) + ) + + assert success_rate >= 98.0, ( + f"Health endpoint success rate {success_rate}% below 98%" + ) + + def test_vector_health_light_load(self, base_url: str): + """Test vector health endpoint with light concurrent load""" + url = f"{base_url}/vector/healthz" + + tester = LoadTester(base_url, max_workers=10, timeout=10) + success_count, total_requests, 
success_rate = (
+            tester.test_concurrency_level(url, workers=2, duration=3)
+        )
+
+        assert success_rate >= 98.0, (
+            f"Health endpoint success rate {success_rate}% below 98%"
+        )
+
+
+class TestLoadScalability:
+    """Tests for service scalability characteristics"""
+
+    def test_response_time_under_load(self, base_url: str):
+        """Test that response times stay reasonable across repeated requests"""
+        url = f"{base_url}/stac/collections"
+
+        # Single request baseline
+        start_time = time.time()
+        response = requests.get(url, timeout=10)
+        baseline_time = time.time() - start_time
+
+        assert response.status_code == 200, "Baseline request failed"
+
+        # Repeated sequential requests over a warm session
+        session = requests.Session()
+        times = []
+
+        for _ in range(5):
+            start_time = time.time()
+            response = session.get(url, timeout=10)
+            request_time = time.time() - start_time
+            times.append(request_time)
+            assert response.status_code == 200, "Repeated request failed"
+
+        avg_load_time = sum(times) / len(times)
+
+        # Average time shouldn't exceed 5x the cold baseline
+        # Allow more tolerance since we're testing on a shared system
+        max_allowed_time = max(
+            baseline_time * 5, 0.1
+        )  # At least 100ms tolerance
+        assert avg_load_time <= max_allowed_time, (
+            f"Response time degraded too much: {avg_load_time:.2f}s vs {baseline_time:.2f}s baseline (max allowed: {max_allowed_time:.2f}s)"
+        )
+
+    @pytest.mark.parametrize(
+        "endpoint", ["/stac/collections", "/raster/healthz", "/vector/healthz"]
+    )
+    def test_endpoint_availability(self, base_url: str, endpoint: str):
+        """Test that endpoints remain available under light load"""
+        url = f"{base_url}{endpoint}"
+
+        tester = LoadTester(base_url, max_workers=5, timeout=15)
+        _, total_requests, success_rate = (
+            tester.test_concurrency_level(url, workers=2, duration=3)
+        )
+
+        assert success_rate >= 95.0, (
+            f"{endpoint} availability {success_rate}% below 95%"
+        )
+        assert total_requests >= 10, f"Too few requests made to {endpoint}"
+
+
+@pytest.mark.integration
+class TestLoadIntegration:
+    """Integration load tests that test multiple services together"""
+
+    def test_mixed_endpoint_load(self, base_url: str):
+        """Test light concurrent load against each endpoint in turn"""
+        endpoints = ["/stac/collections", "/raster/healthz", "/vector/healthz"]
+
+        results = {}
+
+        # Test each endpoint with light concurrent load
+        for endpoint in endpoints:
+            url = f"{base_url}{endpoint}"
+            tester = LoadTester(base_url, max_workers=5, timeout=10)
+
+            _, total_requests, success_rate = (
+                tester.test_concurrency_level(url, workers=2, duration=3)
+            )
+
+            results[endpoint] = {
+                "success_rate": success_rate,
+                "total_requests": total_requests,
+            }
+
+        # All endpoints should maintain good performance
+        for endpoint, result in results.items():
+            assert result["success_rate"] >= 90.0, (
+                f"{endpoint} failed with {result['success_rate']}% success rate"
+            )
+            assert result["total_requests"] > 0, (
+                f"No requests made to {endpoint}"
+            )
diff --git a/tests/load/test_normal.py b/tests/load/test_normal.py
new file mode 100644
index 00000000..4e0cdd6e
--- /dev/null
+++ b/tests/load/test_normal.py
@@ -0,0 +1,230 @@
+#!/usr/bin/env python3
+"""
+Pytest-based normal load tests for eoAPI services
+
+This module provides realistic mixed-workload tests that simulate
+normal production traffic patterns and sustained usage.
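+
+A rough sketch of the LoadTester interface these tests drive; the
+constructor and return shape below are inferred from the calls in this
+module (see load_tester.py for the actual definitions):
+
+    tester = LoadTester("http://localhost", max_workers=10, timeout=15)
+    success, total, rate = tester.test_concurrency_level(
+        "http://localhost/stac/collections", workers=3, duration=8
+    )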
+"""
+
+import os
+import time
+
+import pytest
+
+from .load_tester import LoadTester
+
+
+@pytest.fixture
+def base_url() -> str:
+    """Get the base URL for eoAPI services"""
+    stac_endpoint = os.getenv("STAC_ENDPOINT", "http://localhost/stac")
+    return stac_endpoint.replace("/stac", "")
+
+
+class TestNormalMixedLoad:
+    """Tests with realistic mixed workload patterns"""
+
+    def test_mixed_endpoint_load(self, base_url: str):
+        """Test normal load with mixed endpoints simultaneously"""
+        tester = LoadTester(base_url, max_workers=15, timeout=10)
+
+        results = tester.run_normal_load(
+            duration=30, concurrent_users=8, ramp_up=10
+        )
+
+        # All endpoints should maintain good performance
+        for endpoint, result in results.items():
+            assert result["success_rate"] >= 90.0, (
+                f"{endpoint} failed with {result['success_rate']}% success rate"
+            )
+            assert result["total_requests"] > 0
+
+    def test_stac_workflow_simulation(self, base_url: str):
+        """Simulate typical STAC API workflow"""
+        tester = LoadTester(base_url, max_workers=10, timeout=15)
+
+        # Typical workflow: browse collections -> search -> browse again
+        workflow_endpoints = [
+            "/stac/collections",
+            "/stac/search",
+            "/stac/collections",  # Often revisited
+        ]
+
+        total_success = 0
+        total_requests = 0
+
+        for endpoint in workflow_endpoints:
+            url = f"{base_url}{endpoint}"
+            success, request_count, rate = tester.test_concurrency_level(
+                url, workers=3, duration=8
+            )
+            total_success += success
+            total_requests += request_count
+
+            # Brief pause between workflow steps
+            time.sleep(1)
+
+        assert total_requests > 0, "No workflow requests were made"
+        workflow_success_rate = (total_success / total_requests) * 100
+        assert workflow_success_rate >= 92.0, (
+            f"Workflow success rate {workflow_success_rate}% too low"
+        )
+
+    def test_realistic_traffic_pattern(self, base_url: str):
+        """Test with realistic traffic pattern variations"""
+        tester = LoadTester(base_url, max_workers=12, timeout=12)
+
+        # Simulate varying load throughout the day
+        traffic_pattern = [
+            (2, 5),  # Low morning traffic
+            (5, 8),  # Moderate midday
+            (3, 5),  # Afternoon dip
+            (6, 10),  # Peak evening
+        ]
+
+        url = f"{base_url}/stac/collections"
+        results = []
+        for workers, duration in traffic_pattern:
+            _, _, rate = tester.test_concurrency_level(url, workers, duration)
+            results.append(rate)
+            time.sleep(2)  # Transition time
+
+        avg_performance = sum(results) / len(results)
+        assert avg_performance >= 95.0, (
+            f"Traffic pattern handling failed: {avg_performance}%"
+        )
+
+
+class TestNormalSustained:
+    """Tests for sustained normal load over extended periods"""
+
+    def test_sustained_moderate_load(self, base_url: str):
+        """Test sustained moderate load over time"""
+        tester = LoadTester(base_url, max_workers=10, timeout=15)
+        url = f"{base_url}/stac/collections"
+
+        # Sustained load for 45 seconds
+        _, total, rate = tester.test_concurrency_level(
+            url, workers=5, duration=45
+        )
+
+        assert rate >= 95.0, f"Sustained load failed: {rate}% success rate"
+        assert total >= 200, "Too few requests for sustained test"
+
+    def test_consistent_response_times(self, base_url: str):
+        """Test that response times remain consistent under normal load"""
+        tester = LoadTester(base_url, max_workers=8, timeout=10)
+        url = f"{base_url}/stac/collections"
+
+        # Collect response time samples
+        response_times = []
+        for _ in range(10):
+            start_time = time.time()
+            success = tester.make_request(url)
+            response_time = time.time() - start_time
+
+            if success:
+                response_times.append(response_time)
+
+            time.sleep(0.5)
+
+        if response_times:
+            avg_time = sum(response_times) / 
len(response_times) + max_time = max(response_times) + + # Response times should be reasonable and consistent + assert avg_time <= 2.0, ( + f"Average response time too high: {avg_time:.2f}s" + ) + assert max_time <= 5.0, ( + f"Max response time too high: {max_time:.2f}s" + ) + + def test_memory_stability_under_load(self, base_url: str): + """Test that service remains stable under prolonged normal load""" + tester = LoadTester(base_url, max_workers=8, timeout=10) + url = f"{base_url}/raster/healthz" # Health endpoint should be very stable + + # Run for 60 seconds with steady load + success, total, rate = tester.test_concurrency_level( + url, workers=4, duration=60 + ) + + # Health endpoints should be extremely reliable + assert rate >= 98.0, ( + f"Health endpoint instability: {rate}% success rate" + ) + + +class TestNormalUserPatterns: + """Tests simulating realistic user interaction patterns""" + + def test_concurrent_user_sessions(self, base_url: str): + """Test multiple concurrent user sessions""" + tester = LoadTester(base_url, max_workers=12, timeout=12) + + # Simulate 6 concurrent users, each making requests over time + url = f"{base_url}/stac/collections" + success, total, rate = tester.test_concurrency_level( + url, workers=6, duration=25 + ) + + assert rate >= 93.0, f"Concurrent user test failed: {rate}% success" + assert total >= 100, "Insufficient concurrent user simulation" + + def test_user_session_duration(self, base_url: str): + """Test typical user session duration patterns""" + tester = LoadTester(base_url, max_workers=6, timeout=15) + + # Simulate user sessions of different lengths + session_patterns = [ + ("/stac/collections", 3, 8), # Quick browse + ("/stac/search", 2, 12), # Detailed search + ("/vector/healthz", 1, 5), # Health check + ] + + total_success_rate = 0 + for endpoint, workers, duration in session_patterns: + url = f"{base_url}{endpoint}" + _, _, rate = tester.test_concurrency_level(url, workers, duration) + total_success_rate += rate + + avg_session_success = total_success_rate / len(session_patterns) + assert avg_session_success >= 94.0, ( + f"User session patterns failed: {avg_session_success}%" + ) + + def test_api_usage_distribution(self, base_url: str): + """Test realistic API endpoint usage distribution""" + tester = LoadTester(base_url, max_workers=10, timeout=12) + + # Realistic usage: collections (high), search (medium), health (low) + usage_pattern = [ + ("/stac/collections", 4, 15), # High usage + ("/stac/search", 2, 10), # Medium usage + ("/raster/healthz", 1, 5), # Low usage + ("/vector/healthz", 1, 5), # Low usage + ] + + results = {} + for endpoint, workers, duration in usage_pattern: + url = f"{base_url}{endpoint}" + success, total, rate = tester.test_concurrency_level( + url, workers, duration + ) + results[endpoint] = {"rate": rate, "total": total} + + # All endpoints should perform well under their expected load + for endpoint, result in results.items(): + assert result["rate"] >= 90.0, ( + f"{endpoint} failed under expected load: {result['rate']}%" + ) diff --git a/tests/load/test_stress.py b/tests/load/test_stress.py new file mode 100644 index 00000000..876b115f --- /dev/null +++ b/tests/load/test_stress.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +""" +Pytest-based stress tests for eoAPI services + +This module provides stress testing functionality to find breaking points +and test service resilience under high load. 
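+
+Breaking points are probed through the shared LoadTester helper, roughly
+as sketched below (argument names mirror the tests in this module; the
+authoritative signature lives in load_tester.py):
+
+    tester = LoadTester(base_url, max_workers=50, timeout=10)
+    breaking_point = tester.find_breaking_point(
+        endpoint="/stac/collections", success_threshold=90.0,
+        step_size=3, test_duration=5, cooldown=1,
+    )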
+""" + +import os +import time + +import pytest + +from .load_tester import LoadTester + + +@pytest.fixture +def base_url() -> str: + """Get the base URL for eoAPI services""" + stac_endpoint = os.getenv("STAC_ENDPOINT", "http://localhost/stac") + return stac_endpoint.replace("/stac", "") + + +@pytest.fixture +def stress_tester(base_url: str) -> LoadTester: + """Create a LoadTester instance optimized for stress testing""" + return LoadTester(base_url=base_url, max_workers=50, timeout=10) + + +class TestStressBreakingPoints: + """Tests to find service breaking points under increasing load""" + + @pytest.mark.slow + def test_stac_collections_stress(self, stress_tester: LoadTester): + """Find breaking point for STAC collections endpoint""" + breaking_point = stress_tester.find_breaking_point( + endpoint="/stac/collections", + success_threshold=90.0, + step_size=3, + test_duration=5, + cooldown=1, + ) + + assert breaking_point >= 6, f"Breaking point {breaking_point} too low" + + @pytest.mark.slow + def test_stac_search_stress(self, stress_tester: LoadTester): + """Find breaking point for STAC search endpoint""" + breaking_point = stress_tester.find_breaking_point( + endpoint="/stac/search", + success_threshold=85.0, # Lower threshold for search + step_size=2, + test_duration=8, + cooldown=2, + ) + + assert breaking_point >= 4, ( + f"Search breaking point {breaking_point} too low" + ) + + def test_health_endpoints_stress(self, stress_tester: LoadTester): + """Test health endpoints under stress - should handle high load""" + for endpoint in ["/raster/healthz", "/vector/healthz"]: + breaking_point = stress_tester.find_breaking_point( + endpoint=endpoint, + success_threshold=95.0, # Health endpoints should be more resilient + step_size=5, + test_duration=3, + cooldown=1, + ) + + assert breaking_point >= 10, ( + f"{endpoint} breaking point {breaking_point} too low" + ) + + +class TestStressResilience: + """Tests for service resilience and recovery under stress""" + + @pytest.mark.slow + def test_service_recovery_after_stress(self, base_url: str): + """Test that services recover properly after high stress""" + tester = LoadTester(base_url, max_workers=20, timeout=5) + url = f"{base_url}/stac/collections" + + # Apply high stress load + _, _, stress_rate = tester.test_concurrency_level( + url, workers=15, duration=5 + ) + + # Allow recovery time + time.sleep(3) + + # Test normal load after stress + _, _, recovery_rate = tester.test_concurrency_level( + url, workers=2, duration=5 + ) + + assert recovery_rate >= 95.0, ( + f"Service didn't recover properly: {recovery_rate}%" + ) + + def test_sustained_high_load(self, base_url: str): + """Test service behavior under sustained high load""" + tester = LoadTester(base_url, max_workers=15, timeout=8) + url = f"{base_url}/stac/collections" + + # Sustained load for 30 seconds + _, _, success_rate = tester.test_concurrency_level( + url, workers=8, duration=30 + ) + + assert success_rate >= 80.0, ( + f"Sustained load failed: {success_rate}% success rate" + ) + + def test_burst_load_handling(self, base_url: str): + """Test handling of burst traffic patterns""" + tester = LoadTester(base_url, max_workers=25, timeout=5) + url = f"{base_url}/stac/collections" + + results = [] + + # Simulate burst pattern: high -> low -> high + for workers, duration in [(1, 3), (12, 5), (2, 3), (15, 5)]: + _, _, rate = tester.test_concurrency_level(url, workers, duration) + results.append(rate) + time.sleep(1) # Brief pause between bursts + + # All burst phases should maintain 
reasonable performance + avg_performance = sum(results) / len(results) + assert avg_performance >= 85.0, ( + f"Burst handling failed: {avg_performance}% average performance" + ) + + +class TestStressLimits: + """Tests to verify service limits and thresholds""" + + @pytest.mark.slow + def test_maximum_concurrent_users(self, stress_tester: LoadTester): + """Test behavior at maximum designed concurrent user limit""" + # Test at high concurrency level + url = f"{stress_tester.base_url}/stac/collections" + + _, _, success_rate = stress_tester.test_concurrency_level( + url, workers=25, duration=10 + ) + + # Should handle some level of high concurrency + assert success_rate >= 70.0, ( + f"High concurrency test failed: {success_rate}% success rate" + ) + + def test_timeout_behavior_under_load(self, base_url: str): + """Test timeout behavior when system is under stress""" + # Use shorter timeout to trigger timeout conditions + tester = LoadTester(base_url, max_workers=20, timeout=2) + url = f"{base_url}/stac/collections" + + _, total, _ = tester.test_concurrency_level(url, workers=10, duration=8) + + # Should make reasonable number of attempts even with timeouts + assert total >= 30, f"Too few requests attempted: {total}" + + def test_error_rate_under_stress(self, base_url: str): + """Test that error rates remain within acceptable bounds under stress""" + tester = LoadTester(base_url, max_workers=30, timeout=5) + url = f"{base_url}/stac/collections" + + success, total, success_rate = tester.test_concurrency_level( + url, workers=20, duration=15 + ) + + error_rate = ((total - success) / total) * 100 if total > 0 else 0 + + # Error rate should be less than 30% even under high stress + assert error_rate <= 30.0, ( + f"Error rate too high under stress: {error_rate}%" + ) diff --git a/tests/requirements.txt b/tests/requirements.txt index 19142e14..5c13fb26 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -2,6 +2,7 @@ httpx==0.27.0 requests==2.31.0 +urllib3==2.0.7 pytest==8.3.2 pytest-timeout==2.3.1