Source code for harp_apps.notifications.senders.google_chat
from typing import Optional
from httpx import AsyncClient
[docs]
class GoogleChatNotificationSender:
[docs]
def __init__(self, webhook_url: str, public_url: Optional[str] = None) -> None:
self.webhook_url = webhook_url
self.public_url = public_url
[docs]
async def send_notification(
self,
method: Optional[str],
url: Optional[str],
status_code: int,
message: str,
transaction_id: Optional[str],
) -> None:
error_message = self._format_error_message(method, url, status_code, message, transaction_id, self.public_url)
async with AsyncClient() as client:
response = await client.post(self.webhook_url, json=error_message)
response.raise_for_status()
assert response.status_code == 200
@staticmethod
def _format_error_message(
method: Optional[str],
url: Optional[str],
status_code: int,
message: str,
transaction_id: Optional[str],
public_url: Optional[str] = None,
) -> dict:
sections = [
{"widgets": [{"textParagraph": {"text": f"<b>Error in {method} request to {url}</b>"}}]},
{
"widgets": [
{
"keyValue": {
"topLabel": "Error Code",
"content": f"<b>{status_code}</b>",
}
},
{"keyValue": {"topLabel": "Status", "content": message}},
]
},
{"widgets": [{"textParagraph": {"text": f"<i>Transaction ID:</i> <b>{transaction_id}</b>"}}]},
]
if public_url:
sections.append(
{
"widgets": [
{
"buttons": [
{
"textButton": {
"text": "VIEW LOGS",
"onClick": {"openLink": {"url": f"{public_url}/transactions/{transaction_id}"}},
}
}
] # type: ignore
}
]
}
)
return {
"cards": [
{
"header": {
"title": "🔴 Error Notification",
},
"sections": sections,
}
]
}