import json import os from pathlib import Path from typing import Dict, Any import portalocker as locker # ① 引入跨平台锁 class JsonUtils: @staticmethod def _normalize_filename(filename: str) -> str: """ 确保文件名以 .json 结尾 """ if not filename.endswith(".json"): filename = f"{filename}.json" return filename @staticmethod def _get_data_path(filename: str) -> str: """ 根据文件名生成 data 目录下的完整路径 """ filename = JsonUtils._normalize_filename(filename) base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) # 当前项目根目录 data_dir = os.path.join(base_dir, "data") Path(data_dir).mkdir(parents=True, exist_ok=True) # 确保 data 目录存在 return os.path.join(data_dir, filename) @staticmethod def read_json(filename: str) -> dict: """ 读取 JSON 文件,返回字典 如果文件不存在,返回空字典 """ file_path = JsonUtils._get_data_path(filename) try: if not os.path.exists(file_path): return {} with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) return data if isinstance(data, dict) else {} except Exception as e: print(f"读取 JSON 文件失败: {e}") return {} @staticmethod def write_json(filename: str, data: dict, overwrite: bool = True) -> bool: """ 将字典写入 JSON 文件 :param filename: 文件名(不用写后缀,自动补 .json) :param data: 要写入的字典 :param overwrite: True=覆盖写,False=合并更新 """ file_path = JsonUtils._get_data_path(filename) try: if not overwrite and os.path.exists(file_path): with open(file_path, "r", encoding="utf-8") as f: old_data = json.load(f) if not isinstance(old_data, dict): old_data = {} old_data.update(data) data = old_data with open(file_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=4) return True except Exception as e: print(f"写入 JSON 文件失败: {e}") return False @staticmethod def update_json(filename: str, new_data: dict) -> bool: """ 修改 JSON 文件: - 如果 key 已存在,则修改其值 - 如果 key 不存在,则新增 """ try: data = JsonUtils.read_json(filename) if not isinstance(data, dict): data = {} data.update(new_data) return JsonUtils.write_json(filename, data) except Exception as e: print(f"更新 JSON 文件失败: {e}") return False @staticmethod def delete_json_key(filename: str, key: str) -> bool: """ 删除 JSON 文件中的某个 key """ try: data = JsonUtils.read_json(filename) if not isinstance(data, dict): data = {} if key in data: del data[key] return JsonUtils.write_json(filename, data) except Exception as e: print(f"删除 JSON key 失败: {e}") return False # "-------------------------------------------------" @classmethod def _read_json_list(cls, file_path: Path) -> list: try: if not file_path.exists(): return [] with file_path.open("r", encoding="utf-8") as f: data = json.load(f) return data if isinstance(data, list) else [] except Exception: return [] @classmethod def _write_json_list(cls, file_path: Path, data: list) -> None: file_path.parent.mkdir(parents=True, exist_ok=True) with file_path.open("w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=4) # --- 新增:通用追加(不做字段校验) --- # @classmethod # def append_json_items(cls, items, filename="log/last_message.json"): # """ # 将 dict 或 [dict, ...] 追加到 JSON 文件(数组)中;不校验字段。 # """ # file_path = Path(filename) # data = cls._read_json_list(file_path) # # # 统一成 list # if isinstance(items, dict): # items = [items] # elif not isinstance(items, list): # # 既不是 dict 也不是 list,直接忽略 # return # # # 只接受字典项 # items = [it for it in items if isinstance(it, dict)] # if not items: # return # # data.extend(items) # # # LogManager.method_info(filename,"路径") # cls._write_json_list(file_path, data) @classmethod def append_json_items(cls, items, filename="log/last_message.json"): file_path = Path(filename) data = cls._read_json_list(file_path) # 统一成 list if isinstance(items, dict): items = [items] elif not isinstance(items, list): return # 只保留 sender 非空的字典 items = [ it for it in items if isinstance(it, dict) and it.get("sender") != "" ] if not items: return data.extend(items) cls._write_json_list(file_path, data) @classmethod def update_json_items(cls, match: dict, patch: dict, filename="log/last_message.json", multi: bool = True) -> int: """ 修改 JSON 文件(数组)中符合条件的项 :param match: 匹配条件(如 {"sender": "xxx"}) :param patch: 要修改/更新的字段(如 {"status": 1}) :param filename: JSON 文件路径 :param multi: True=修改所有匹配项,False=只修改第一项 :return: 修改的条数 """ file_path = Path(filename) data = cls._read_json_list(file_path) if not isinstance(match, dict) or not isinstance(patch, dict): return 0 updated = 0 for idx, item in enumerate(data): if not isinstance(item, dict): continue # 判断是否匹配 if all(item.get(k) == v for k, v in match.items()): data[idx].update(patch) updated += 1 if not multi: break if updated > 0: cls._write_json_list(file_path, data) return updated @classmethod def query_all_json_items(cls, filename="log/last_message.json") -> list: """ 查询 JSON 文件(数组)中的所有项,并剔除 sender 和 text 为空的记录 :param filename: JSON 文件路径 :return: list,可能为空 """ file_path = Path(filename) data = cls._read_json_list(file_path) if not isinstance(data, list): return [] # 过滤 sender 和 text 为空字符串的项 return [item for item in data if isinstance(item, dict) and item.get("sender", "").strip() and item.get("text", "").strip()] @classmethod def delete_json_items(cls, match: dict, filename: str = "log/last_message.json", multi: bool = True) -> int: file_path = Path(filename) with file_path.open('r+', encoding='utf-8') as f: locker.lock(f, locker.LOCK_EX) # ② 加独占锁(Windows/Linux 通用) try: data = json.load(f) if not isinstance(match, dict): return 0 deleted = 0 new_data = [] for item in data: if isinstance(item, dict) and all(item.get(k) == v for k, v in match.items()): if multi or deleted == 0: # 删多条 / 第一条 deleted += 1 continue new_data.append(item) if deleted: f.seek(0) json.dump(new_data, f, ensure_ascii=False, indent=2) f.truncate() return deleted finally: locker.unlock(f) # ③ 解锁