Source code for harp_apps.janitor.worker

import asyncio
from typing import cast

from prometheus_client import Gauge

from harp import get_logger
from harp_apps.storage.services import SqlStorage

from ..storage.models.base import with_session
from ..storage.types import IBlobStorage, IStorage
from .settings import OLD_AFTER, PERIOD

logger = get_logger(__name__)

# Prometheus gauges mirroring the storage object counts computed by the janitor.
# Keys are the same metric keys used by ``JanitorWorker.compute_metrics``; each
# entry is (metric key, Prometheus metric name, gauge description).
PROMETHEUS_GAUGES = {
    metric_key: Gauge(metric_name, description)
    for metric_key, metric_name, description in (
        ("storage.transactions", "storage_transactions", "Transactions currently in storage."),
        ("storage.messages", "storage_messages", "Messages currently in storage."),
        ("storage.blobs", "storage_blobs", "Blob objects currently in storage."),
        ("storage.blobs.orphans", "storage_blobs_orphans", "Orphan blobs currently in storage."),
    )
}


class JanitorWorker:
    """Background worker that periodically cleans up storage and publishes storage metrics.

    Each iteration (see :meth:`loop`) does three things:

    - deletes transactions older than ``OLD_AFTER`` days;
    - removes orphan blobs when the blob storage type has an implementation for it;
    - computes object counts, stores them through the metrics repository and sets the
      matching Prometheus gauges.
    """

    def __init__(self, storage: IStorage, blob_storage: IBlobStorage):
        # The janitor executes SQL statements built by the storage repositories, so it
        # works with the SQL implementation. ``cast`` is a typing-only hint: no runtime
        # check is performed.
        self.storage: SqlStorage = cast(SqlStorage, storage)
        self.blob_storage: IBlobStorage = blob_storage
        self.running = False
        self.session_factory = self.storage.session_factory
        # Serializes calls to `run()`: a second call waits until the first loop exits.
        self._running_lock = asyncio.Lock()

    def stop(self):
        """Mark the loop for termination.

        The running loop exits the next time it checks ``self.running``; an iteration that
        is already in progress is allowed to finish.
        """
        self.running = False

    async def run(self):
        """Run :meth:`loop` every ``PERIOD`` seconds until :meth:`stop` is called.

        Exceptions raised by an iteration are logged and do not stop the worker.
        ``asyncio.CancelledError`` is a ``BaseException`` (Python 3.8+, which this file
        already requires for its positional-only parameters), so cancellation is not
        swallowed and propagates to the caller.
        """
        async with self._running_lock:
            self.running = True
            while self.running:
                try:
                    await self.loop()
                except Exception as exc:
                    logger.exception(exc)
                # If stop() was called during the iteration, exit right away instead of
                # idling for a whole PERIOD before re-checking the flag.
                if self.running:
                    await asyncio.sleep(PERIOD)

    async def loop(self):
        """Run one janitor iteration: purge old transactions and orphan blobs, then refresh metrics."""
        # Delete old transactions
        result = await self.delete_old_transactions()
        if result.rowcount:
            logger.debug("🧹 Deleted %d old transactions", result.rowcount)

        # Delete blobs that are no longer referenced (logs its own outcome)
        await self.delete_orphan_blobs()

        # Compute and store stored object counts as metrics
        logger.debug("🧹 Compute and store metrics...")
        await self.compute_and_store_metrics()

    @with_session
    async def delete_old_transactions(self, /, *, session):
        """Remove transactions older than OLD_AFTER days.

        On correct database implementations (postgresql for example), it will cascade to
        related objects. On sqlite, there will be garbage left, but it's not a big deal.

        :param session: database session injected by ``with_session``
        :return: the result of the executed delete statement (its ``rowcount`` is used by
            :meth:`loop` for logging)
        """
        # TODO to storage
        result = await session.execute(self.storage.transactions.delete_old(OLD_AFTER))
        await session.commit()
        return result

    @with_session
    async def delete_orphan_blobs(self, /, *, session):
        """Find and remove blobs that are not referenced anymore by any transaction.

        Only the ``"sql"`` blob storage type has an implementation. For any other type, a
        debug message is logged and nothing is deleted.

        :param session: database session injected by ``with_session``
        """
        storage_type = self.blob_storage.type
        if storage_type != "sql":
            # The blob storage (e.g. redis) may need to clean orphans, but no
            # implementation is available yet.
            # NOTE(review): storage types that never need orphan cleanup (e.g. a null blob
            # storage) also land here and emit this line; add a silent branch for them
            # once their `type` value is confirmed.
            logger.debug("🧹 DeleteOrphanBlobs[%s] Not implemented.", storage_type)
            return

        result = await session.execute(self.storage.blobs.delete_orphans())
        await session.commit()
        logger.debug(
            "🧹 DeleteOrphanBlobs[%s] Removed %d blobs.",
            storage_type,
            result.rowcount or 0,
        )

    @with_session
    async def compute_and_store_metrics(self, /, *, session):
        """Compute counts of objects in storage, and store them as metrics.

        :param session: database session injected by ``with_session``, used for the count
            queries
        """
        # TODO to storage
        await self.storage.metrics.insert_values(await self.compute_metrics(session))

    async def compute_metrics(self, session):
        """Count stored objects, update the Prometheus gauges and return the counts.

        :param session: sqlalchemy async session used for the count queries
        :return: dict mapping metric keys (the same keys as ``PROMETHEUS_GAUGES``) to counts
        """
        # The queries are awaited one after another on purpose: a single AsyncSession
        # must not be used by concurrent operations, so do not gather() them.
        values = {
            "storage.transactions": await self.do_count(session, "transactions"),
            "storage.messages": await self.do_count(session, "messages"),
            "storage.blobs": await self.do_count(session, "blobs"),
            "storage.blobs.orphans": await self.do_count(session, "blobs", method="count_orphans"),
        }
        for key, value in values.items():
            PROMETHEUS_GAUGES[key].set(value)
        return values

    async def do_count(self, session, name: str, /, *, method="count"):
        """Helper to count objects in storage.

        It works across repositories and can use different methods to build the actual
        query.

        :param session: sqlalchemy async session
        :param name: repository name (should be available from storage)
        :param method: name of the repository method that returns the sqlalchemy query,
            "count" by default
        :return: the scalar value of the first row of the query result (the count)
        """
        # TODO to storage
        query = getattr(getattr(self.storage, name), method)()
        return (await session.execute(query)).scalar()