120 lines
5.3 KiB
Python
120 lines
5.3 KiB
Python
"""企查查 HTTP 客户端。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import time
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from fastapi_admin.config import (
|
|
QICHACHA_APP_KEY,
|
|
QICHACHA_BASE_URL,
|
|
QICHACHA_DISHONESTY_PATH,
|
|
QICHACHA_ENTERPRISE_PATH,
|
|
QICHACHA_MAX_RETRIES,
|
|
QICHACHA_RETRY_DELAY,
|
|
QICHACHA_SECRET_KEY,
|
|
QICHACHA_TIMEOUT,
|
|
)
|
|
from fastapi_common.fastapi_common_logger import logger
|
|
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
|
|
from fastapi_common.fastapi_common_web.exception.QichachaException import QichachaException
|
|
|
|
|
|
class QichachaClient:
|
|
"""企查查 HTTP 客户端。"""
|
|
|
|
def __init__(
|
|
self,
|
|
AppKey: str | None = None,
|
|
SecretKey: str | None = None,
|
|
BaseUrl: str | None = None,
|
|
EnterprisePath: str | None = None,
|
|
DishonestyPath: str | None = None,
|
|
Timeout: int | None = None,
|
|
MaxRetries: int | None = None,
|
|
RetryDelay: float | None = None,
|
|
) -> None:
|
|
"""初始化客户端配置。"""
|
|
self.AppKey = AppKey if AppKey is not None else str(QICHACHA_APP_KEY)
|
|
self.SecretKey = SecretKey if SecretKey is not None else str(QICHACHA_SECRET_KEY)
|
|
self.BaseUrl = (BaseUrl if BaseUrl is not None else str(QICHACHA_BASE_URL)).rstrip("/")
|
|
self.EnterprisePath = EnterprisePath if EnterprisePath is not None else str(QICHACHA_ENTERPRISE_PATH)
|
|
self.DishonestyPath = DishonestyPath if DishonestyPath is not None else str(QICHACHA_DISHONESTY_PATH)
|
|
self.Timeout = Timeout if Timeout is not None else int(QICHACHA_TIMEOUT)
|
|
self.MaxRetries = MaxRetries if MaxRetries is not None else int(QICHACHA_MAX_RETRIES)
|
|
self.RetryDelay = RetryDelay if RetryDelay is not None else float(QICHACHA_RETRY_DELAY)
|
|
|
|
def BuildHeaders(self) -> dict[str, str]:
|
|
"""生成企查查鉴权请求头。"""
|
|
timespan = str(int(time.time()))
|
|
token_source = f"{self.AppKey}{timespan}{self.SecretKey}"
|
|
token = hashlib.md5(token_source.encode("utf-8")).hexdigest().upper()
|
|
return {"Token": token, "Timespan": timespan}
|
|
|
|
async def Request(self, Url: str, Params: dict[str, str]) -> dict[str, Any]:
|
|
"""发送企查查 GET 请求并返回 JSON。"""
|
|
if not self.AppKey:
|
|
raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查 APP_KEY 未配置")
|
|
if not self.SecretKey:
|
|
raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查 SECRET_KEY 未配置")
|
|
|
|
last_error: Exception | None = None
|
|
for attempt in range(max(self.MaxRetries, 1)):
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.Timeout) as client:
|
|
response = await client.get(Url, params=Params, headers=self.BuildHeaders())
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
if not isinstance(data, dict):
|
|
raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "企查查响应格式错误")
|
|
status = str(data.get("Status") or "")
|
|
if status and status not in {"200", "201"}:
|
|
message = str(data.get("Message") or "企查查查询失败")
|
|
raise QichachaException(StatusCodeEnum.HTTP_400_BAD_REQUEST, message)
|
|
return data
|
|
except QichachaException:
|
|
raise
|
|
except Exception as exc:
|
|
last_error = exc
|
|
if attempt < max(self.MaxRetries, 1) - 1:
|
|
await asyncio.sleep(self.RetryDelay)
|
|
|
|
logger.error(f"企查查请求失败: url={Url}, error={last_error}")
|
|
raise QichachaException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, f"企查查请求失败: {last_error}")
|
|
|
|
async def GetEnterpriseInfo(self, Keyword: str) -> dict[str, Any] | None:
|
|
"""查询企业工商信息。"""
|
|
data = await self.Request(
|
|
Url=f"{self.BaseUrl}{self.EnterprisePath}",
|
|
Params={"key": self.AppKey, "keyword": Keyword},
|
|
)
|
|
result = data.get("Result")
|
|
return result if isinstance(result, dict) else None
|
|
|
|
async def GetDishonestyInfo(self, Keyword: str) -> dict[str, Any] | None:
|
|
"""查询企业失信信息。"""
|
|
data = await self.Request(
|
|
Url=f"{self.BaseUrl}{self.DishonestyPath}",
|
|
Params={"key": self.AppKey, "searchKey": Keyword},
|
|
)
|
|
result = data.get("Result")
|
|
return result if isinstance(result, dict) else None
|
|
|
|
async def QueryCompany(self, Keyword: str) -> tuple[dict[str, Any] | None, dict[str, Any] | None, str | None, str | None]:
|
|
"""并发查询工商信息与失信信息。"""
|
|
enterprise_result, dishonesty_result = await asyncio.gather(
|
|
self.GetEnterpriseInfo(Keyword),
|
|
self.GetDishonestyInfo(Keyword),
|
|
)
|
|
credit_code = (
|
|
str(enterprise_result.get("CreditCode"))
|
|
if enterprise_result and enterprise_result.get("CreditCode")
|
|
else None
|
|
)
|
|
company_name = str(enterprise_result.get("Name")) if enterprise_result and enterprise_result.get("Name") else None
|
|
return enterprise_result, dishonesty_result, credit_code, company_name
|