# Source code for harp_apps.notifications.subscriber
import asyncio
from typing import List, Optional
from whistle import IAsyncEventDispatcher
from harp.asgi.events import EVENT_CORE_RESPONSE, ResponseEvent
from harp_apps.notifications.senders.google_chat import GoogleChatNotificationSender
from harp_apps.notifications.senders.slack import SlackNotificationSender
from harp_apps.notifications.settings import NotificationsSettings
from harp_apps.notifications.typing import NotificationSender
# Names of the settings attributes that may hold a webhook URL.
# NotificationSubscriber.__init__ iterates over this list and reads each name
# from the settings object to decide which notification senders to build.
AVAILABLE_WEBHOOK_URLS = ["slack_webhook_url", "google_chat_webhook_url"]
[docs]
class NotificationSubscriber:
[docs]
def __init__(self, settings: NotificationsSettings, public_url: Optional[str] = None):
self.senders: List[NotificationSender] = []
for name in AVAILABLE_WEBHOOK_URLS:
if name == "google_chat_webhook_url":
try:
webhook_url = getattr(settings, name)
(
self.senders.append(GoogleChatNotificationSender(webhook_url, public_url))
if webhook_url
else None
)
except AttributeError:
pass
elif name == "slack_webhook_url":
try:
webhook_url = getattr(settings, name)
(self.senders.append(SlackNotificationSender(webhook_url, public_url)) if webhook_url else None)
except AttributeError:
pass
[docs]
def subscribe(self, dispatcher: IAsyncEventDispatcher):
dispatcher.add_listener(EVENT_CORE_RESPONSE, self.on_response_send_error_notifications)
[docs]
def unsubscribe(self, dispatcher: IAsyncEventDispatcher):
dispatcher.remove_listener(EVENT_CORE_RESPONSE, self.on_response_send_error_notifications)
[docs]
async def on_response_send_error_notifications(self, event: ResponseEvent):
if 500 <= event.response.status < 600:
transaction = event.request.extensions.get("transaction")
await self.send_notification(
method=event.request.extensions.get("remote_method"),
url=event.request.extensions.get("remote_url"),
status_code=event.response.status,
message=event.response.reason_phrase,
transaction_id=transaction.id if transaction else None,
)
[docs]
async def send_notification(
self,
method: Optional[str],
url: Optional[str],
status_code: int,
message: str,
transaction_id: Optional[str],
) -> None:
async with asyncio.TaskGroup() as tg:
for sender in self.senders:
tg.create_task(sender.send_notification(method, url, status_code, message, transaction_id))