Files
iOSAI/Utils/TencentOCRUtils.py
2025-10-22 18:24:43 +08:00

328 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import base64
import hashlib
import hmac
import json
import os
import re
import socket
import time
from datetime import datetime, timezone
from http.client import HTTPSConnection
from typing import Any, Dict, List, Optional
# A polygon vertex as returned by the OCR API: {"X": int, "Y": int}.
Point = Dict[str, int]
# An axis-aligned box: {"X": int, "Y": int, "Width": int, "Height": int}.
ItemPolygon = Dict[str, int]


class TencentOCR:
    """Minimal Tencent Cloud OCR client plus helpers for reading its responses.

    Credentials are taken from the environment variables ``TENCENT_SECRET_ID``
    and ``TENCENT_SECRET_KEY``; anything missing there is looked up in
    ``~/.tencent_ocr.json``.
    """

    # Text that looks like an account name: ASCII letters, digits, "_", "." and
    # "-", at least three characters long.
    _USERNAME_PATTERN = re.compile(r"^[A-Za-z0-9_.-]{3,}$")

    @staticmethod
    def _load_secret() -> Dict[str, str]:
        """Return ``{"secret_id": ..., "secret_key": ...}``.

        Environment variables take precedence; any value still missing is read
        from ``~/.tencent_ocr.json`` (keys ``secret_id`` / ``secret_key``).

        Raises:
            RuntimeError: if either value cannot be found in both places.
        """
        # NOTE(review): earlier revisions embedded a literal key pair here.
        # Those credentials were committed to version control and should be
        # treated as leaked and rotated.
        sid = os.environ.get("TENCENT_SECRET_ID")
        skey = os.environ.get("TENCENT_SECRET_KEY")

        if not sid or not skey:
            cfg_path = os.path.expanduser("~/.tencent_ocr.json")
            if os.path.exists(cfg_path):
                with open(cfg_path, "r", encoding="utf-8") as f:
                    cfg = json.load(f)
                sid = sid or cfg.get("secret_id")
                skey = skey or cfg.get("secret_key")

        if not sid or not skey:
            raise RuntimeError(
                "❌ 未找到腾讯云 OCR 密钥,请设置环境变量 TENCENT_SECRET_ID / TENCENT_SECRET_KEY"
                "或在用户目录下创建 ~/.tencent_ocr.json格式{\"secret_id\":\"...\",\"secret_key\":\"...\"}"
            )
        return {"secret_id": sid, "secret_key": skey}

    @staticmethod
    def _hmac_sha256(key: bytes, msg: str) -> bytes:
        """Return the raw HMAC-SHA256 digest of ``msg`` (UTF-8 encoded) under ``key``."""
        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

    @staticmethod
    def _strip_data_uri_prefix(b64: str) -> str:
        """Drop a leading ``data:...,`` prefix from a base64 string, if present."""
        if "," in b64 and b64.strip().lower().startswith("data:"):
            return b64.split(",", 1)[1]
        return b64

    @staticmethod
    def _now_ts_and_date():
        """Return ``(unix_timestamp, "YYYY-MM-DD")`` for the current moment in UTC."""
        ts = int(time.time())
        date = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
        return ts, date

    @staticmethod
    def recognize(
        *,
        image_path: Optional[str] = None,
        image_bytes: Optional[bytes] = None,
        image_url: Optional[str] = None,
        region: Optional[str] = None,
        token: Optional[str] = None,
        action: str = "GeneralBasicOCR",
        version: str = "2018-11-19",
        service: str = "ocr",
        host: str = "ocr.tencentcloudapi.com",
        timeout: int = 15,
    ) -> Dict[str, Any]:
        """Send one image to the OCR endpoint, signed with TC3-HMAC-SHA256.

        Exactly one of ``image_path``, ``image_bytes`` or ``image_url`` must be
        given (i.e. be not ``None``).

        Args:
            image_path: Local file whose bytes are uploaded as base64.
            image_bytes: Raw image bytes, uploaded as base64.
            image_url: URL sent to the service as ``ImageUrl``.
            region: Optional value for the ``X-TC-Region`` header.
            token: Optional value for the ``X-TC-Token`` header.
            action: API action name (``X-TC-Action``).
            version: API version (``X-TC-Version``).
            service: Service name used in the credential scope.
            host: Endpoint host name.
            timeout: Connection timeout in seconds.

        Returns:
            On an HTTP response: ``{"http_status", "http_reason", "headers",
            "body"}`` where ``body`` is the decoded JSON, or
            ``{"NonJSONBody": raw_text}`` if the body is not valid JSON.
            On a transport failure: ``{"error": CODE, "detail": text}`` with
            ``CODE`` one of ``DNS_RESOLUTION_FAILED``, ``NETWORK_TIMEOUT``,
            ``REQUEST_FAILED``.

        Raises:
            ValueError: if zero or more than one image source is supplied.
            RuntimeError: if no credentials can be found.
            OSError: if ``image_path`` cannot be read.
        """
        # Validate first so bad calls fail without touching credentials or disk.
        # An explicit raise is used because ``assert`` is removed under ``-O``.
        provided = sum(v is not None for v in (image_path, image_bytes, image_url))
        if provided != 1:
            raise ValueError("必须且只能提供 image_path / image_bytes / image_url 之一")

        sec = TencentOCR._load_secret()
        secret_id = sec["secret_id"]
        secret_key = sec["secret_key"]

        # 1. Request payload.
        payload: Dict[str, Any] = {}
        if image_url is not None:
            payload["ImageUrl"] = image_url
        else:
            if image_bytes is None:
                with open(image_path, "rb") as f:
                    image_bytes = f.read()
            payload["ImageBase64"] = base64.b64encode(image_bytes).decode("utf-8")
        payload_str = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
        body_bytes = payload_str.encode("utf-8")

        # 2. Signing parameters.
        algorithm = "TC3-HMAC-SHA256"
        http_method = "POST"
        canonical_uri = "/"
        canonical_querystring = ""
        content_type = "application/json; charset=utf-8"
        signed_headers = "content-type;host;x-tc-action"
        timestamp, date = TencentOCR._now_ts_and_date()
        credential_scope = f"{date}/{service}/tc3_request"

        # 3. Canonical request. Header values that are signed must match what
        #    is sent; the action is lower-cased only inside the signed string.
        canonical_headers = (
            f"content-type:{content_type}\n"
            f"host:{host}\n"
            f"x-tc-action:{action.lower()}\n"
        )
        hashed_request_payload = hashlib.sha256(body_bytes).hexdigest()
        canonical_request = (
            f"{http_method}\n{canonical_uri}\n{canonical_querystring}\n"
            f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}"
        )

        # 4. Signature: derive the signing key date -> service -> "tc3_request",
        #    then sign the string-to-sign with it.
        hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
        string_to_sign = (
            f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}"
        )
        secret_date = TencentOCR._hmac_sha256(("TC3" + secret_key).encode("utf-8"), date)
        secret_service = TencentOCR._hmac_sha256(secret_date, service)
        secret_signing = TencentOCR._hmac_sha256(secret_service, "tc3_request")
        signature = TencentOCR._hmac_sha256(secret_signing, string_to_sign).hex()
        authorization = (
            f"{algorithm} "
            f"Credential={secret_id}/{credential_scope}, "
            f"SignedHeaders={signed_headers}, "
            f"Signature={signature}"
        )

        # 5. HTTP headers.
        headers = {
            "Authorization": authorization,
            "Content-Type": content_type,
            "Host": host,
            "X-TC-Action": action,
            "X-TC-Timestamp": str(timestamp),
            "X-TC-Version": version,
        }
        if region:
            headers["X-TC-Region"] = region
        if token:
            headers["X-TC-Token"] = token

        # 6. Send. Transport errors are reported as a dict rather than raised.
        conn = None  # bound up front so ``finally`` never sees an undefined name
        try:
            conn = HTTPSConnection(host, timeout=timeout)
            conn.request("POST", "/", body=body_bytes, headers=headers)
            resp = conn.getresponse()
            raw = resp.read().decode("utf-8", errors="replace")
            try:
                data = json.loads(raw)
            except ValueError:  # json.JSONDecodeError is a ValueError
                data = {"NonJSONBody": raw}
            return {
                "http_status": resp.status,
                "http_reason": resp.reason,
                "headers": dict(resp.getheaders()),
                "body": data,
            }
        except socket.gaierror as e:
            return {"error": "DNS_RESOLUTION_FAILED", "detail": str(e)}
        except socket.timeout:
            return {"error": "NETWORK_TIMEOUT", "detail": f"Timeout after {timeout}s"}
        except Exception as e:
            return {"error": "REQUEST_FAILED", "detail": str(e)}
        finally:
            if conn is not None:
                conn.close()

    @staticmethod
    def _norm(s: str) -> str:
        """Normalise a name for comparison: trim, drop leading "@", lower-case."""
        return (s or "").strip().lstrip("@").lower()

    @staticmethod
    def _rect_from_polygon(poly: List[Point]) -> Optional[ItemPolygon]:
        """Return the bounding box of ``poly``, or ``None`` for an empty polygon."""
        if not poly:
            return None
        xs = [p["X"] for p in poly]
        ys = [p["Y"] for p in poly]
        return {"X": min(xs), "Y": min(ys), "Width": max(xs) - min(xs), "Height": max(ys) - min(ys)}

    @classmethod
    def find_last_name_bbox(cls, ocr: Dict[str, Any], name: str) -> Optional[Dict[str, Any]]:
        """Locate the last detection whose text equals ``name`` and return its box.

        Comparison ignores surrounding whitespace, a leading "@" and case.

        Args:
            ocr: OCR result, either the dict returned by :meth:`recognize`
                (with a ``body`` layer) or the bare API JSON containing
                ``Response.TextDetections``.
            name: Name to look for, e.g. ``"lee39160"``.

        Returns:
            ``None`` if nothing matches or the match has no usable geometry,
            otherwise for example::

                {
                    "index": 21,
                    "text": "lee39160",
                    "item": {"X": 248, "Y": 1701, "Width": 214, "Height": 49},
                    "polygon": [...],
                    "center": {"x": 355.0, "y": 1725.5},
                }
        """
        dets = cls._get_detections(ocr)
        target = cls._norm(name)
        # An empty target (e.g. name "@") must not match blank detections.
        if not dets or not target:
            return None

        # Walk backwards so the first hit is the last occurrence.
        found = -1
        for i in reversed(range(len(dets))):
            if cls._norm(dets[i].get("DetectedText", "")) == target:
                found = i
                break
        if found == -1:
            return None

        det = dets[found]
        item: Optional[ItemPolygon] = det.get("ItemPolygon")
        poly: List[Point] = det.get("Polygon") or []
        # Without an ItemPolygon, fall back to the polygon's bounding box.
        if not item:
            item = cls._rect_from_polygon(poly)
        if not item:
            return None

        center = {"x": item["X"] + item["Width"] / 2.0, "y": item["Y"] + item["Height"] / 2.0}
        return {
            "index": found,
            "text": det.get("DetectedText", ""),
            "item": item,
            "polygon": poly,
            "center": center,
        }

    @staticmethod
    def _get_detections(ocr: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract the ``TextDetections`` list from an OCR result.

        Accepts both the wrapper returned by :meth:`recognize` (``body`` layer)
        and the bare API JSON. Returns ``[]`` when any layer is missing,
        ``None`` or not of the expected container type.
        """
        root = (ocr.get("body") or ocr) if isinstance(ocr, dict) else None
        response = root.get("Response") if isinstance(root, dict) else None
        if not isinstance(response, dict):
            return []
        dets = response.get("TextDetections")
        return dets if isinstance(dets, list) else []

    @staticmethod
    def _norm_txt(s: str) -> str:
        """Return ``s`` with surrounding whitespace removed (``""`` for falsy input)."""
        return (s or "").strip()

    @classmethod
    def slice_texts_between(
        cls,
        ocr: Dict[str, Any],
        start_keyword: str = "切换账号",
        end_keyword: str = "添加账号",
        *,
        username_like: bool = False,  # keep only text that looks like a user name
        min_conf: int = 0,  # minimum Confidence a detection must have
    ) -> List[Dict[str, Any]]:
        """Return the detections strictly between two keyword detections.

        The window starts after the *last* detection whose trimmed text equals
        ``start_keyword`` and ends before the *first* detection whose trimmed
        text equals ``end_keyword``. If either keyword is absent, or the end
        does not come after the start, the result is empty.

        Args:
            ocr: OCR result (see :meth:`_get_detections` for accepted shapes).
            start_keyword: Text marking the start of the window.
            end_keyword: Text marking the end of the window.
            username_like: If true, keep only items whose text consists of
                three or more of ``A-Z a-z 0-9 _ . -``.
            min_conf: Items with ``Confidence`` below this are dropped; a
                missing or ``None`` confidence counts as 0.

        Returns:
            The original detection dicts (unchanged) that pass the filters.
        """
        dets = cls._get_detections(ocr)
        if not dets:
            return []

        # One pass: remember the last start marker and the first end marker.
        start_idx = -1
        end_idx = -1
        for i, d in enumerate(dets):
            txt = cls._norm_txt(d.get("DetectedText", ""))
            if txt == start_keyword:
                start_idx = i
            if end_idx == -1 and txt == end_keyword:
                end_idx = i

        if start_idx == -1 or end_idx == -1 or end_idx <= start_idx:
            return []

        mid = [
            d
            for d in dets[start_idx + 1:end_idx]
            if int(d.get("Confidence") or 0) >= min_conf
            and cls._norm_txt(d.get("DetectedText", ""))
        ]
        if not username_like:
            return mid

        return [
            d
            for d in mid
            if cls._USERNAME_PATTERN.match(cls._norm_txt(d.get("DetectedText", "")))
        ]
if __name__ == "__main__":
    # Manual smoke test: run OCR on one local image and pretty-print the result.
    import sys

    # The image path may be given as the first command-line argument; without
    # one, the original developer sample path is used.
    sample_path = (
        sys.argv[1]
        if len(sys.argv) > 1
        else r"C:\Users\zhangkai\Desktop\last-item\iosai\test.png"
    )
    result = TencentOCR.recognize(
        image_path=sample_path,
        action="GeneralAccurateOCR",
    )
    print(json.dumps(result, ensure_ascii=False, indent=2))