harp_apps.proxy.settings.liveness.leaky_bucket

Inheritance diagram of harp_apps.proxy.settings.liveness.leaky_bucket

class LeakyBucketLiveness[source]

Bases: BaseLiveness[LeakyBucketLivenessSettings]

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

classmethod get_state_of(subject)[source]
Parameters:

subject (LivenessSubject)

Return type:

LeakyBucketLivenessSubjectState

failure(subject, reason=None)[source]
Parameters:
Return type:

bool

success(subject)[source]
Parameters:

subject (LivenessSubject)

Return type:

bool

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class LeakyBucketLivenessSettings[source]

Bases: BaseLivenessSettings

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

build_impl()[source]
capacity: float

The maximum capacity of the bucket.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

rate: float

The rate at which the bucket leaks (x per second).

threshold: float

The threshold over which the remote is considered down.

type: Literal['leaky']
class LeakyBucketLivenessSubjectState[source]

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

leak(rate)[source]
Parameters:

rate (float)

current: float
last_checked: float | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].