"""OSS 服务实现。""" from fastapi_common.fastapi_common_storage.oss_client import OssClient from fastapi_modules.fastapi_leaudit.services.ossService import IOssService class OssServiceImpl(IOssService): """OSS 服务实现。""" def __init__(self, Client: OssClient | None = None) -> None: self.Client = Client or OssClient() async def DownloadBytes(self, Source: str, Bucket: str | None = None) -> bytes: """下载对象内容。""" return self.Client.DownloadBytes(Source=Source, Bucket=Bucket) async def DownloadToTempFile( self, Source: str, Suffix: str = "", Prefix: str = "oss-", Bucket: str | None = None, ) -> str: """下载对象到本地临时文件。""" return self.Client.DownloadToTempFile( Source=Source, Suffix=Suffix, Prefix=Prefix, Bucket=Bucket, ) async def UploadBytes( self, ObjectKey: str, Content: bytes, ContentType: str = "application/octet-stream", Bucket: str | None = None, ) -> str: """上传二进制内容。""" return self.Client.UploadBytes( ObjectKey=ObjectKey, Content=Content, ContentType=ContentType, Bucket=Bucket, ) async def UploadText( self, ObjectKey: str, Content: str, ContentType: str = "text/plain; charset=utf-8", Bucket: str | None = None, ) -> str: """上传文本内容。""" return self.Client.UploadText( ObjectKey=ObjectKey, Content=Content, ContentType=ContentType, Bucket=Bucket, ) async def ObjectExists(self, Source: str, Bucket: str | None = None) -> bool: """判断对象是否存在。""" return self.Client.ObjectExists(Source=Source, Bucket=Bucket) async def PresignGetUrl(self, Source: str, Bucket: str | None = None) -> str: """生成对象下载签名 URL。""" return self.Client.PresignGetUrl(Source=Source, Bucket=Bucket)