Source code for harp_apps.dashboard.controllers.system

from typing import cast

from pydantic import ValidationError
from sqlalchemy import func, select
from sqlalchemy.orm import aliased, joinedload

from harp import __revision__, __version__, get_logger
from harp.config.asdict import asdict
from harp.controllers import GetHandler, ProxyControllerResolver, PutHandler, RouterPrefix, RoutingController
from harp.http import HttpRequest, HttpResponse
from harp.typing.global_settings import GlobalSettings
from harp.views.json import json
from harp_apps.storage.models import MetricValue
from harp_apps.storage.services.sql import SqlStorage
from harp_apps.storage.types import IStorage

from ..utils.dependencies import get_python_dependencies, parse_dependencies
from .models.system import SystemPutProxyInput

logger = get_logger(__name__)


[docs] @RouterPrefix("/api/system") class SystemController(RoutingController):
[docs] def __init__( self, *, storage: IStorage, settings: GlobalSettings, resolver: ProxyControllerResolver, handle_errors=True, router=None, ): self.settings = settings self.storage: SqlStorage = cast(SqlStorage, storage) self.resolver = resolver self._dependencies = None super().__init__(handle_errors=handle_errors, router=router)
[docs] @GetHandler("/") async def get(self, request: HttpRequest): user = request.extensions.get("user") return json( { "version": __version__, "revision": __revision__, "user": user, } )
[docs] @GetHandler("/proxy") async def get_proxy(self): endpoints = list(self.resolver.endpoints.values()) return json({"endpoints": asdict(endpoints, verbose=True)})
[docs] @PutHandler("/proxy") async def put_proxy(self, request: HttpRequest): endpoints = list(self.resolver.endpoints.values()) try: input_data = SystemPutProxyInput.model_validate_json(await request.aread()) except ValidationError as exc: return HttpResponse(f"Invalid input: {exc}", status=400) # find endpoint endpoint = None for _endpoint in endpoints: if _endpoint.settings.name == input_data.endpoint: endpoint = _endpoint break if not endpoint: return HttpResponse(f"Endpoint not found: {request.query.get('endpoint')}", status=404) try: endpoint.remote[input_data.url] except KeyError: return HttpResponse(f"Endpoint URL not found: {input_data.url}", status=404) match input_data.action: case "up": endpoint.remote.set_up(input_data.url) case "down": endpoint.remote.set_down(input_data.url) case "checking": endpoint.remote.set_checking(input_data.url) case _: return HttpResponse(b"Invalid action", status=400) return await self.get_proxy()
[docs] @GetHandler("/settings") async def get_settings(self): settings_dict = asdict(self.settings, verbose=True, secure=True, mode="python") return json(settings_dict)
[docs] @GetHandler("/dependencies") async def get_dependencies(self): return json({"python": await self.__get_cached_python_dependencies()})
[docs] @GetHandler("/storage") async def get_storage(self): subquery = select( func.rank() .over( order_by=MetricValue.created_at.desc(), partition_by=MetricValue.metric_id, ) .label("rank"), MetricValue, ).subquery() v = aliased(MetricValue, subquery) query = select(v).where(subquery.c.rank == 1).options(joinedload(v.metric)) async with self.storage.session_factory() as session: result = (await session.execute(query)).scalars().all() return json( { "settings": asdict(self.settings.get("storage", {}), secure=True, mode="python"), "counts": {value.metric.name.split(".", 1)[-1]: value.value for value in result}, } )
async def __get_cached_python_dependencies(self): if self._dependencies is None: self._dependencies = parse_dependencies(await get_python_dependencies()) return self._dependencies