Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions src/sentry/testutils/pytest/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient

from sentry.testutils.pytest import xdist

_log = logging.getLogger(__name__)

MAX_SECONDS_WAITING_FOR_EVENT = 16
Expand Down Expand Up @@ -71,10 +73,8 @@ def scope_consumers():

"""
all_consumers: MutableMapping[str, Consumer | None] = {
# Relay is configured to use this topic for all ingest messages. See
# `templates/config.yml`.
"ingest-events": None,
"outcomes": None,
xdist.get_kafka_topic("ingest-events"): None,
xdist.get_kafka_topic("outcomes"): None,
}

yield all_consumers
Expand Down Expand Up @@ -106,10 +106,8 @@ def ingest_consumer(settings):
from sentry.consumers import get_stream_processor
from sentry.utils.batching_kafka_consumer import create_topics

# Relay is configured to use this topic for all ingest messages. See
# `template/config.yml`.
cluster_name = "default"
topic_event_name = "ingest-events"
topic_event_name = xdist.get_kafka_topic("ingest-events")

if scope_consumers[topic_event_name] is not None:
# reuse whatever was already created (will ignore the settings)
Expand All @@ -120,8 +118,7 @@ def ingest_consumer(settings):
admin.delete_topic(topic_event_name)
create_topics(cluster_name, [topic_event_name])

# simulate the event ingestion task
group_id = "test-consumer"
group_id = xdist.get_kafka_topic("test-consumer")

consumer = get_stream_processor(
"ingest-attachments",
Expand Down
11 changes: 8 additions & 3 deletions src/sentry/testutils/pytest/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import requests

from sentry.runner.commands.devservices import get_docker_client
from sentry.testutils.pytest.sentry import TEST_REDIS_DB
from sentry.testutils.pytest import xdist

_log = logging.getLogger(__name__)

Expand All @@ -23,6 +23,8 @@


def _relay_server_container_name() -> str:
    """Return the Docker container name for the test Relay server.

    Appends the pytest-xdist worker id when one is present so parallel
    workers each get their own container instead of colliding on one.
    """
    worker = xdist._worker_id
    suffix = f"_{worker}" if worker else ""
    return f"sentry_test_relay_server{suffix}"


Expand Down Expand Up @@ -66,9 +68,10 @@ def relay_server_setup(live_server, tmpdir_factory):
template_path = _get_template_dir()
sources = ["config.yml", "credentials.json"]

relay_port = ephemeral_port_reserve.reserve(ip="127.0.0.1", port=33331)
worker_num = xdist._worker_num if xdist._worker_num is not None else 0
relay_port = ephemeral_port_reserve.reserve(ip="127.0.0.1", port=33331 + worker_num * 100)

redis_db = TEST_REDIS_DB
redis_db = xdist.get_redis_db()

from sentry.relay import projectconfig_cache
from sentry.relay.projectconfig_cache.redis import RedisProjectConfigCache
Expand All @@ -84,6 +87,8 @@ def relay_server_setup(live_server, tmpdir_factory):
"KAFKA_HOST": "kafka",
"REDIS_HOST": "redis",
"REDIS_DB": redis_db,
"KAFKA_TOPIC_EVENTS": xdist.get_kafka_topic("ingest-events"),
"KAFKA_TOPIC_OUTCOMES": xdist.get_kafka_topic("outcomes"),
}

for source in sources:
Expand Down
23 changes: 18 additions & 5 deletions src/sentry/testutils/pytest/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from sentry.runner.importer import install_plugin_apps
from sentry.silo.base import SiloMode
from sentry.testutils.pytest import xdist
from sentry.testutils.region import TestEnvRegionDirectory
from sentry.testutils.silo import monkey_patch_single_process_silo_mode_state
from sentry.types import region
Expand All @@ -32,8 +33,6 @@
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, os.pardir, "tests")
)

TEST_REDIS_DB = 9


def _use_monolith_dbs() -> bool:
return os.environ.get("SENTRY_USE_MONOLITH_DBS", "0") == "1"
Expand Down Expand Up @@ -69,10 +68,21 @@ def _configure_test_env_regions() -> None:
# Assign a random name on every test run, as a reminder that test setup and
# assertions should not depend on this value. If you need to test behavior that
# depends on region attributes, use `override_regions` in your test case.
region_name = "testregion" + "".join(random.choices(string.digits, k=6))
# Under xdist, seed deterministically so all workers generate the same name
# (divergent names break xdist's requirement for identical test collection).
xdist_uid = os.environ.get("PYTEST_XDIST_TESTRUNUID")
r = random.Random(xdist_uid) if xdist_uid else random
region_name = "testregion" + "".join(r.choices(string.digits, k=6))

# Under xdist, each worker gets a unique snowflake_id (1, 2, 3, ...) so
# concurrent model creation doesn't produce colliding IDs.
region_snowflake_id = xdist._worker_num + 1 if xdist._worker_num is not None else 0

default_region = Cell(
region_name, 0, settings.SENTRY_OPTIONS["system.url-prefix"], RegionCategory.MULTI_TENANT
region_name,
region_snowflake_id,
settings.SENTRY_OPTIONS["system.url-prefix"],
RegionCategory.MULTI_TENANT,
)

settings.SENTRY_REGION = region_name
Expand Down Expand Up @@ -198,14 +208,17 @@ def pytest_configure(config: pytest.Config) -> None:
settings.SENTRY_RATELIMITER = "sentry.ratelimits.redis.RedisRateLimiter"
settings.SENTRY_RATELIMITER_OPTIONS = {}

if snuba_url := xdist.get_snuba_url():
settings.SENTRY_SNUBA = snuba_url

settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT = 1

if not hasattr(settings, "SENTRY_OPTIONS"):
settings.SENTRY_OPTIONS = {}

settings.SENTRY_OPTIONS.update(
{
"redis.clusters": {"default": {"hosts": {0: {"db": TEST_REDIS_DB}}}},
"redis.clusters": {"default": {"hosts": {0: {"db": xdist.get_redis_db()}}}},
"mail.backend": "django.core.mail.backends.locmem.EmailBackend",
"system.url-prefix": "http://testserver",
"system.base-hostname": "testserver",
Expand Down
8 changes: 4 additions & 4 deletions src/sentry/testutils/pytest/template/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ processing:
kafka_config:
- {name: 'bootstrap.servers', value: '${KAFKA_HOST}:9093'}
topics:
events: ingest-events
attachments: ingest-events
transactions: ingest-events
outcomes: outcomes
events: ${KAFKA_TOPIC_EVENTS}
attachments: ${KAFKA_TOPIC_EVENTS}
transactions: ${KAFKA_TOPIC_EVENTS}
outcomes: ${KAFKA_TOPIC_OUTCOMES}
redis: redis://${REDIS_HOST}:6379/${REDIS_DB}
aggregator:
bucket_interval: 1 # Use shortest possible interval to speed up tests
Expand Down
35 changes: 35 additions & 0 deletions src/sentry/testutils/pytest/xdist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

import os

_TEST_REDIS_DB = 9
_SNUBA_BASE_PORT = 1230
# Redis defaults to 16 DBs (0-15). With base DB 9, max 7 workers (gw0-gw6).
_MAX_WORKERS = 7

_worker_id: str | None = os.environ.get("PYTEST_XDIST_WORKER")
_worker_num: int | None = int(_worker_id.replace("gw", "")) if _worker_id else None

if _worker_num is not None and _worker_num >= _MAX_WORKERS:
raise RuntimeError(
f"xdist worker {_worker_id} exceeds max supported workers ({_MAX_WORKERS}). "
f"Redis only has DBs 0-15 and base DB is {_TEST_REDIS_DB}."
)


def get_redis_db() -> int:
if _worker_num is not None:
return _TEST_REDIS_DB + _worker_num
return _TEST_REDIS_DB


def get_kafka_topic(base_name: str) -> str:
if _worker_id:
return f"{base_name}-{_worker_id}"
return base_name


def get_snuba_url() -> str | None:
if _worker_num is not None and os.environ.get("XDIST_PER_WORKER_SNUBA"):
return f"http://127.0.0.1:{_SNUBA_BASE_PORT + _worker_num}"
return None
7 changes: 5 additions & 2 deletions src/sentry/testutils/skips.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import pytest

from sentry.testutils.pytest import xdist


def _service_available(host: str, port: int) -> bool:
try:
Expand All @@ -21,8 +23,9 @@ def _requires_service_message(name: str) -> str:

@pytest.fixture(scope="session")
def _requires_snuba() -> None:
# TODO: ability to ask devservices what port a service is on
if not _service_available("127.0.0.1", 1218):
snuba_url = xdist.get_snuba_url()
port = int(snuba_url.rsplit(":", 1)[1]) if snuba_url else 1218
if not _service_available("127.0.0.1", port):
pytest.fail(_requires_service_message("snuba"))


Expand Down
7 changes: 7 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@

import psutil
import pytest
import pytest_rerunfailures
import responses
import sentry_sdk

# Disable crash recovery server in pytest-rerunfailures. Under xdist, Sentry's
# global socket.setdefaulttimeout(5) causes the server's per-worker recv threads
# to die during Django init (~10s), silently breaking crash recovery anyway.
# Normal --reruns (in-memory retry) is unaffected.
pytest_rerunfailures.HAS_PYTEST_HANDLECRASHITEM = False # type: ignore[attr-defined]
from django.core.cache import cache
from django.db import connections

Expand Down
Loading