Source code for harp_apps.telemetry.manager
import asyncio
import hashlib
import platform
import sys
from functools import cached_property
from httpx import AsyncClient, TransportError
from whistle import IAsyncEventDispatcher
from harp import __version__, get_logger
from harp.asgi.events import EVENT_CORE_STARTED
from harp.typing import GlobalSettings
from harp_apps.storage.types import IStorage
logger = get_logger(__name__)
[docs]
class TelemetryManager:
default_period = 86400 # 24 hours
default_endpoint = "https://connect.makersquad.fr/t/a"
[docs]
def __init__(
self,
global_settings: GlobalSettings,
client: AsyncClient,
storage: IStorage = None,
dispatcher: IAsyncEventDispatcher = None,
**kwargs,
):
self.global_settings = global_settings
self.client = client
self.storage = storage
self.endpoint = kwargs.pop("endpoint", None) or self.default_endpoint
self.worker = None
self.count = 0
self._platform = " ".join((platform.python_implementation(), sys.version, "on", platform.platform()))
self._host = "; ".join(platform.uname())
self._hashed = hashlib.sha1("\n".join([self._platform, self._host]).encode("utf-8")).hexdigest()
if dispatcher:
dispatcher.add_listener(EVENT_CORE_STARTED, self.start, priority=-10)
@cached_property
def period(self):
return self.default_period
@cached_property
def applications(self):
return ",".join(map(lambda x: x.split(".")[-1], self.global_settings.get("applications", [])))
[docs]
async def start(self, event):
self.worker = asyncio.create_task(self.loop_forever())
[docs]
async def ping(self):
try:
return await self.client.post(
self.endpoint,
json={
# anonymous fingerprint of instance
"f": self._hashed,
# configured applications
"a": self.applications,
# application version
"v": __version__,
# transaction count in last period (not implemented yet)
"c": (await self.storage.get_usage()) if self.storage else 0,
# activity type
"t": "ping" if self.count else "start",
# incrementing counter
"i": self.count,
},
timeout=5.0,
)
finally:
self.count += 1
[docs]
async def loop_forever(self):
while True:
try:
await self.ping()
except TransportError as exc:
logger.warning("Failed to send activity ping: %s", exc)
await asyncio.sleep(self.period)