diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ea6d48ad..572ae266 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -78,6 +78,10 @@ A special case is the module `ldclient.impl`, and any modules within it. Everyth So, if there is a class whose existence is entirely an implementation detail, it should be in `impl`. Similarly, classes that are _not_ in `impl` must not expose any public members (i.e. symbols that do not have an underscore prefix) that are not meant to be part of the supported public API. This is important because of our guarantee of backward compatibility for all public APIs within a major version: we want to be able to change our implementation details to suit the needs of the code, without worrying about breaking a customer's code. Due to how the language works, we can't actually prevent an application developer from referencing those classes in their code, but this convention makes it clear that such use is discouraged and unsupported. +### Sync/async parity + +The SDK maintains parallel sync (`foo.py`) and async (`async_foo.py`) implementations by hand. When you change a method in a sync module, make the matching change in its `async_` sibling (and vice versa), and justify any difference beyond `async`/`await` keywords. Shared I/O-free ("sans-I/O") logic lives in modules with a `_core` suffix that are imported by both siblings: `impl/client_core.py`, `impl/datasystem/fdv2_core.py`, and `impl/events/event_processor_core.py`. Per-side logic that genuinely differs (more than `async`/`await`) lives directly in each sibling, not in a separate core file — for example `impl/evaluator.py`/`impl/async_evaluator.py` and the in-memory feature stores `feature_store.py`/`async_feature_store.py`. Both the sync and async contract test suites must pass. + ### Type hints Python does not require the use of type hints, but they can be extremely helpful for spotting mistakes and for improving the IDE experience, so we should always use them in the SDK. 
Every method in the public API is expected to have type hints for all non-`self` parameters, and for its return value if any. diff --git a/ldclient/client.py b/ldclient/client.py index 829575d4..89647583 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -2,8 +2,6 @@ This submodule contains the client class that provides most of the SDK functionality. """ -import hashlib -import hmac import threading import traceback from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple @@ -19,6 +17,11 @@ _EvaluationWithHookResult ) from ldclient.impl.big_segments import BigSegmentStoreManager +from ldclient.impl.client_core import ( + get_environment_metadata, + get_plugin_hooks +) +from ldclient.impl.client_core import secure_mode_hash as _secure_mode_hash from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl from ldclient.impl.datasource.polling import PollingUpdateProcessor from ldclient.impl.datasource.status import ( @@ -57,12 +60,7 @@ ReadOnlyStore ) from ldclient.migrations import OpTracker, Stage -from ldclient.plugin import ( - ApplicationMetadata, - EnvironmentMetadata, - SdkMetadata -) -from ldclient.version import VERSION +from ldclient.plugin import EnvironmentMetadata from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind from .impl import AnyNum @@ -239,8 +237,8 @@ def postfork(self, start_wait: float = 5): self.__start_up(start_wait) def __start_up(self, start_wait: float): - environment_metadata = self.__get_environment_metadata() - plugin_hooks = self.__get_plugin_hooks(environment_metadata) + environment_metadata = get_environment_metadata(self._config) + plugin_hooks = get_plugin_hooks(self._config, environment_metadata) self.__hooks_lock = ReadWriteLock() self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook] @@ -305,36 +303,6 @@ def __start_up(self, start_wait: float): else: log.warning("Initialization timeout exceeded for LaunchDarkly Client or an error occurred. 
" "Feature Flags may not yet be available.") - def __get_environment_metadata(self) -> EnvironmentMetadata: - sdk_metadata = SdkMetadata( - name="python-server-sdk", - version=VERSION, - wrapper_name=self._config.wrapper_name, - wrapper_version=self._config.wrapper_version - ) - - application_metadata = None - if self._config.application: - application_metadata = ApplicationMetadata( - id=self._config.application.get('id'), - version=self._config.application.get('version'), - ) - - return EnvironmentMetadata( - sdk=sdk_metadata, - application=application_metadata, - sdk_key=self._config.sdk_key - ) - - def __get_plugin_hooks(self, environment_metadata: EnvironmentMetadata) -> List[Hook]: - hooks = [] - for plugin in self._config.plugins: - try: - hooks.extend(plugin.get_hooks(environment_metadata)) - except Exception as e: - log.error("Error getting hooks from plugin %s: %s", plugin.metadata.name, e) - return hooks - def __register_plugins(self, environment_metadata: EnvironmentMetadata): for plugin in self._config.plugins: try: @@ -693,10 +661,7 @@ def secure_mode_hash(self, context: Context) -> str: :param context: the evaluation context :return: the hash string """ - if not context.valid: - log.warning("Context was invalid for secure_mode_hash (%s); returning empty hash" % context.error) - return "" - return hmac.new(str(self._config.sdk_key).encode(), context.fully_qualified_key.encode(), hashlib.sha256).hexdigest() + return _secure_mode_hash(self._config, context) def add_hook(self, hook: Hook): """ diff --git a/ldclient/impl/client_core.py b/ldclient/impl/client_core.py new file mode 100644 index 00000000..02531a04 --- /dev/null +++ b/ldclient/impl/client_core.py @@ -0,0 +1,66 @@ +""" +Genuinely I/O-free, await-free helpers shared by the sync :class:`ldclient.client.LDClient` +and async :class:`ldclient.async_client.AsyncLDClient`. 
+ +These functions contain no awaits and touch no store/network/event I/O, so they +can live in a single module imported by both clients. Anything that reads the +store, sends events, or runs hook stages is I/O-adjacent and is hand-duplicated +across the two client classes instead (differing only in ``async``/``await``). +""" + +import hashlib +import hmac +from typing import List + +from ldclient.config import Config +from ldclient.context import Context +from ldclient.hook import Hook +from ldclient.impl.util import log +from ldclient.plugin import ( + ApplicationMetadata, + EnvironmentMetadata, + SdkMetadata +) +from ldclient.version import VERSION + + +def get_environment_metadata(config: Config) -> EnvironmentMetadata: + sdk_metadata = SdkMetadata( + name="python-server-sdk", + version=VERSION, + wrapper_name=config.wrapper_name, + wrapper_version=config.wrapper_version + ) + + application_metadata = None + if config.application: + application_metadata = ApplicationMetadata( + id=config.application.get('id'), + version=config.application.get('version'), + ) + + return EnvironmentMetadata( + sdk=sdk_metadata, + application=application_metadata, + sdk_key=config.sdk_key + ) + + +def get_plugin_hooks(config: Config, environment_metadata: EnvironmentMetadata) -> List[Hook]: + hooks = [] + for plugin in config.plugins: + try: + hooks.extend(plugin.get_hooks(environment_metadata)) + except Exception as e: + log.error("Error getting hooks from plugin %s: %s", plugin.metadata.name, e) + return hooks + + +def secure_mode_hash(config: Config, context: Context) -> str: + """Computes the secure-mode HMAC for a context, or an empty string for an + invalid context. 
Pure: depends only on the SDK key and the context's + fully-qualified key.""" + if not context.valid: + log.warning("Context was invalid for secure_mode_hash (%s); returning empty hash" % context.error) + return "" + return hmac.new(str(config.sdk_key).encode(), context.fully_qualified_key.encode(), hashlib.sha256).hexdigest() diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index d4dbd18c..c315d40d 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -1,18 +1,21 @@ import time -from copy import copy -from enum import Enum from queue import Queue from threading import Event, Thread -from typing import Any, Callable, Dict, List, Mapping, Optional +from typing import Any, Callable, Dict, List, Optional from ldclient.config import Config, DataSourceBuilder, DataSystemConfig -from ldclient.feature_store import _FeatureStoreDataSetSorter from ldclient.impl.datasystem import ( DataAvailability, DataSystem, DiagnosticAccumulator, DiagnosticSource ) +from ldclient.impl.datasystem.fdv2_core import ( + ConditionDirective, + DataSourceStatusProviderImpl, + DataStoreStatusProviderImpl, + FeatureStoreClientWrapper +) from ldclient.impl.datasystem.store import Store from ldclient.impl.flag_tracker import FlagTrackerImpl from ldclient.impl.listeners import Listeners @@ -28,7 +31,6 @@ DataStoreMode, DataStoreStatus, DataStoreStatusProvider, - FeatureStore, FlagTracker, ReadOnlyStore, Synchronizer @@ -36,262 +38,6 @@ from ldclient.versioned_data_kind import VersionedDataKind -class DataSourceStatusProviderImpl(DataSourceStatusProvider): - def __init__(self, listeners: Listeners): - self.__listeners = listeners - self.__status = DataSourceStatus(DataSourceState.INITIALIZING, time.time(), None) - self.__lock = ReadWriteLock() - - @property - def status(self) -> DataSourceStatus: - with self.__lock.read(): - return self.__status - - def update_status(self, new_state: DataSourceState, new_error: 
Optional[DataSourceErrorInfo]): - status_to_broadcast = None - - with self.__lock.write(): - old_status = self.__status - - if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: - new_state = DataSourceState.INITIALIZING - - if new_state == old_status.state and new_error is None: - return - - new_since = self.__status.since if new_state == self.__status.state else time.time() - new_error = self.__status.error if new_error is None else new_error - - self.__status = DataSourceStatus(new_state, new_since, new_error) - - status_to_broadcast = self.__status - - if status_to_broadcast is not None: - self.__listeners.notify(status_to_broadcast) - - def add_listener(self, listener: Callable[[DataSourceStatus], None]): - self.__listeners.add(listener) - - def remove_listener(self, listener: Callable[[DataSourceStatus], None]): - self.__listeners.remove(listener) - - -class DataStoreStatusProviderImpl(DataStoreStatusProvider): - def __init__(self, store: Optional[FeatureStore], listeners: Listeners): - self.__store = store - self.__listeners = listeners - - self.__lock = ReadWriteLock() - self.__status = DataStoreStatus(True, False) - - def update_status(self, status: DataStoreStatus): - """ - update_status is called from the data store to push a status update. 
- """ - modified = False - - with self.__lock.write(): - if self.__status != status: - self.__status = status - modified = True - - if modified: - self.__listeners.notify(status) - - @property - def status(self) -> DataStoreStatus: - with self.__lock.read(): - return copy(self.__status) - - def is_monitoring_enabled(self) -> bool: - if self.__store is None: - return False - if hasattr(self.__store, "is_monitoring_enabled") is False: - return False - - return self.__store.is_monitoring_enabled() # type: ignore - - def add_listener(self, listener: Callable[[DataStoreStatus], None]): - self.__listeners.add(listener) - - def remove_listener(self, listener: Callable[[DataStoreStatus], None]): - self.__listeners.remove(listener) - - -class FeatureStoreClientWrapper(FeatureStore): - """Provides additional behavior that the client requires before or after feature store operations. - Currently this just means sorting the data set for init() and dealing with data store status listeners. - """ - - def __init__(self, store: FeatureStore, store_update_sink: DataStoreStatusProviderImpl): - self.store = store - self.__store_update_sink = store_update_sink - self.__monitoring_enabled = self.is_monitoring_enabled() - - # Covers the following variables - self.__lock = ReadWriteLock() - self.__last_available = True - self.__poller: Optional[RepeatingTask] = None - self.__closed = False - - def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]): - return self.__wrapper(lambda: self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data))) - - def get(self, kind, key, callback): - return self.__wrapper(lambda: self.store.get(kind, key, callback)) - - def all(self, kind, callback): - return self.__wrapper(lambda: self.store.all(kind, callback)) - - def delete(self, kind, key, version): - return self.__wrapper(lambda: self.store.delete(kind, key, version)) - - def upsert(self, kind, item): - return self.__wrapper(lambda: self.store.upsert(kind, 
item)) - - @property - def initialized(self) -> bool: - return self.store.initialized - - def disable_cache(self) -> None: - def _do_disable(): - try: - inner = self.store - if hasattr(inner, "disable_cache"): - inner.disable_cache() # type: ignore[attr-defined] - except Exception as e: - log.warning("disable_cache failed on inner store: %s", e) - - self.__wrapper(_do_disable) - - def __wrapper(self, fn: Callable): - try: - return fn() - except BaseException: - if self.__monitoring_enabled: - self.__update_availability(False) - raise - - def __update_availability(self, available: bool): - state_changed = False - poller_to_stop = None - task_to_start = None - - with self.__lock.write(): - if self.__closed: - return - if available == self.__last_available: - return - - state_changed = True - self.__last_available = available - - if available: - poller_to_stop = self.__poller - self.__poller = None - elif self.__poller is None: - task_to_start = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability) - self.__poller = task_to_start - - if available: - log.warning("Persistent store is available again") - else: - log.warning("Detected persistent store unavailability; updates will be cached until it recovers") - - status = DataStoreStatus(available, True) - self.__store_update_sink.update_status(status) - - if poller_to_stop is not None: - poller_to_stop.stop() - - if task_to_start is not None: - task_to_start.start() - - def __check_availability(self): - try: - if self.store.is_available(): - self.__update_availability(True) - except BaseException as e: - log.error("Unexpected error from data store status function: %s", e) - - def is_monitoring_enabled(self) -> bool: - """ - This methods determines whether the wrapped store can support enabling monitoring. - - The wrapped store must provide a monitoring_enabled method, which must - be true. But this alone is not sufficient. 
- - Because this class wraps all interactions with a provided store, it can - technically "monitor" any store. However, monitoring also requires that - we notify listeners when the store is available again. - - We determine this by checking the store's `available?` method, so this - is also a requirement for monitoring support. - - These extra checks won't be necessary once `available` becomes a part - of the core interface requirements and this class no longer wraps every - feature store. - """ - - if not hasattr(self.store, 'is_monitoring_enabled'): - return False - - if not hasattr(self.store, 'is_available'): - return False - - monitoring_enabled = getattr(self.store, 'is_monitoring_enabled') - if not callable(monitoring_enabled): - return False - - return monitoring_enabled() - - def close(self): - """ - Close the wrapper and stop the repeating task poller if it's running. - Also forwards the close call to the underlying store if it has a close method. - """ - poller_to_stop = None - - with self.__lock.write(): - if self.__closed: - return - self.__closed = True - poller_to_stop = self.__poller - self.__poller = None - - if poller_to_stop is not None: - poller_to_stop.stop() - - if hasattr(self.store, "close"): - self.store.close() - - -class ConditionDirective(str, Enum): - """ - ConditionDirective represents the possible directives that can be returned from a condition check. - """ - - REMOVE = "remove" - """ - REMOVE suggests that the current data source should be permanently removed from consideration. - """ - - FALLBACK = "fallback" - """ - FALLBACK suggests that this data source should be abandoned in favor of the next one. - """ - - RECOVER = "recover" - """ - RECOVER suggests that we should try to return to the primary data source. - """ - - FDV1 = "fdv1" - """ - FDV1 suggests that we should immediately fall back to the FDv1 Fallback Synchronizer. 
- """ - - class FDv2(DataSystem): """ FDv2 is an implementation of the DataSystem interface that uses the Flag Delivery V2 protocol @@ -788,3 +534,12 @@ def target_availability(self) -> DataAvailability: return DataAvailability.REFRESHED return DataAvailability.CACHED + + +__all__ = [ + 'ConditionDirective', + 'DataSourceStatusProviderImpl', + 'DataStoreStatusProviderImpl', + 'FDv2', + 'FeatureStoreClientWrapper', +] diff --git a/ldclient/impl/datasystem/fdv2_core.py b/ldclient/impl/datasystem/fdv2_core.py new file mode 100644 index 00000000..9db072c3 --- /dev/null +++ b/ldclient/impl/datasystem/fdv2_core.py @@ -0,0 +1,292 @@ +""" +Support classes shared by the sync and async FDv2 data system coordinators. + +These are synchronous (thread-based) components used identically by both +``FDv2`` and ``AsyncFDv2``: status providers, the persistent-store wrapper, +and the condition directive enum. +""" + +import time +from copy import copy +from enum import Enum +from typing import Any, Callable, Dict, Mapping, Optional + +from ldclient.feature_store import _FeatureStoreDataSetSorter +from ldclient.impl.listeners import Listeners +from ldclient.impl.repeating_task import RepeatingTask +from ldclient.impl.rwlock import ReadWriteLock +from ldclient.impl.util import log +from ldclient.interfaces import ( + DataSourceErrorInfo, + DataSourceState, + DataSourceStatus, + DataSourceStatusProvider, + DataStoreStatus, + DataStoreStatusProvider, + FeatureStore +) +from ldclient.versioned_data_kind import VersionedDataKind + + +class DataSourceStatusProviderImpl(DataSourceStatusProvider): + def __init__(self, listeners: Listeners): + self.__listeners = listeners + self.__status = DataSourceStatus(DataSourceState.INITIALIZING, time.time(), None) + self.__lock = ReadWriteLock() + + @property + def status(self) -> DataSourceStatus: + with self.__lock.read(): + return self.__status + + def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): + 
status_to_broadcast = None + + with self.__lock.write(): + old_status = self.__status + + if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: + new_state = DataSourceState.INITIALIZING + + if new_state == old_status.state and new_error is None: + return + + new_since = self.__status.since if new_state == self.__status.state else time.time() + new_error = self.__status.error if new_error is None else new_error + + self.__status = DataSourceStatus(new_state, new_since, new_error) + + status_to_broadcast = self.__status + + if status_to_broadcast is not None: + self.__listeners.notify(status_to_broadcast) + + def add_listener(self, listener: Callable[[DataSourceStatus], None]): + self.__listeners.add(listener) + + def remove_listener(self, listener: Callable[[DataSourceStatus], None]): + self.__listeners.remove(listener) + + +class DataStoreStatusProviderImpl(DataStoreStatusProvider): + def __init__(self, store: Optional[FeatureStore], listeners: Listeners): + self.__store = store + self.__listeners = listeners + + self.__lock = ReadWriteLock() + self.__status = DataStoreStatus(True, False) + + def update_status(self, status: DataStoreStatus): + """ + update_status is called from the data store to push a status update. 
+ """ + modified = False + + with self.__lock.write(): + if self.__status != status: + self.__status = status + modified = True + + if modified: + self.__listeners.notify(status) + + @property + def status(self) -> DataStoreStatus: + with self.__lock.read(): + return copy(self.__status) + + def is_monitoring_enabled(self) -> bool: + if self.__store is None: + return False + if hasattr(self.__store, "is_monitoring_enabled") is False: + return False + + return self.__store.is_monitoring_enabled() # type: ignore + + def add_listener(self, listener: Callable[[DataStoreStatus], None]): + self.__listeners.add(listener) + + def remove_listener(self, listener: Callable[[DataStoreStatus], None]): + self.__listeners.remove(listener) + + +class FeatureStoreClientWrapper(FeatureStore): + """Provides additional behavior that the client requires before or after feature store operations. + Currently this just means sorting the data set for init() and dealing with data store status listeners. + """ + + def __init__(self, store: FeatureStore, store_update_sink: DataStoreStatusProviderImpl): + self.store = store + self.__store_update_sink = store_update_sink + self.__monitoring_enabled = self.is_monitoring_enabled() + + # Covers the following variables + self.__lock = ReadWriteLock() + self.__last_available = True + self.__poller: Optional[RepeatingTask] = None + self.__closed = False + + def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]): + return self.__wrapper(lambda: self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data))) + + def get(self, kind, key, callback): + return self.__wrapper(lambda: self.store.get(kind, key, callback)) + + def all(self, kind, callback): + return self.__wrapper(lambda: self.store.all(kind, callback)) + + def delete(self, kind, key, version): + return self.__wrapper(lambda: self.store.delete(kind, key, version)) + + def upsert(self, kind, item): + return self.__wrapper(lambda: self.store.upsert(kind, 
item)) + + @property + def initialized(self) -> bool: + return self.store.initialized + + def disable_cache(self) -> None: + def _do_disable(): + try: + inner = self.store + if hasattr(inner, "disable_cache"): + inner.disable_cache() # type: ignore[attr-defined] + except Exception as e: + log.warning("disable_cache failed on inner store: %s", e) + + self.__wrapper(_do_disable) + + def __wrapper(self, fn: Callable): + try: + return fn() + except BaseException: + if self.__monitoring_enabled: + self.__update_availability(False) + raise + + def __update_availability(self, available: bool): + state_changed = False + poller_to_stop = None + task_to_start = None + + with self.__lock.write(): + if self.__closed: + return + if available == self.__last_available: + return + + state_changed = True + self.__last_available = available + + if available: + poller_to_stop = self.__poller + self.__poller = None + elif self.__poller is None: + task_to_start = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability) + self.__poller = task_to_start + + if available: + log.warning("Persistent store is available again") + else: + log.warning("Detected persistent store unavailability; updates will be cached until it recovers") + + status = DataStoreStatus(available, True) + self.__store_update_sink.update_status(status) + + if poller_to_stop is not None: + poller_to_stop.stop() + + if task_to_start is not None: + task_to_start.start() + + def __check_availability(self): + try: + if self.store.is_available(): + self.__update_availability(True) + except BaseException as e: + log.error("Unexpected error from data store status function: %s", e) + + def is_monitoring_enabled(self) -> bool: + """ + This method determines whether the wrapped store can support enabling monitoring. + + The wrapped store must provide an `is_monitoring_enabled` method, which must + return true. But this alone is not sufficient. 
+ + Because this class wraps all interactions with a provided store, it can + technically "monitor" any store. However, monitoring also requires that + we notify listeners when the store is available again. + + We determine this by checking the store's `is_available` method, so this + is also a requirement for monitoring support. + + These extra checks won't be necessary once `is_available` becomes a part + of the core interface requirements and this class no longer wraps every + feature store. + """ + + if not hasattr(self.store, 'is_monitoring_enabled'): + return False + + if not hasattr(self.store, 'is_available'): + return False + + monitoring_enabled = getattr(self.store, 'is_monitoring_enabled') + if not callable(monitoring_enabled): + return False + + return monitoring_enabled() + + def close(self): + """ + Close the wrapper and stop the repeating task poller if it's running. + Also forwards the close call to the underlying store if it has a close method. + """ + poller_to_stop = None + + with self.__lock.write(): + if self.__closed: + return + self.__closed = True + poller_to_stop = self.__poller + self.__poller = None + + if poller_to_stop is not None: + poller_to_stop.stop() + + if hasattr(self.store, "close"): + self.store.close() + + +class ConditionDirective(str, Enum): + """ + ConditionDirective represents the possible directives that can be returned from a condition check. + """ + + REMOVE = "remove" + """ + REMOVE suggests that the current data source should be permanently removed from consideration. + """ + + FALLBACK = "fallback" + """ + FALLBACK suggests that this data source should be abandoned in favor of the next one. + """ + + RECOVER = "recover" + """ + RECOVER suggests that we should try to return to the primary data source. + """ + + FDV1 = "fdv1" + """ + FDV1 suggests that we should immediately fall back to the FDv1 Fallback Synchronizer. 
+ """ + + +__all__ = [ + 'ConditionDirective', + 'DataSourceStatusProviderImpl', + 'DataStoreStatusProviderImpl', + 'FeatureStoreClientWrapper', +] diff --git a/ldclient/impl/events/event_processor.py b/ldclient/impl/events/event_processor.py index df4064bf..1f65f2a3 100644 --- a/ldclient/impl/events/event_processor.py +++ b/ldclient/impl/events/event_processor.py @@ -12,15 +12,19 @@ from email.utils import parsedate from random import Random from threading import Event, Lock, Thread -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Optional import urllib3 from ldclient.config import Config from ldclient.context import Context from ldclient.impl.events.diagnostics import create_diagnostic_init -from ldclient.impl.events.event_context_formatter import EventContextFormatter -from ldclient.impl.events.event_summarizer import EventSummarizer, EventSummary +from ldclient.impl.events.event_processor_core import ( + DebugEvent, + EventBuffer, + EventOutputFormatter, + IndexEvent +) from ldclient.impl.events.types import ( EventInput, EventInputCustom, @@ -37,8 +41,7 @@ check_if_error_is_recoverable_and_log, current_time_millis, is_http_error_recoverable, - log, - timedelta_millis + log ) from ldclient.interfaces import EventProcessor from ldclient.migrations.tracker import MigrationOpEvent @@ -50,153 +53,6 @@ EventProcessorMessage = namedtuple('EventProcessorMessage', ['type', 'param']) -class DebugEvent: - __slots__ = ['original_input'] - - def __init__(self, original_input: EventInputEvaluation): - self.original_input = original_input - - -class IndexEvent: - __slots__ = ['timestamp', 'context'] - - def __init__(self, timestamp: int, context: Context): - self.timestamp = timestamp - self.context = context - - -class EventOutputFormatter: - def __init__(self, config: Config): - self._context_formatter = EventContextFormatter(config.all_attributes_private, config.private_attributes) - - def make_output_events(self, events: 
List[Any], summary: EventSummary): - events_out = [self.make_output_event(e) for e in events] - if not summary.is_empty(): - events_out.append(self.make_summary_event(summary)) - return events_out - - def make_output_event(self, e: Any): - if isinstance(e, EventInputEvaluation): - out = self._base_eval_props(e, 'feature') - out['context'] = self._process_context(e.context, True) - return out - elif isinstance(e, DebugEvent): - out = self._base_eval_props(e.original_input, 'debug') - out['context'] = self._process_context(e.original_input.context, False) - return out - elif isinstance(e, EventInputIdentify): - return {'kind': 'identify', 'creationDate': e.timestamp, 'context': self._process_context(e.context, False)} - elif isinstance(e, IndexEvent): - return {'kind': 'index', 'creationDate': e.timestamp, 'context': self._process_context(e.context, False)} - elif isinstance(e, EventInputCustom): - out = { - 'kind': 'custom', - 'creationDate': e.timestamp, - 'key': e.key, - 'context': self._process_context(e.context, True) - } - - if e.data is not None: - out['data'] = e.data - if e.metric_value is not None: - out['metricValue'] = e.metric_value - return out - elif isinstance(e, MigrationOpEvent): - out = { - 'kind': 'migration_op', - 'creationDate': e.timestamp, - 'operation': e.operation.value, - 'context': self._process_context(e.context, True), - 'evaluation': {'key': e.key, 'value': e.detail.value}, - } - - if e.flag is not None: - out["evaluation"]["version"] = e.flag.version - if e.default_stage: - out["evaluation"]["default"] = e.default_stage.value - if e.detail.variation_index is not None: - out["evaluation"]["variation"] = e.detail.variation_index - if e.detail.reason is not None: - out["evaluation"]["reason"] = e.detail.reason - - if e.sampling_ratio is not None and e.sampling_ratio != 1: - out["samplingRatio"] = e.sampling_ratio - - measurements: List[Dict] = [] - - if len(e.invoked) > 0: - measurements.append({"key": "invoked", "values": {origin.value: 
True for origin in e.invoked}}) - - if e.consistent is not None: - measurement = {"key": "consistent", "value": e.consistent} - - if e.consistent_ratio is not None and e.consistent_ratio != 1: - measurement["samplingRatio"] = e.consistent_ratio - - measurements.append(measurement) - - if len(e.latencies) > 0: - measurements.append({"key": "latency_ms", "values": {o.value: timedelta_millis(d) for o, d in e.latencies.items()}}) - - if len(e.errors) > 0: - measurements.append({"key": "error", "values": {origin.value: True for origin in e.errors}}) - - if len(measurements): - out["measurements"] = measurements - - return out - - return None - - def make_summary_event(self, summary: EventSummary): - """ - Transform summarizer data into the format used for the event payload. - """ - flags_out = dict() # type: Dict[str, Any] - for key, flag_data in summary.flags.items(): - flag_data_out = {'default': flag_data.default, 'contextKinds': list(flag_data.context_kinds)} - counters = [] # type: List[Dict[str, Any]] - for ckey, cval in flag_data.counters.items(): - variation, version = ckey - counter = {'count': cval.count, 'value': cval.value} - if variation is not None: - counter['variation'] = variation - if version is None: - counter['unknown'] = True - else: - counter['version'] = version - counters.append(counter) - flag_data_out['counters'] = counters - flags_out[key] = flag_data_out - return {'kind': 'summary', 'startDate': summary.start_date, 'endDate': summary.end_date, 'features': flags_out} - - def _process_context(self, context: Context, redact_anonymous: bool): - if redact_anonymous: - return self._context_formatter.format_context_redact_anonymous(context) - - return self._context_formatter.format_context(context) - - def _context_keys(self, context: Context): - out = {} - for i in range(context.individual_context_count): - c = context.get_individual_context(i) - if c is not None: - out[c.kind] = c.key - return out - - def _base_eval_props(self, e: 
EventInputEvaluation, kind: str) -> dict: - out = {'kind': kind, 'creationDate': e.timestamp, 'key': e.key, 'value': e.value, 'default': e.default_value} - if e.flag is not None: - out['version'] = e.flag.version - if e.variation is not None: - out['variation'] = e.variation - if e.reason is not None: - out['reason'] = e.reason - if e.prereq_of is not None: - out['prereqOf'] = e.prereq_of.key - return out - - class EventPayloadSendTask: def __init__(self, http, config, formatter, payload, response_fn): self._http = http @@ -242,43 +98,6 @@ def run(self): log.warning('Unhandled exception in event processor. Diagnostic event was not sent. [%s]', e) -FlushPayload = namedtuple('FlushPayload', ['events', 'summary']) - - -class EventBuffer: - def __init__(self, capacity): - self._capacity = capacity - self._events = [] - self._summarizer = EventSummarizer() - self._exceeded_capacity = False - self._dropped_events = 0 - - def add_event(self, event: Any): - if len(self._events) >= self._capacity: - self._dropped_events += 1 - if not self._exceeded_capacity: - log.warning("Exceeded event queue capacity. 
Increase capacity to avoid dropping events.") - self._exceeded_capacity = True - else: - self._events.append(event) - self._exceeded_capacity = False - - def add_to_summary(self, event: EventInputEvaluation): - self._summarizer.summarize_event(event) - - def get_and_clear_dropped_count(self): - dropped_count = self._dropped_events - self._dropped_events = 0 - return dropped_count - - def get_payload(self): - return FlushPayload(self._events, self._summarizer.snapshot()) - - def clear(self): - self._events = [] - self._summarizer.clear() - - class EventDispatcher: def __init__(self, inbox, config, http_client, diagnostic_accumulator=None): self._inbox = inbox diff --git a/ldclient/impl/events/event_processor_core.py b/ldclient/impl/events/event_processor_core.py new file mode 100644 index 00000000..726caf04 --- /dev/null +++ b/ldclient/impl/events/event_processor_core.py @@ -0,0 +1,226 @@ +""" +Shared data types and pure-transform helpers for the analytics event processors. + +These classes contain no I/O and are used by both the sync (event_processor.py) +and async (async_event_processor.py) implementations. 
+""" + +from collections import namedtuple +from typing import Any, Dict, List + +from ldclient.config import Config +from ldclient.context import Context +from ldclient.impl.events.event_context_formatter import EventContextFormatter +from ldclient.impl.events.event_summarizer import EventSummarizer, EventSummary +from ldclient.impl.events.types import ( + EventInputCustom, + EventInputEvaluation, + EventInputIdentify +) +from ldclient.impl.util import log, timedelta_millis +from ldclient.migrations.tracker import MigrationOpEvent + +# --------------------------------------------------------------------------- +# Shared event wrapper types +# --------------------------------------------------------------------------- + + +class DebugEvent: + __slots__ = ['original_input'] + + def __init__(self, original_input: EventInputEvaluation): + self.original_input = original_input + + +class IndexEvent: + __slots__ = ['timestamp', 'context'] + + def __init__(self, timestamp: int, context: Context): + self.timestamp = timestamp + self.context = context + + +FlushPayload = namedtuple('FlushPayload', ['events', 'summary']) + + +# --------------------------------------------------------------------------- +# EventBuffer — in-memory accumulation buffer (no I/O) +# --------------------------------------------------------------------------- + +class EventBuffer: + def __init__(self, capacity: int): + self._capacity = capacity + self._events: List[Any] = [] + self._summarizer = EventSummarizer() + self._exceeded_capacity = False + self._dropped_events = 0 + + def add_event(self, event: Any): + if len(self._events) >= self._capacity: + self._dropped_events += 1 + if not self._exceeded_capacity: + log.warning("Exceeded event queue capacity. 
Increase capacity to avoid dropping events.") + self._exceeded_capacity = True + else: + self._events.append(event) + self._exceeded_capacity = False + + def add_to_summary(self, event: EventInputEvaluation): + self._summarizer.summarize_event(event) + + def get_and_clear_dropped_count(self) -> int: + count = self._dropped_events + self._dropped_events = 0 + return count + + def get_payload(self) -> FlushPayload: + return FlushPayload(self._events, self._summarizer.snapshot()) + + def clear(self): + self._events = [] + self._summarizer.clear() + + +# --------------------------------------------------------------------------- +# EventOutputFormatter — pure data transform (no I/O) +# --------------------------------------------------------------------------- + +class EventOutputFormatter: + def __init__(self, config: Config): + self._context_formatter = EventContextFormatter( + config.all_attributes_private, config.private_attributes + ) + + def make_output_events(self, events: List[Any], summary: EventSummary): + events_out = [self.make_output_event(e) for e in events] + if not summary.is_empty(): + events_out.append(self.make_summary_event(summary)) + return events_out + + def make_output_event(self, e: Any): + if isinstance(e, EventInputEvaluation): + out = self._base_eval_props(e, 'feature') + out['context'] = self._process_context(e.context, True) + return out + elif isinstance(e, DebugEvent): + out = self._base_eval_props(e.original_input, 'debug') + out['context'] = self._process_context(e.original_input.context, False) + return out + elif isinstance(e, EventInputIdentify): + return { + 'kind': 'identify', + 'creationDate': e.timestamp, + 'context': self._process_context(e.context, False), + } + elif isinstance(e, IndexEvent): + return { + 'kind': 'index', + 'creationDate': e.timestamp, + 'context': self._process_context(e.context, False), + } + elif isinstance(e, EventInputCustom): + out = { + 'kind': 'custom', + 'creationDate': e.timestamp, + 'key': e.key, + 
'context': self._process_context(e.context, True), + } + if e.data is not None: + out['data'] = e.data + if e.metric_value is not None: + out['metricValue'] = e.metric_value + return out + elif isinstance(e, MigrationOpEvent): + out = { + 'kind': 'migration_op', + 'creationDate': e.timestamp, + 'operation': e.operation.value, + 'context': self._process_context(e.context, True), + 'evaluation': {'key': e.key, 'value': e.detail.value}, + } + if e.flag is not None: + out["evaluation"]["version"] = e.flag.version + if e.default_stage: + out["evaluation"]["default"] = e.default_stage.value + if e.detail.variation_index is not None: + out["evaluation"]["variation"] = e.detail.variation_index + if e.detail.reason is not None: + out["evaluation"]["reason"] = e.detail.reason + if e.sampling_ratio is not None and e.sampling_ratio != 1: + out["samplingRatio"] = e.sampling_ratio + + measurements: List[Dict] = [] + if len(e.invoked) > 0: + measurements.append({"key": "invoked", "values": {o.value: True for o in e.invoked}}) + if e.consistent is not None: + measurement = {"key": "consistent", "value": e.consistent} + if e.consistent_ratio is not None and e.consistent_ratio != 1: + measurement["samplingRatio"] = e.consistent_ratio + measurements.append(measurement) + if len(e.latencies) > 0: + measurements.append({"key": "latency_ms", "values": {o.value: timedelta_millis(d) for o, d in e.latencies.items()}}) + if len(e.errors) > 0: + measurements.append({"key": "error", "values": {o.value: True for o in e.errors}}) + if measurements: + out["measurements"] = measurements + return out + return None + + def make_summary_event(self, summary: EventSummary): + """Transform summarizer data into the format used for the event payload.""" + flags_out: Dict[str, Any] = {} + for key, flag_data in summary.flags.items(): + flag_data_out = { + 'default': flag_data.default, + 'contextKinds': list(flag_data.context_kinds), + } + counters = [] + for ckey, cval in flag_data.counters.items(): + 
variation, version = ckey + counter = {'count': cval.count, 'value': cval.value} + if variation is not None: + counter['variation'] = variation + if version is None: + counter['unknown'] = True + else: + counter['version'] = version + counters.append(counter) + flag_data_out['counters'] = counters + flags_out[key] = flag_data_out + return { + 'kind': 'summary', + 'startDate': summary.start_date, + 'endDate': summary.end_date, + 'features': flags_out, + } + + def _process_context(self, context: Context, redact_anonymous: bool): + if redact_anonymous: + return self._context_formatter.format_context_redact_anonymous(context) + return self._context_formatter.format_context(context) + + def _context_keys(self, context: Context): + out = {} + for i in range(context.individual_context_count): + c = context.get_individual_context(i) + if c is not None: + out[c.kind] = c.key + return out + + def _base_eval_props(self, e: EventInputEvaluation, kind: str) -> dict: + out = { + 'kind': kind, + 'creationDate': e.timestamp, + 'key': e.key, + 'value': e.value, + 'default': e.default_value, + } + if e.flag is not None: + out['version'] = e.flag.version + if e.variation is not None: + out['variation'] = e.variation + if e.reason is not None: + out['reason'] = e.reason + if e.prereq_of is not None: + out['prereqOf'] = e.prereq_of.key + return out