Files
2026-05-25 09:50:01 +08:00

120 lines
5.3 KiB
Python

"""企查查 HTTP 客户端。"""
from __future__ import annotations
import asyncio
import hashlib
import time
from typing import Any
import httpx
from fastapi_admin.config import (
QICHACHA_APP_KEY,
QICHACHA_BASE_URL,
QICHACHA_DISHONESTY_PATH,
QICHACHA_ENTERPRISE_PATH,
QICHACHA_MAX_RETRIES,
QICHACHA_RETRY_DELAY,
QICHACHA_SECRET_KEY,
QICHACHA_TIMEOUT,
)
from fastapi_common.fastapi_common_logger import logger
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.QichachaException import QichachaException
class QichachaClient:
"""企查查 HTTP 客户端。"""
def __init__(
self,
AppKey: str | None = None,
SecretKey: str | None = None,
BaseUrl: str | None = None,
EnterprisePath: str | None = None,
DishonestyPath: str | None = None,
Timeout: int | None = None,
MaxRetries: int | None = None,
RetryDelay: float | None = None,
) -> None:
"""初始化客户端配置。"""
self.AppKey = AppKey if AppKey is not None else str(QICHACHA_APP_KEY)
self.SecretKey = SecretKey if SecretKey is not None else str(QICHACHA_SECRET_KEY)
self.BaseUrl = (BaseUrl if BaseUrl is not None else str(QICHACHA_BASE_URL)).rstrip("/")
self.EnterprisePath = EnterprisePath if EnterprisePath is not None else str(QICHACHA_ENTERPRISE_PATH)
self.DishonestyPath = DishonestyPath if DishonestyPath is not None else str(QICHACHA_DISHONESTY_PATH)
self.Timeout = Timeout if Timeout is not None else int(QICHACHA_TIMEOUT)
self.MaxRetries = MaxRetries if MaxRetries is not None else int(QICHACHA_MAX_RETRIES)
self.RetryDelay = RetryDelay if RetryDelay is not None else float(QICHACHA_RETRY_DELAY)
def BuildHeaders(self) -> dict[str, str]:
"""生成企查查鉴权请求头。"""
timespan = str(int(time.time()))
token_source = f"{self.AppKey}{timespan}{self.SecretKey}"
token = hashlib.md5(token_source.encode("utf-8")).hexdigest().upper()
return {"Token": token, "Timespan": timespan}
async def Request(self, Url: str, Params: dict[str, str]) -> dict[str, Any]:
"""发送企查查 GET 请求并返回 JSON。"""
if not self.AppKey:
raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查 APP_KEY 未配置")
if not self.SecretKey:
raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查 SECRET_KEY 未配置")
last_error: Exception | None = None
for attempt in range(max(self.MaxRetries, 1)):
try:
async with httpx.AsyncClient(timeout=self.Timeout) as client:
response = await client.get(Url, params=Params, headers=self.BuildHeaders())
response.raise_for_status()
data = response.json()
if not isinstance(data, dict):
raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查响应格式错误")
status = str(data.get("Status") or "")
if status and status not in {"200", "201"}:
message = str(data.get("Message") or "企查查查询失败")
raise QichachaException(StatusCodeEnum.HTTP_400_BAD_REQUEST, message)
return data
except QichachaException:
raise
except Exception as exc:
last_error = exc
if attempt < max(self.MaxRetries, 1) - 1:
await asyncio.sleep(self.RetryDelay)
logger.error(f"企查查请求失败: url={Url}, error={last_error}")
raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, f"企查查请求失败: {last_error}")
async def GetEnterpriseInfo(self, Keyword: str) -> dict[str, Any] | None:
"""查询企业工商信息。"""
data = await self.Request(
Url=f"{self.BaseUrl}{self.EnterprisePath}",
Params={"key": self.AppKey, "keyword": Keyword},
)
result = data.get("Result")
return result if isinstance(result, dict) else None
async def GetDishonestyInfo(self, Keyword: str) -> dict[str, Any] | None:
"""查询企业失信信息。"""
data = await self.Request(
Url=f"{self.BaseUrl}{self.DishonestyPath}",
Params={"key": self.AppKey, "searchKey": Keyword},
)
result = data.get("Result")
return result if isinstance(result, dict) else None
async def QueryCompany(self, Keyword: str) -> tuple[dict[str, Any] | None, dict[str, Any] | None, str | None, str | None]:
"""并发查询工商信息与失信信息。"""
enterprise_result, dishonesty_result = await asyncio.gather(
self.GetEnterpriseInfo(Keyword),
self.GetDishonestyInfo(Keyword),
)
credit_code = (
str(enterprise_result.get("CreditCode"))
if enterprise_result and enterprise_result.get("CreditCode")
else None
)
company_name = str(enterprise_result.get("Name")) if enterprise_result and enterprise_result.get("Name") else None
return enterprise_result, dishonesty_result, credit_code, company_name