Source code for ldclient.migrations.tracker

from typing import Callable, Optional, Union, Set, Dict
import time
from datetime import timedelta
from random import Random
from ldclient.impl.sampler import Sampler
from ldclient.evaluation import EvaluationDetail
from ldclient.context import Context
from ldclient.impl.model import FeatureFlag
from threading import Lock
from ldclient.impl.events.types import EventInput
from ldclient.migrations.types import Stage, Operation, Origin
from ldclient.impl.util import log


class MigrationOpEvent(EventInput):
    """
    A migration op event represents the results of a migration-assisted read or
    write operation.

    The event includes optional measurements reporting on consistency checks,
    error reporting, and operation latency values.

    This event should not be constructed directly; rather, it should be built
    through :class:`ldclient.migrations.OpTracker()`.
    """
    __slots__ = ['key', 'flag', 'operation', 'default_stage', 'detail', 'invoked', 'consistent', 'consistent_ratio', 'errors', 'latencies']

    def __init__(self, timestamp: int, context: Context, key: str, flag: Optional[FeatureFlag], operation: Operation, default_stage: Stage, detail: EvaluationDetail, invoked: Set[Origin], consistent: Optional[bool], consistent_ratio: Optional[int], errors: Set[Origin], latencies: Dict[Origin, timedelta]):
        sampling_ratio = None if flag is None else flag.sampling_ratio
        super().__init__(timestamp, context, sampling_ratio)

        self.key = key
        self.flag = flag
        self.operation = operation
        self.default_stage = default_stage
        self.detail = detail
        self.invoked = invoked
        self.consistent = consistent
        self.consistent_ratio = consistent_ratio
        self.errors = errors
        self.latencies = latencies

    def to_debugging_dict(self) -> dict:
        return {
            "timestamp": self.timestamp,
            "context": self.context.to_dict(),
            "flag": None if self.flag is None else {"key": self.flag.key},
            "operation": self.operation.value,
            "default_stage": self.default_stage.value,
            "detail": self.detail,
            "invoked": self.invoked,
            "consistent": self.consistent,
            "consistent_ratio": self.consistent_ratio,
            "errors": self.errors,
            "latencies": self.latencies,
        }


[docs]class OpTracker: """ An OpTracker is responsible for managing the collection of measurements that which a user might wish to record throughout a migration-assisted operation. Example measurements include latency, errors, and consistency. The OpTracker is not expected to be instantiated directly. Consumers should instead call :func:`ldclient.client.LDClient.migration_variation()` and use the returned tracker instance. """
[docs] def __init__( self, key: str, flag: Optional[FeatureFlag], context: Context, detail: EvaluationDetail, default_stage: Stage ): self.__key = key self.__flag = flag self.__context = context self.__detail = detail self.__default_stage = default_stage self.__mutex = Lock() self.__operation: Optional[Operation] = None self.__invoked: Set[Origin] = set() self.__consistent: Optional[bool] = None self.__consistent_ratio: int = 1 if flag is not None and flag.migrations is not None and flag.migrations.check_ratio is not None: self.__consistent_ratio = flag.migrations.check_ratio self.__errors: Set[Origin] = set() self.__latencies: Dict[Origin, timedelta] = {} self.__sampler = Sampler(Random())
[docs] def operation(self, op: Operation) -> 'OpTracker': """ Sets the migration related operation associated with these tracking measurements. :param op: The read or write operation symbol. """ if not isinstance(op, Operation): return self with self.__mutex: self.__operation = op return self
[docs] def invoked(self, origin: Origin) -> 'OpTracker': """ Allows recording which origins were called during a migration. :param origin: Designation for the old or new origin. """ if not isinstance(origin, Origin): return self with self.__mutex: self.__invoked.add(origin) return self
[docs] def consistent(self, is_consistent: Callable[[], bool]) -> 'OpTracker': """ Allows recording the results of a consistency check. This method accepts a callable which should take no parameters and return a single boolean to represent the consistency check results for a read operation. A callable is provided in case sampling rules do not require consistency checking to run. In this case, we can avoid the overhead of a function by not using the callable. :param is_consistent: closure to return result of comparison check """ with self.__mutex: try: if self.__sampler.sample(self.__consistent_ratio): self.__consistent = is_consistent() except Exception as e: log.error("exception raised during consistency check %s; failed to record measurement", repr(e)) return self
[docs] def error(self, origin: Origin) -> 'OpTracker': """ Allows recording whether an error occurred during the operation. :param origin: Designation for the old or new origin. """ if not isinstance(origin, Origin): return with self.__mutex: self.__errors.add(origin) return self
[docs] def latency(self, origin: Origin, duration: timedelta) -> 'OpTracker': """ Allows tracking the recorded latency for an individual operation. :param origin: Designation for the old or new origin. :param duration: Duration measurement. """ if not isinstance(origin, Origin): return with self.__mutex: self.__latencies[origin] = duration return self
[docs] def build(self) -> Union[MigrationOpEvent, str]: """ Creates an instance of :class:`MigrationOpEvent()`. This event data can be provided to :func:`ldclient.client.LDClient.track_migration_op()` to relay this metric information upstream to LaunchDarkly services. :return: A :class:`MigrationOpEvent()` or a string describing the type of failure. """ with self.__mutex: if self.__operation is None: return "operation not provided" if len(self.__key) == 0: return "migration operation cannot contain an empty key" if len(self.__invoked) == 0: return "no origins were invoked" if not self.__context.valid: return "provided context was invalid" error = self.__check_invoked_consistency() if error: return error # TODO: Inject this time function or something timestamp = int(time.time() * 1_000) return MigrationOpEvent( timestamp, self.__context, self.__key, self.__flag, self.__operation, self.__default_stage, self.__detail, self.__invoked.copy(), self.__consistent, None if self.__consistent is None else self.__consistent_ratio, self.__errors.copy(), self.__latencies.copy())
def __check_invoked_consistency(self) -> Optional[str]: for origin in Origin: if origin in self.__invoked: continue if origin in self.__latencies: return f"provided latency for origin '{origin.value}' without recording invocation" if origin in self.__errors: return f"provided error for origin '{origin.value}' without recording invocation" # A consistency measurement only makes sense if TWO origins were # executed. Otherwise, there is nothing to compare against. if self.__consistent is not None and len(self.__invoked) != 2: return "provided consistency without recording both invocations" return None