20250911-初步功能已完成
This commit is contained in:
@@ -709,4 +709,4 @@ class AiUtils(object):
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
except Exception as e:
|
||||
LogManager.error(f"[delete_anchors_by_ids] 写入失败: {e}")
|
||||
return deleted
|
||||
return deleted
|
||||
@@ -2,6 +2,8 @@ import os
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from Utils.LogManager import LogManager
|
||||
|
||||
|
||||
class JsonUtils:
|
||||
@staticmethod
|
||||
@@ -98,3 +100,129 @@ class JsonUtils:
|
||||
except Exception as e:
|
||||
print(f"删除 JSON key 失败: {e}")
|
||||
return False
|
||||
# "-------------------------------------------------"
|
||||
@classmethod
|
||||
def _read_json_list(cls, file_path: Path) -> list:
|
||||
try:
|
||||
if not file_path.exists():
|
||||
return []
|
||||
with file_path.open("r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
return data if isinstance(data, list) else []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
@classmethod
|
||||
def _write_json_list(cls, file_path: Path, data: list) -> None:
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with file_path.open("w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||||
|
||||
# --- 新增:通用追加(不做字段校验) ---
|
||||
@classmethod
|
||||
def append_json_items(cls, items, filename="log/last_message.json"):
|
||||
"""
|
||||
将 dict 或 [dict, ...] 追加到 JSON 文件(数组)中;不校验字段。
|
||||
"""
|
||||
file_path = Path(filename)
|
||||
data = cls._read_json_list(file_path)
|
||||
|
||||
# 统一成 list
|
||||
if isinstance(items, dict):
|
||||
items = [items]
|
||||
elif not isinstance(items, list):
|
||||
# 既不是 dict 也不是 list,直接忽略
|
||||
return
|
||||
|
||||
# 只接受字典项
|
||||
items = [it for it in items if isinstance(it, dict)]
|
||||
if not items:
|
||||
return
|
||||
|
||||
data.extend(items)
|
||||
|
||||
LogManager.method_info(filename,"路径")
|
||||
cls._write_json_list(file_path, data)
|
||||
|
||||
|
||||
@classmethod
|
||||
def update_json_items(cls, match: dict, patch: dict, filename="log/last_message.json", multi: bool = True) -> int:
|
||||
"""
|
||||
修改 JSON 文件(数组)中符合条件的项
|
||||
:param match: 匹配条件(如 {"sender": "xxx"})
|
||||
:param patch: 要修改/更新的字段(如 {"status": 1})
|
||||
:param filename: JSON 文件路径
|
||||
:param multi: True=修改所有匹配项,False=只修改第一项
|
||||
:return: 修改的条数
|
||||
"""
|
||||
file_path = Path(filename)
|
||||
data = cls._read_json_list(file_path)
|
||||
|
||||
if not isinstance(match, dict) or not isinstance(patch, dict):
|
||||
return 0
|
||||
|
||||
updated = 0
|
||||
for idx, item in enumerate(data):
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
|
||||
# 判断是否匹配
|
||||
if all(item.get(k) == v for k, v in match.items()):
|
||||
data[idx].update(patch)
|
||||
updated += 1
|
||||
if not multi:
|
||||
break
|
||||
|
||||
if updated > 0:
|
||||
cls._write_json_list(file_path, data)
|
||||
|
||||
return updated
|
||||
|
||||
@classmethod
|
||||
def query_all_json_items(cls, filename="log/last_message.json") -> list:
|
||||
"""
|
||||
查询 JSON 文件(数组)中的所有项
|
||||
:param filename: JSON 文件路径
|
||||
:return: list,可能为空
|
||||
"""
|
||||
file_path = Path(filename)
|
||||
print(file_path)
|
||||
data = cls._read_json_list(file_path)
|
||||
return data if isinstance(data, list) else []
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def delete_json_items(cls, match: dict, filename="log/last_message.json", multi: bool = True) -> int:
|
||||
"""
|
||||
删除 JSON 文件(数组)中符合条件的项
|
||||
:param match: 匹配条件(如 {"sender": "xxx"})
|
||||
:param filename: JSON 文件路径
|
||||
:param multi: True=删除所有匹配项,False=只删除第一项
|
||||
:return: 删除的条数
|
||||
"""
|
||||
file_path = Path(filename)
|
||||
data = cls._read_json_list(file_path)
|
||||
|
||||
if not isinstance(match, dict):
|
||||
return 0
|
||||
|
||||
deleted = 0
|
||||
new_data = []
|
||||
for item in data:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
# 是否匹配
|
||||
if all(item.get(k) == v for k, v in match.items()):
|
||||
deleted += 1
|
||||
if not multi and deleted > 0:
|
||||
# 只删除一条 → 剩下的全保留
|
||||
new_data.extend(data[data.index(item) + 1:])
|
||||
break
|
||||
continue
|
||||
new_data.append(item)
|
||||
|
||||
if deleted > 0:
|
||||
cls._write_json_list(file_path, new_data)
|
||||
|
||||
return deleted
|
||||
|
||||
Reference in New Issue
Block a user