diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py index 5edd0450..e8637174 100644 --- a/ldclient/impl/datasourcev2/streaming.py +++ b/ldclient/impl/datasourcev2/streaming.py @@ -18,7 +18,13 @@ from ld_eventsource.errors import HTTPStatusError from ldclient.config import Config -from ldclient.impl.datasystem import SelectorStore, Synchronizer, Update +from ldclient.impl.datasystem import ( + DiagnosticAccumulator, + DiagnosticSource, + SelectorStore, + Synchronizer, + Update +) from ldclient.impl.datasystem.protocolv2 import ( ChangeSetBuilder, DeleteObject, @@ -98,7 +104,7 @@ def query_params() -> dict[str, str]: ) -class StreamingDataSource(Synchronizer): +class StreamingDataSource(Synchronizer, DiagnosticSource): """ StreamingSynchronizer is a specific type of Synchronizer that handles streaming data sources. @@ -112,6 +118,11 @@ def __init__(self, config: Config): self._config = config self._sse: Optional[SSEClient] = None self._running = False + self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None + self._connection_attempt_start_time: Optional[float] = None + + def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator): + self._diagnostic_accumulator = diagnostic_accumulator @property def name(self) -> str: @@ -133,6 +144,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: change_set_builder = ChangeSetBuilder() self._running = True + self._connection_attempt_start_time = time() for action in self._sse.all: if isinstance(action, Fault): @@ -153,6 +165,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: if isinstance(action, Start) and action.headers is not None: fallback = action.headers.get('X-LD-FD-Fallback') == 'true' if fallback: + self._record_stream_init(True) yield Update( state=DataSourceState.OFF, revert_to_fdv1=True @@ -165,6 +178,8 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: try: update = self._process_message(action, change_set_builder) if update is not None: + self._record_stream_init(False) + self._connection_attempt_start_time = None yield update except json.decoder.JSONDecodeError as e: log.info( @@ -192,10 +207,6 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: environment_id=None, # TODO(sdk-1410) ) - # TODO(sdk-1408) - # if update is not None: - # self._record_stream_init(False) - self._sse.close() def stop(self): @@ -207,6 +218,12 @@ def stop(self): if self._sse: self._sse.close() + def _record_stream_init(self, failed: bool): + if self._diagnostic_accumulator and self._connection_attempt_start_time: + current_time = int(time() * 1000) + elapsed = current_time - int(self._connection_attempt_start_time * 1000) + self._diagnostic_accumulator.record_stream_init(current_time, elapsed if elapsed >= 0 else 0, failed) + # pylint: disable=too-many-return-statements def _process_message( self, msg: Event, change_set_builder: ChangeSetBuilder @@ -301,6 +318,9 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: if isinstance(error, json.decoder.JSONDecodeError): log.error("Unexpected error on stream connection: %s, will retry", error) + self._record_stream_init(True) + self._connection_attempt_start_time = time() + \ + self._sse.next_retry_delay # type: ignore update = Update( state=DataSourceState.INTERRUPTED, @@ -313,6 +333,10 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: return (update, True) if isinstance(error, HTTPStatusError): + self._record_stream_init(True) + self._connection_attempt_start_time = time() + \ + self._sse.next_retry_delay # type: ignore + error_info = DataSourceErrorInfo( DataSourceErrorKind.ERROR_RESPONSE, error.status, @@ -344,6 +368,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: ) if not is_recoverable: + self._connection_attempt_start_time = None log.error(http_error_message_result) self.stop() return (update, False) @@ -352,6 +377,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: return (update, True) log.warning("Unexpected error on stream connection: %s, will retry", error) + self._record_stream_init(True) + self._connection_attempt_start_time = time() + self._sse.next_retry_delay # type: ignore update = Update( state=DataSourceState.INTERRUPTED, diff --git a/ldclient/impl/datasystem/__init__.py b/ldclient/impl/datasystem/__init__.py index ec1fb9e0..1d299944 100644 --- a/ldclient/impl/datasystem/__init__.py +++ b/ldclient/impl/datasystem/__init__.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from enum import Enum from threading import Event -from typing import Callable, Generator, Optional, Protocol +from typing import Generator, Optional, Protocol, runtime_checkable from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet, Selector from ldclient.impl.util import _Result @@ -151,6 +151,27 @@ def store(self) -> ReadOnlyStore: raise NotImplementedError +class DiagnosticAccumulator(Protocol): + def record_stream_init(self, timestamp, duration, failed): + raise NotImplementedError + + def record_events_in_batch(self, events_in_batch): + raise NotImplementedError + + def create_event_and_reset(self, dropped_events, deduplicated_users): + raise NotImplementedError + + +@runtime_checkable +class DiagnosticSource(Protocol): + @abstractmethod + def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator): + """ + Set the diagnostic_accumulator to be used for reporting diagnostic events. + """ + raise NotImplementedError + + class SelectorStore(Protocol): """ SelectorStore represents a component capable of providing Selectors diff --git a/ldclient/impl/datasystem/fdv1.py b/ldclient/impl/datasystem/fdv1.py index 3e57ad34..023c1fc4 100644 --- a/ldclient/impl/datasystem/fdv1.py +++ b/ldclient/impl/datasystem/fdv1.py @@ -13,7 +13,7 @@ DataStoreStatusProviderImpl, DataStoreUpdateSinkImpl ) -from ldclient.impl.datasystem import DataAvailability +from ldclient.impl.datasystem import DataAvailability, DiagnosticAccumulator from ldclient.impl.flag_tracker import FlagTrackerImpl from ldclient.impl.listeners import Listeners from ldclient.impl.stubs import NullUpdateProcessor @@ -78,7 +78,7 @@ def __init__(self, config: Config): self._update_processor: Optional[UpdateProcessor] = None # Diagnostic accumulator provided by client for streaming metrics - self._diagnostic_accumulator = None + self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None # Track current data availability self._data_availability: DataAvailability = ( @@ -122,7 +122,7 @@ def set_flag_value_eval_fn(self, eval_fn): """ self._flag_tracker_impl = FlagTrackerImpl(self._flag_change_listeners, eval_fn) - def set_diagnostic_accumulator(self, diagnostic_accumulator): + def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator): """ Sets the diagnostic accumulator for streaming initialization metrics. This should be called before start() to ensure metrics are collected. diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index 580aafb2..41df248b 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -9,7 +9,12 @@ DataSourceStatusProviderImpl, DataStoreStatusProviderImpl ) -from ldclient.impl.datasystem import DataAvailability, Synchronizer +from ldclient.impl.datasystem import ( + DataAvailability, + DiagnosticAccumulator, + DiagnosticSource, + Synchronizer +) from ldclient.impl.datasystem.store import Store from ldclient.impl.flag_tracker import FlagTrackerImpl from ldclient.impl.listeners import Listeners @@ -173,9 +178,7 @@ def __init__( self._disabled = self._config.offline # Diagnostic accumulator provided by client for streaming metrics - # TODO(fdv2): Either we need to use this, or we need to provide it to - # the streaming synchronizers - self._diagnostic_accumulator = None + self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None # Set up event listeners self._flag_change_listeners = Listeners() @@ -261,7 +264,7 @@ def stop(self): # Close the store self._store.close() - def set_diagnostic_accumulator(self, diagnostic_accumulator): + def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator): """ Sets the diagnostic accumulator for streaming initialization metrics. This should be called before start() to ensure metrics are collected. @@ -334,6 +337,8 @@ def synchronizer_loop(self: 'FDv2'): try: self._lock.lock() primary_sync = self._primary_synchronizer_builder(self._config) + if isinstance(primary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: + primary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator) self._active_synchronizer = primary_sync self._lock.unlock() @@ -367,6 +372,8 @@ def synchronizer_loop(self: 'FDv2'): self._lock.lock() secondary_sync = self._secondary_synchronizer_builder(self._config) + if isinstance(secondary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: + secondary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator) log.info("Secondary synchronizer %s is starting", secondary_sync.name) self._active_synchronizer = secondary_sync self._lock.unlock() @@ -386,7 +393,6 @@ def synchronizer_loop(self: 'FDv2'): DataSourceState.OFF, self._data_source_status_provider.status.error ) - # TODO: WE might need to also set that threading.Event here break log.info("Recovery condition met, returning to primary synchronizer") @@ -398,8 +404,7 @@ def synchronizer_loop(self: 'FDv2'): log.error("Error in synchronizer loop: %s", e) finally: # Ensure we always set the ready event when exiting - if not set_on_ready.is_set(): - set_on_ready.set() + set_on_ready.set() self._lock.lock() if self._active_synchronizer is not None: self._active_synchronizer.stop() diff --git a/ldclient/impl/datasystem/protocolv2.py b/ldclient/impl/datasystem/protocolv2.py index 7feb8a81..e61f019e 100644 --- a/ldclient/impl/datasystem/protocolv2.py +++ b/ldclient/impl/datasystem/protocolv2.py @@ -6,10 +6,13 @@ from abc import abstractmethod from dataclasses import dataclass from enum import Enum -from typing import Any, List, Optional, Protocol +from typing import TYPE_CHECKING, Generator, List, Optional, Protocol from ldclient.impl.util import Result +if TYPE_CHECKING: + from ldclient.impl.datasystem import SelectorStore, Update + class EventName(str, Enum): """ @@ -502,7 +505,13 @@ def name(self) -> str: """Returns the name of the initializer.""" raise NotImplementedError - # TODO(fdv2): Need sync method + def sync(self, ss: "SelectorStore") -> "Generator[Update, None, None]": + """ + sync should begin the synchronization process for the data source, yielding + Update objects until the connection is closed or an unrecoverable error + occurs. + """ + raise NotImplementedError def close(self): """ diff --git a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py index f749bff8..90c7037e 100644 --- a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py +++ b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py @@ -53,6 +53,10 @@ def __init__( def all(self) -> Iterable[Action]: return self._events + @property + def next_retry_delay(self): + return 1 + def interrupt(self): pass