import os import json import threading from datetime import datetime, timedelta from typing import Optional, Dict, Any, List from loguru import logger class SessionStore: """ 会话持久化存储(日志文件版 + 内存缓存) 优化方案: 1. 使用日志文件记录(追加模式,性能好,不会因为文件变大而变慢) 2. 在内存中保留最近的会话记录(用于快速查询) 3. 定期清理过期的内存记录(保留最近1小时或最多1000条) """ def __init__(self, file_path: str = 'logs/sessions.log', enable_log: bool = True, max_memory_records: int = 1000): """ 初始化会话存储。 Args: file_path (str): 日志文件路径(默认 logs/sessions.log) enable_log (bool): 是否启用日志记录,False 则不记录到文件 max_memory_records (int): 内存中保留的最大记录数,默认1000 """ self.file_path = file_path self.enable_log = enable_log self.max_memory_records = max_memory_records self._lock = threading.Lock() # 内存中的会话记录 {pid: record} self._memory_cache: Dict[int, Dict[str, Any]] = {} # 记录创建时间,用于清理过期记录 self._cache_timestamps: Dict[int, datetime] = {} if enable_log: os.makedirs(os.path.dirname(file_path), exist_ok=True) def _write_log(self, action: str, record: Dict[str, Any]) -> None: """ 写入日志文件(追加模式,性能好) Args: action (str): 操作类型(CREATE/UPDATE) record (Dict[str, Any]): 会话记录 """ if not self.enable_log: return try: with self._lock: log_line = json.dumps({ 'action': action, 'timestamp': datetime.now().isoformat(), 'data': record }, ensure_ascii=False) with open(self.file_path, 'a', encoding='utf-8') as f: f.write(log_line + '\n') except Exception as e: # 静默处理日志写入错误,避免影响主流程 logger.debug(f"写入会话日志失败: {e}") def _cleanup_old_cache(self) -> None: """ 清理过期的内存缓存记录 - 保留最近1小时的记录 - 最多保留 max_memory_records 条记录 """ now = datetime.now() expire_time = now - timedelta(hours=1) # 清理过期记录 expired_pids = [ pid for pid, timestamp in self._cache_timestamps.items() if timestamp < expire_time ] for pid in expired_pids: self._memory_cache.pop(pid, None) self._cache_timestamps.pop(pid, None) # 如果记录数仍然超过限制,删除最旧的记录 if len(self._memory_cache) > self.max_memory_records: # 按时间戳排序,删除最旧的 sorted_pids = sorted( self._cache_timestamps.items(), key=lambda x: x[1] ) # 计算需要删除的数量 to_remove = len(self._memory_cache) - self.max_memory_records for pid, _ in sorted_pids[:to_remove]: self._memory_cache.pop(pid, None) self._cache_timestamps.pop(pid, None) def create_session(self, record: Dict[str, Any]) -> None: """ 创建新会话记录。 Args: record (Dict[str, Any]): 会话信息字典 """ record = dict(record) record.setdefault('created_at', datetime.now().isoformat()) pid = record.get('pid') if pid is not None: with self._lock: # 保存到内存缓存 self._memory_cache[pid] = record self._cache_timestamps[pid] = datetime.now() # 清理过期记录 self._cleanup_old_cache() # 写入日志文件(追加模式,性能好) self._write_log('CREATE', record) def update_session(self, pid: int, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ 按 PID 更新会话记录。 Args: pid (int): 进程ID updates (Dict[str, Any]): 更新字段字典 Returns: Optional[Dict[str, Any]]: 更新后的会话记录 """ with self._lock: # 从内存缓存获取 record = self._memory_cache.get(pid) if record: record.update(updates) record.setdefault('updated_at', datetime.now().isoformat()) self._cache_timestamps[pid] = datetime.now() else: # 如果内存中没有,创建一个新记录 record = {'pid': pid} record.update(updates) record.setdefault('created_at', datetime.now().isoformat()) record.setdefault('updated_at', datetime.now().isoformat()) self._memory_cache[pid] = record self._cache_timestamps[pid] = datetime.now() if record: # 写入日志文件 self._write_log('UPDATE', record) return record def get_session_by_pid(self, pid: int) -> Optional[Dict[str, Any]]: """ 按 PID 查询会话记录(仅从内存缓存查询,性能好) Args: pid (int): 进程ID Returns: Optional[Dict[str, Any]]: 会话记录 """ with self._lock: return self._memory_cache.get(pid) def list_sessions(self, status: Optional[int] = None) -> List[Dict[str, Any]]: """ 列出会话记录,可按状态过滤(仅从内存缓存查询) Args: status (Optional[int]): 状态码过滤(如 100 运行中、200 已结束、500 失败) Returns: List[Dict[str, Any]]: 会话记录列表 """ with self._lock: records = list(self._memory_cache.values()) if status is None: return records return [r for r in records if r.get('status') == status]