import asyncio
from inspect import iscoroutinefunction

from prometheus_client import Gauge

from harp import get_logger

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Prometheus gauge exposing the number of items waiting in an async worker
# queue. Samples are distinguished by a single "queue_id" label.
ASYNC_WORKER_QUEUE_BACKLOG_GAUGE = Gauge(
    "async_worker_queue_backlog",
    "Number of items in the async worker queue.",
    ["queue_id"],
)
async def __call__(self):
    """Consume queued coroutine functions until the queue stops delivering.

    Each iteration:

    1. calls ``cleanup()`` when the last cleanup happened more than 5 units
       of event-loop time ago (``loop.time()`` is expressed in seconds);
    2. waits for the next ``(item, ignore_errors)`` pair from the queue and
       leaves the loop if that wait raises ``RuntimeError``;
    3. awaits ``item()``; an ``Exception`` raised by it is logged unless the
       pair was queued with ``ignore_errors`` set, and never propagates;
    4. marks the queue entry as done, whatever the outcome.
    """
    while True:
        stale_before = asyncio.get_event_loop().time() - 5
        if self._last_cleanup_at < stale_before:
            self.cleanup()

        try:
            entry = await self._queue.get()
            task, suppress = entry
        except RuntimeError:
            # The queue is closed: stop consuming.
            break

        try:
            await task()
        except Exception as error:
            if not suppress:
                logger.exception(f"Error while executing queued task: {error}")
        finally:
            # Always acknowledge the entry so the queue's unfinished-task
            # counter stays balanced, even when the task failed.
            self._queue.task_done()
def cleanup(self):
    """Record the current backlog and publish it to the Prometheus gauge.

    Stores the event loop's current time in ``_last_cleanup_at`` and the
    queue size in ``_pressure``, then sets the backlog gauge sample labelled
    with ``id(self)`` to that pressure, floored at 1.
    """
    self._last_cleanup_at = asyncio.get_event_loop().time()
    self._pressure = self._queue.qsize()

    if not ASYNC_WORKER_QUEUE_BACKLOG_GAUGE:
        return

    # Per the original author's note, Prometheus ignores 0 values, so the
    # reported value never goes below 1.
    reported = max(self._pressure, 1)
    sample = ASYNC_WORKER_QUEUE_BACKLOG_GAUGE.labels(id(self))
    sample.set(reported)
async def push(self, item, /, *, ignore_errors=False):
    """Enqueue a coroutine function for later execution.

    Args:
        item: A coroutine function (positional-only); it is stored as-is,
            not called here.
        ignore_errors: Keyword-only flag stored alongside ``item`` in the
            queue entry.

    Raises:
        RuntimeError: If ``self._running`` is falsy.
        ValueError: If ``item`` is not a coroutine function.
    """
    if not self._running:
        raise RuntimeError("Queue is closed.")

    if iscoroutinefunction(item):
        entry = (item, ignore_errors)
        await self._queue.put(entry)
        return

    raise ValueError(f"Unknown item type: {type(item)}, expecting coroutine function.")