"""企查查 HTTP 客户端。""" from __future__ import annotations import asyncio import hashlib import time from typing import Any import httpx from fastapi_admin.config import ( QICHACHA_APP_KEY, QICHACHA_BASE_URL, QICHACHA_DISHONESTY_PATH, QICHACHA_ENTERPRISE_PATH, QICHACHA_MAX_RETRIES, QICHACHA_RETRY_DELAY, QICHACHA_SECRET_KEY, QICHACHA_TIMEOUT, ) from fastapi_common.fastapi_common_logger import logger from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.QichachaException import QichachaException class QichachaClient: """企查查 HTTP 客户端。""" def __init__( self, AppKey: str | None = None, SecretKey: str | None = None, BaseUrl: str | None = None, EnterprisePath: str | None = None, DishonestyPath: str | None = None, Timeout: int | None = None, MaxRetries: int | None = None, RetryDelay: float | None = None, ) -> None: """初始化客户端配置。""" self.AppKey = AppKey if AppKey is not None else str(QICHACHA_APP_KEY) self.SecretKey = SecretKey if SecretKey is not None else str(QICHACHA_SECRET_KEY) self.BaseUrl = (BaseUrl if BaseUrl is not None else str(QICHACHA_BASE_URL)).rstrip("/") self.EnterprisePath = EnterprisePath if EnterprisePath is not None else str(QICHACHA_ENTERPRISE_PATH) self.DishonestyPath = DishonestyPath if DishonestyPath is not None else str(QICHACHA_DISHONESTY_PATH) self.Timeout = Timeout if Timeout is not None else int(QICHACHA_TIMEOUT) self.MaxRetries = MaxRetries if MaxRetries is not None else int(QICHACHA_MAX_RETRIES) self.RetryDelay = RetryDelay if RetryDelay is not None else float(QICHACHA_RETRY_DELAY) def BuildHeaders(self) -> dict[str, str]: """生成企查查鉴权请求头。""" timespan = str(int(time.time())) token_source = f"{self.AppKey}{timespan}{self.SecretKey}" token = hashlib.md5(token_source.encode("utf-8")).hexdigest().upper() return {"Token": token, "Timespan": timespan} async def Request(self, Url: str, Params: dict[str, str]) -> dict[str, Any]: """发送企查查 GET 请求并返回 JSON。""" if not self.AppKey: raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查 APP_KEY 未配置") if not self.SecretKey: raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查 SECRET_KEY 未配置") last_error: Exception | None = None for attempt in range(max(self.MaxRetries, 1)): try: async with httpx.AsyncClient(timeout=self.Timeout) as client: response = await client.get(Url, params=Params, headers=self.BuildHeaders()) response.raise_for_status() data = response.json() if not isinstance(data, dict): raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查响应格式错误") status = str(data.get("Status") or "") if status and status not in {"200", "201"}: message = str(data.get("Message") or "企查查查询失败") raise QichachaException(StatusCodeEnum.HTTP_400_BAD_REQUEST, message) return data except QichachaException: raise except Exception as exc: last_error = exc if attempt < max(self.MaxRetries, 1) - 1: await asyncio.sleep(self.RetryDelay) logger.error(f"企查查请求失败: url={Url}, error={last_error}") raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, f"企查查请求失败: {last_error}") async def GetEnterpriseInfo(self, Keyword: str) -> dict[str, Any] | None: """查询企业工商信息。""" data = await self.Request( Url=f"{self.BaseUrl}{self.EnterprisePath}", Params={"key": self.AppKey, "keyword": Keyword}, ) result = data.get("Result") return result if isinstance(result, dict) else None async def GetDishonestyInfo(self, Keyword: str) -> dict[str, Any] | None: """查询企业失信信息。""" data = await self.Request( Url=f"{self.BaseUrl}{self.DishonestyPath}", Params={"key": self.AppKey, "searchKey": Keyword}, ) result = data.get("Result") return result if isinstance(result, dict) else None async def QueryCompany(self, Keyword: str) -> tuple[dict[str, Any] | None, dict[str, Any] | None, str | None, str | None]: """并发查询工商信息与失信信息。""" enterprise_result, dishonesty_result = await asyncio.gather( self.GetEnterpriseInfo(Keyword), self.GetDishonestyInfo(Keyword), ) credit_code = ( str(enterprise_result.get("CreditCode")) if enterprise_result and enterprise_result.get("CreditCode") else None ) company_name = str(enterprise_result.get("Name")) if enterprise_result and enterprise_result.get("Name") else None return enterprise_result, dishonesty_result, credit_code, company_name