Source code for conflowgen.tools.continuous_distribution

from __future__ import annotations

import abc
import math
import typing

import numpy as np
import scipy.stats

from conflowgen.domain_models.distribution_models.container_dwell_time_distribution import \
    ContainerDwellTimeDistributionInterface


class ContinuousDistribution(abc.ABC):

    average: float
    variance: float
    minimum: float
    maximum: float

    distribution_types: typing.Dict[str, typing.Type[ContinuousDistribution]] = {}

    def __init__(
            self,
            average: typing.Optional[float],
            minimum: float,
            maximum: float,
            unit: typing.Optional[str] = None,
    ):
        """
        Args:
            average: The expected mean of the distribution.
            minimum: The minimum of the distribution. Smaller values are automatically set to zero.
            maximum: The maximum of the distribution. Larger values are automatically set to zero.
            unit: The unit for the average, minimum, and maximum. It is used for the ``__repr__`` implementation.
        """
        assert minimum < maximum, f"The assertion {minimum} < {maximum} failed."
        if average is not None:
            assert minimum < average < maximum, f"The assertion {minimum} < {average} < {maximum} failed."
        self.average = average
        self.minimum = minimum
        self.maximum = maximum
        self.unit = unit
        self.unit_repr, self.unit_repr_square = "", ""
        if unit:
            self.unit_repr = unit
            self.unit_repr_square = unit + "²"

    # noinspection PyMethodOverriding
    def __init_subclass__(cls, /, short_name: str) -> None:
        """
        Args:
            short_name: Provide a short name for the distribution. This is, e.g., used in the database.
        """
        super().__init_subclass__()
        cls.distribution_types[short_name] = cls
        cls.short_name = short_name

    @abc.abstractmethod
    def _get_probabilities_based_on_distribution(self, xs: typing.Sequence[float]) -> np.ndarray:
        pass

    @classmethod
    @abc.abstractmethod
    def from_database_entry(cls, entry: ContainerDwellTimeDistributionInterface) -> ContinuousDistribution:
        """
        Args:
            entry: The database entry describing a continuous distribution.

        Returns:
            The loaded distribution instance.
        """
        pass

    def to_dict(self) -> typing.Dict:
        return {
            "distribution_name": self.short_name,
            "average_number_of_hours": self.average,
            "variance": self.variance,
            "maximum_number_of_hours": self.maximum,
            "minimum_number_of_hours": self.minimum
        }

    def get_probabilities(
            self,
            xs: typing.Sequence[float],
            reversed_distribution: bool = False
    ) -> np.ndarray:
        """
        Args:
            xs: Elements that are on the same scale as average, variance, minimum, and maximum.
            reversed_distribution: Whether to reverse the probabilities.

        Returns:
            The respective probability that element x of xs is drawn from this distribution.
        """
        xs = np.array(xs)
        densities = self._get_probabilities_based_on_distribution(xs)

        # Clip the distribution to its support: values outside [minimum, maximum] get probability zero.
        densities[xs < self.minimum] = 0
        densities[xs > self.maximum] = 0

        sum_of_all_densities = densities.sum()
        if not np.isnan(sum_of_all_densities) and sum_of_all_densities > 0:
            # Normalize so that the discretized probabilities sum to one.
            densities = densities / sum_of_all_densities
            if reversed_distribution:
                densities = np.flip(densities)
        else:
            densities = np.zeros_like(xs)
        return densities
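

# A minimal sketch (not part of the module) of how the registry maintained by
# ``__init_subclass__`` above is meant to be used: the concrete subclasses
# defined below register themselves under a short name, so the matching class
# can be looked up when loading a distribution from the database.  The
# ``entry`` object and its ``distribution_name`` attribute are hypothetical
# stand-ins for a ContainerDwellTimeDistributionInterface row:
#
#     distribution_class = ContinuousDistribution.distribution_types[entry.distribution_name]
#     distribution = distribution_class.from_database_entry(entry)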


class ClippedLogNormal(ContinuousDistribution, short_name="lognormal"):

    def __init__(
            self,
            average: float,
            variance: float,
            minimum: float,
            maximum: float,
            unit: typing.Optional[str] = None
    ):
        super().__init__(
            average=average,
            minimum=minimum,
            maximum=maximum,
            unit=unit
        )
        self.variance = variance
        self._lognorm = self._get_scipy_lognorm()

    # noinspection PyUnresolvedReferences
    def _get_scipy_lognorm(self) -> scipy.stats.rv_frozen:
        """
        See https://www.johndcook.com/blog/2022/02/24/find-log-normal-parameters/ for reference
        """
        shifted_average = self.average - self.minimum

        # Moment matching: derive the lognormal parameters mu and sigma² from
        # the desired mean and variance of the shifted distribution.
        sigma2 = math.log(self.variance / shifted_average ** 2 + 1)
        mu = math.log(shifted_average) - sigma2 / 2

        scipy_shape = sigma2 ** 0.5
        scipy_scale = math.exp(mu)

        frozen_lognorm = scipy.stats.lognorm(s=scipy_shape, scale=scipy_scale, loc=self.minimum)

        return frozen_lognorm

    def _get_probabilities_based_on_distribution(self, xs: typing.Sequence[float]) -> np.ndarray:
        return self._lognorm.pdf(xs)

    @classmethod
    def from_database_entry(cls, entry: ContainerDwellTimeDistributionInterface) -> ClippedLogNormal:
        return cls(
            average=entry.average_number_of_hours,
            variance=entry.variance,
            minimum=entry.minimum_number_of_hours,
            maximum=entry.maximum_number_of_hours,
            unit="h"
        )

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__}: "
            f"avg={self.average:.1f}{self.unit_repr}, "
            f"min={self.minimum:.1f}{self.unit_repr}, "
            f"max={self.maximum:.1f}{self.unit_repr}, "
            f"var={self.variance:.1f}{self.unit_repr_square}, "
            f"sd={self.variance ** .5:.1f}{self.unit_repr}"
            f">"
        )


class Uniform(ContinuousDistribution, short_name="uniform"):

    def __init__(
            self,
            minimum: float,
            maximum: float,
            unit: typing.Optional[str] = None
    ):
        super().__init__(
            average=None,
            minimum=minimum,
            maximum=maximum,
            unit=unit
        )

    def _get_probabilities_based_on_distribution(self, xs: typing.Sequence[float]) -> np.ndarray:
        return np.ones_like(xs)

    @classmethod
    def from_database_entry(cls, entry: ContainerDwellTimeDistributionInterface) -> Uniform:
        return cls(
            minimum=entry.minimum_number_of_hours,
            maximum=entry.maximum_number_of_hours,
            unit="h"
        )

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__}: "
            f"min={self.minimum:.1f}{self.unit_repr}, "
            f"max={self.maximum:.1f}{self.unit_repr}"
            f">"
        )


def multiply_discretized_probability_densities(*probabilities: typing.Collection[float]) -> typing.Sequence[float]:
    assert len({len(p) for p in probabilities}) == 1, \
        f"All probability vectors must have the same length, but found these lengths: {[len(p) for p in probabilities]}"
    for i, probability_vector in enumerate(probabilities):
        assert not np.isnan(probability_vector).any(), \
            f"Each probability vector should contain only numbers, but found a NaN in vector {i}: {probability_vector}"
    np_probs = [np.array(probs, dtype=np.double) for probs in probabilities]
    # Multiply the vectors element-wise; np.prod generalizes this to any number
    # of vectors (np.multiply only accepts exactly two).
    multiplied_probabilities = np.prod(np_probs, axis=0)
    sum_of_all_probabilities = multiplied_probabilities.sum()
    if np.isnan(sum_of_all_probabilities) or sum_of_all_probabilities == 0:
        normalized_probabilities = np.zeros_like(multiplied_probabilities)
    else:
        normalized_probabilities = multiplied_probabilities / sum_of_all_probabilities
    return normalized_probabilities
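

# Illustrative self-check, not part of the library: the parameter values below
# are invented.  It demonstrates the moment matching of the clipped lognormal,
# the normalization performed by ``get_probabilities``, and how two discretized
# densities are combined with ``multiply_discretized_probability_densities``.
if __name__ == "__main__":
    dwell_time = ClippedLogNormal(average=72, variance=480, minimum=3, maximum=240, unit="h")
    xs = np.arange(3, 241)  # hourly steps over the support of the distribution
    lognormal_probabilities = dwell_time.get_probabilities(xs)
    print(f"{dwell_time!r} discretized over {len(xs)} hours "
          f"sums to {lognormal_probabilities.sum():.3f}")

    uniform = Uniform(minimum=3, maximum=240, unit="h")
    combined = multiply_discretized_probability_densities(
        lognormal_probabilities,
        uniform.get_probabilities(xs)
    )
    print(f"Combined density sums to {np.sum(combined):.3f}")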