[docs]asyncdefon_transaction_started(self,event:TransactionEvent):"""Event handler to store the transaction in the database."""ifself.pressure>=4:event.transaction.markers.add(SKIP_STORAGE)return# Copy fields into a dict that won't change before the task is handledtransaction_data=event.transaction.as_storable_dict(with_tags=True)asyncdefcreate_transaction():returnawaitself.storage.transactions.create(transaction_data)# Scheduleawaitself.push(create_transaction)
[docs]asyncdefon_transaction_message(self,event:HttpMessageEvent):ifSKIP_STORAGEinevent.transaction.markersorself.pressure>=3:returnawaitevent.message.aread()serializer=get_serializer_for(event.message)message_data={"transaction_id":event.transaction.id,"kind":event.message.kind,"summary":serializer.summary,"created_at":event.message.created_at,}# Eventually store the headers blob (later)ifself.pressure<=2:headers_blob=Blob.from_data(serializer.headers,content_type="http/headers")awaitself.push(partial(self.blob_storage.put,headers_blob),ignore_errors=True)message_data["headers"]=headers_blob.id# Eventually store the content blob (later)ifself.pressure<=1:content_blob=Blob.from_data(serializer.body,content_type=event.message.headers.get("content-type"))awaitself.push(partial(self.blob_storage.put,content_blob),ignore_errors=True)message_data["body"]=content_blob.idasyncdefcreate_message():asyncwithself.engine.connect()asconn:awaitconn.execute(insert(SqlMessage).values(**message_data))awaitconn.commit()awaitself.push(create_message)