"""Qichacha (enterprise information lookup) service implementation."""

from __future__ import annotations

from fastapi_admin.config import QICHACHA_CACHE_DAYS
from fastapi_common.fastapi_common_logger import logger
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.QichachaException import QichachaException
from fastapi_modules.fastapi_leaudit.domian.vo.qichachaVo import (
    QichachaBatchQueryVO,
    QichachaCompanyQueryVO,
    QichachaRecordStatusVO,
)
from fastapi_modules.fastapi_leaudit.models.qichachaCompanyInfo import QichachaCompanyInfo
from fastapi_modules.fastapi_leaudit.services.impl.qichachaClient import QichachaClient
from fastapi_modules.fastapi_leaudit.services.impl.qichachaVoAssembler import QichachaVoAssembler
from fastapi_modules.fastapi_leaudit.services.qichachaService import IQichachaService


class QichachaServiceImpl(IQichachaService):
    """Qichacha service backed by a database cache and a remote client.

    Each query first looks the keyword up in ``QichachaCompanyInfo``. A stored
    record is returned as-is while it is no older than ``CacheDays`` days and
    the caller did not force a refresh; otherwise the remote client is called
    and the record is upserted.
    """

    def __init__(self, Client: QichachaClient | None = None, CacheDays: int | None = None) -> None:
        """Initialise the service.

        Args:
            Client: Remote client to use; a default ``QichachaClient()`` is
                created when omitted.
            CacheDays: Maximum record age (in days) that still counts as
                fresh; defaults to ``int(QICHACHA_CACHE_DAYS)`` when omitted.
        """
        self.Client = Client if Client is not None else QichachaClient()
        self.CacheDays = CacheDays if CacheDays is not None else int(QICHACHA_CACHE_DAYS)

    @staticmethod
    def _NormalizeKeyword(Keyword: str) -> str:
        """Strip the keyword and reject blank or non-string input.

        Raises:
            QichachaException: With ``HTTP_400_BAD_REQUEST`` when the keyword
                is not a string or is empty after stripping.
        """
        keyword = Keyword.strip() if isinstance(Keyword, str) else ""
        if not keyword:
            raise QichachaException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "查询关键词不能为空")
        return keyword

    def _IsFresh(self, Record: QichachaCompanyInfo | None, ForceRefresh: bool) -> bool:
        """Return True when ``Record`` exists, no refresh is forced and its age is within ``CacheDays``."""
        return (
            Record is not None
            and not ForceRefresh
            and QichachaCompanyInfo.GetAgeDays(Record) <= self.CacheDays
        )

    @staticmethod
    def _BuildSuccess(Record: QichachaCompanyInfo) -> QichachaCompanyQueryVO:
        """Wrap a stored record in the standard successful query response."""
        return QichachaCompanyQueryVO(
            success=True,
            message="查询成功",
            data=QichachaVoAssembler.BuildCompanyInfo(Record),
        )

    async def QueryCompany(self, Keyword: str, ForceRefresh: bool = False) -> QichachaCompanyQueryVO:
        """Query the complete company information (enterprise + dishonesty).

        Args:
            Keyword: Search keyword; surrounding whitespace is ignored.
            ForceRefresh: When True, skip the cached record and call the
                remote client.

        Returns:
            A successful ``QichachaCompanyQueryVO`` built from the cached or
            freshly upserted record.

        Raises:
            QichachaException: When the keyword is blank.
        """
        keyword = self._NormalizeKeyword(Keyword)

        # NOTE(review): the session stays open while the remote client call is
        # awaited; confirm this is acceptable for the connection pool.
        async with GetAsyncSession() as session:
            record = await QichachaCompanyInfo.FindByKeyword(session, keyword)
            if self._IsFresh(record, ForceRefresh):
                return self._BuildSuccess(record)

            enterprise, dishonesty, credit_code, company_name = await self.Client.QueryCompany(keyword)
            record = await QichachaCompanyInfo.Upsert(
                session,
                SearchKey=keyword,
                CreditCode=credit_code,
                CompanyName=company_name,
                Enterprise=enterprise,
                Dishonesty=dishonesty,
            )
            logger.info(f"企查查企业信息已更新: {keyword}")
            return self._BuildSuccess(record)

    async def QueryEnterpriseOnly(self, Keyword: str, ForceRefresh: bool = False) -> QichachaCompanyQueryVO:
        """Query only the enterprise (business registration) information.

        The cached record is reused only when it already carries enterprise
        data. On refresh, the dishonesty data of an existing record is carried
        over unchanged into the upsert.

        Args:
            Keyword: Search keyword; surrounding whitespace is ignored.
            ForceRefresh: When True, skip the cached record and call the
                remote client.

        Raises:
            QichachaException: When the keyword is blank.
        """
        keyword = self._NormalizeKeyword(Keyword)

        async with GetAsyncSession() as session:
            record = await QichachaCompanyInfo.FindByKeyword(session, keyword)
            if (
                record is not None
                and record.enterprise is not None
                and self._IsFresh(record, ForceRefresh)
            ):
                return self._BuildSuccess(record)

            enterprise = await self.Client.GetEnterpriseInfo(keyword)
            # Falsy payloads or falsy field values are stored as None.
            raw_code = enterprise.get("CreditCode") if enterprise else None
            raw_name = enterprise.get("Name") if enterprise else None
            record = await QichachaCompanyInfo.Upsert(
                session,
                SearchKey=keyword,
                CreditCode=str(raw_code) if raw_code else None,
                CompanyName=str(raw_name) if raw_name else None,
                Enterprise=enterprise,
                Dishonesty=record.dishonesty if record is not None else None,
            )
            return self._BuildSuccess(record)

    async def QueryDishonestyOnly(self, Keyword: str, ForceRefresh: bool = False) -> QichachaCompanyQueryVO:
        """Query only the dishonesty information.

        The cached record is reused only when it already carries dishonesty
        data. On refresh, the credit code, company name and enterprise data of
        an existing record are carried over; without an existing record the
        keyword itself is stored as the company name.

        Args:
            Keyword: Search keyword; surrounding whitespace is ignored.
            ForceRefresh: When True, skip the cached record and call the
                remote client.

        Raises:
            QichachaException: When the keyword is blank.
        """
        keyword = self._NormalizeKeyword(Keyword)

        async with GetAsyncSession() as session:
            record = await QichachaCompanyInfo.FindByKeyword(session, keyword)
            if (
                record is not None
                and record.dishonesty is not None
                and self._IsFresh(record, ForceRefresh)
            ):
                return self._BuildSuccess(record)

            dishonesty = await self.Client.GetDishonestyInfo(keyword)
            record = await QichachaCompanyInfo.Upsert(
                session,
                SearchKey=keyword,
                CreditCode=record.creditCode if record is not None else None,
                CompanyName=record.companyName if record is not None else keyword,
                Enterprise=record.enterprise if record is not None else None,
                Dishonesty=dishonesty,
            )
            return self._BuildSuccess(record)

    async def BatchQuery(self, Keywords: list[str], ForceRefresh: bool = False) -> QichachaBatchQueryVO:
        """Query several companies one after another.

        A failure for one keyword does not abort the batch: it is logged and
        reported as a failed item with ``errorCode="QICHACHA_QUERY_FAILED"``.

        Args:
            Keywords: Keywords to query, processed in order.
            ForceRefresh: Forwarded to ``QueryCompany`` for every keyword.

        Returns:
            A ``QichachaBatchQueryVO`` whose ``success`` is True only when
            every item succeeded (an empty batch counts as success).
        """
        results: list[QichachaCompanyQueryVO] = []
        for keyword in Keywords:
            try:
                results.append(await self.QueryCompany(keyword, ForceRefresh))
            except Exception as exc:
                # Deliberate best-effort boundary: keep the batch going, but
                # leave a trace instead of swallowing the error silently.
                logger.warning(f"Qichacha batch query failed for keyword {keyword!r}: {exc}")
                results.append(
                    QichachaCompanyQueryVO(
                        success=False,
                        message=str(exc),
                        data=None,
                        errorCode="QICHACHA_QUERY_FAILED",
                    )
                )

        total = len(results)
        success_count = sum(1 for item in results if item.success)
        return QichachaBatchQueryVO(
            success=success_count == total,
            total=total,
            successCount=success_count,
            failedCount=total - success_count,
            results=results,
        )

    async def GetRecordStatus(self, Keyword: str) -> QichachaRecordStatusVO:
        """Report the cache status of the record stored for a keyword.

        Args:
            Keyword: Search keyword; surrounding whitespace is ignored.

        Returns:
            A ``QichachaRecordStatusVO``. When no record exists it has
            ``exists=False`` and ``needRefresh=True``; otherwise it describes
            the record and sets ``needRefresh`` when the record's age exceeds
            ``CacheDays``.
        """
        keyword = Keyword.strip()
        async with GetAsyncSession() as session:
            record = await QichachaCompanyInfo.FindByKeyword(session, keyword)
            if record is None:
                return QichachaRecordStatusVO(
                    exists=False,
                    searchKey=keyword,
                    refreshThresholdDays=self.CacheDays,
                    needRefresh=True,
                )

            age_days = QichachaCompanyInfo.GetAgeDays(record)
            return QichachaRecordStatusVO(
                exists=True,
                searchKey=record.searchKey,
                creditCode=record.creditCode,
                companyName=record.companyName,
                hasEnterprise=record.enterprise is not None,
                hasDishonesty=record.dishonesty is not None,
                updatedAt=QichachaVoAssembler.FormatDatetime(record.updated_at),
                ageDays=age_days,
                refreshThresholdDays=self.CacheDays,
                needRefresh=age_days > self.CacheDays,
            )