Source code for harp_apps.storage.services.blob_storages.redis
from typing import override
from redis.asyncio import Redis
from harp.models import Blob
from harp_apps.storage.types import IBlobStorage
[docs]
class RedisBlobStorage(IBlobStorage):
type = "redis"
[docs]
def __init__(self, client: Redis):
self.client = client
[docs]
@override
async def get(self, blob_id):
data = await self.client.get(blob_id)
if data is not None:
content_type, body = data.split(b"\n", 1)
return Blob(
id=blob_id,
data=body,
content_type=content_type.decode() if content_type else None,
)
[docs]
@override
async def put(self, blob: Blob) -> Blob:
await self.client.set(blob.id, ((blob.content_type or "") + "\n").encode() + blob.data)
return blob
force_put = put
[docs]
@override
async def delete(self, blob_id):
await self.client.delete(blob_id)
[docs]
@override
async def exists(self, blob_id: str) -> bool:
return await self.client.exists(blob_id)