Files
iOSAI/Utils/JsonUtils.py
2025-11-19 17:23:41 +08:00

250 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
from pathlib import Path
import portalocker as locker # ① 引入跨平台锁
class JsonUtils:
@staticmethod
def _normalize_filename(filename: str) -> str:
"""
确保文件名以 .json 结尾
"""
if not filename.endswith(".json"):
filename = f"{filename}.json"
return filename
@staticmethod
def _get_data_path(filename: str) -> str:
"""
根据文件名生成 data 目录下的完整路径
"""
filename = JsonUtils._normalize_filename(filename)
base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) # 当前项目根目录
data_dir = os.path.join(base_dir, "data")
Path(data_dir).mkdir(parents=True, exist_ok=True) # 确保 data 目录存在
return os.path.join(data_dir, filename)
@staticmethod
def read_json(filename: str) -> dict:
"""
读取 JSON 文件,返回字典
如果文件不存在,返回空字典
"""
file_path = JsonUtils._get_data_path(filename)
try:
if not os.path.exists(file_path):
return {}
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except Exception as e:
print(f"读取 JSON 文件失败: {e}")
return {}
@staticmethod
def write_json(filename: str, data: dict, overwrite: bool = True) -> bool:
"""
将字典写入 JSON 文件
:param filename: 文件名(不用写后缀,自动补 .json
:param data: 要写入的字典
:param overwrite: True=覆盖写False=合并更新
"""
file_path = JsonUtils._get_data_path(filename)
try:
if not overwrite and os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
old_data = json.load(f)
if not isinstance(old_data, dict):
old_data = {}
old_data.update(data)
data = old_data
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return True
except Exception as e:
print(f"写入 JSON 文件失败: {e}")
return False
@staticmethod
def update_json(filename: str, new_data: dict) -> bool:
"""
修改 JSON 文件:
- 如果 key 已存在,则修改其值
- 如果 key 不存在,则新增
"""
try:
data = JsonUtils.read_json(filename)
if not isinstance(data, dict):
data = {}
data.update(new_data)
return JsonUtils.write_json(filename, data)
except Exception as e:
print(f"更新 JSON 文件失败: {e}")
return False
@staticmethod
def delete_json_key(filename: str, key: str) -> bool:
"""
删除 JSON 文件中的某个 key
"""
try:
data = JsonUtils.read_json(filename)
if not isinstance(data, dict):
data = {}
if key in data:
del data[key]
return JsonUtils.write_json(filename, data)
except Exception as e:
print(f"删除 JSON key 失败: {e}")
return False
# "-------------------------------------------------"
@classmethod
def _read_json_list(cls, file_path: Path) -> list:
try:
if not file_path.exists():
return []
with file_path.open("r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except Exception:
return []
@classmethod
def _write_json_list(cls, file_path: Path, data: list) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with file_path.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
# --- 新增:通用追加(不做字段校验) ---
# @classmethod
# def append_json_items(cls, items, filename="log/last_message.json"):
# """
# 将 dict 或 [dict, ...] 追加到 JSON 文件(数组)中;不校验字段。
# """
# file_path = Path(filename)
# data = cls._read_json_list(file_path)
#
# # 统一成 list
# if isinstance(items, dict):
# items = [items]
# elif not isinstance(items, list):
# # 既不是 dict 也不是 list直接忽略
# return
#
# # 只接受字典项
# items = [it for it in items if isinstance(it, dict)]
# if not items:
# return
#
# data.extend(items)
#
# # LogManager.method_info(filename,"路径")
# cls._write_json_list(file_path, data)
@classmethod
def append_json_items(cls, items, filename="log/last_message.json"):
file_path = Path(filename)
data = cls._read_json_list(file_path)
# 统一成 list
if isinstance(items, dict):
items = [items]
elif not isinstance(items, list):
return
# 只保留 sender 非空的字典
items = [
it for it in items
if isinstance(it, dict) and it.get("sender") != ""
]
if not items:
return
data.extend(items)
cls._write_json_list(file_path, data)
@classmethod
def update_json_items(cls, match: dict, patch: dict, filename="log/last_message.json", multi: bool = True) -> int:
"""
修改 JSON 文件(数组)中符合条件的项
:param match: 匹配条件(如 {"sender": "xxx"}
:param patch: 要修改/更新的字段(如 {"status": 1}
:param filename: JSON 文件路径
:param multi: True=修改所有匹配项False=只修改第一项
:return: 修改的条数
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
if not isinstance(match, dict) or not isinstance(patch, dict):
return 0
updated = 0
for idx, item in enumerate(data):
if not isinstance(item, dict):
continue
# 判断是否匹配
if all(item.get(k) == v for k, v in match.items()):
data[idx].update(patch)
updated += 1
if not multi:
break
if updated > 0:
cls._write_json_list(file_path, data)
return updated
@classmethod
def query_all_json_items(cls, filename="log/last_message.json") -> list:
"""
查询 JSON 文件(数组)中的所有项,并剔除 sender 为空的记录
:param filename: JSON 文件路径
:return: list可能为空
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
if not isinstance(data, list):
return []
# 过滤 sender 和 text 为空字符串的项
return [item for item in data if isinstance(item, dict) and item.get("sender", "").strip() and item.get("text", "").strip()]
@classmethod
def delete_json_items(cls,
match: dict,
filename: str = "log/last_message.json",
multi: bool = True) -> int:
file_path = Path(filename)
with file_path.open('r+', encoding='utf-8') as f:
locker.lock(f, locker.LOCK_EX) # ② 加独占锁Windows/Linux 通用)
try:
data = json.load(f)
if not isinstance(match, dict):
return 0
deleted = 0
new_data = []
for item in data:
if isinstance(item, dict) and all(item.get(k) == v for k, v in match.items()):
if multi or deleted == 0: # 删多条 / 第一条
deleted += 1
continue
new_data.append(item)
if deleted:
f.seek(0)
json.dump(new_data, f, ensure_ascii=False, indent=2)
f.truncate()
return deleted
finally:
locker.unlock(f) # ③ 解锁