178 lines
6.2 KiB
Python
178 lines
6.2 KiB
Python
import os
|
||
import json
|
||
import threading
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Dict, Any, List
|
||
from loguru import logger
|
||
|
||
|
||
class SessionStore:
|
||
"""
|
||
会话持久化存储(日志文件版 + 内存缓存)
|
||
|
||
优化方案:
|
||
1. 使用日志文件记录(追加模式,性能好,不会因为文件变大而变慢)
|
||
2. 在内存中保留最近的会话记录(用于快速查询)
|
||
3. 定期清理过期的内存记录(保留最近1小时或最多1000条)
|
||
"""
|
||
|
||
def __init__(self, file_path: str = 'logs/sessions.log', enable_log: bool = True, max_memory_records: int = 1000):
|
||
"""
|
||
初始化会话存储。
|
||
|
||
Args:
|
||
file_path (str): 日志文件路径(默认 logs/sessions.log)
|
||
enable_log (bool): 是否启用日志记录,False 则不记录到文件
|
||
max_memory_records (int): 内存中保留的最大记录数,默认1000
|
||
"""
|
||
self.file_path = file_path
|
||
self.enable_log = enable_log
|
||
self.max_memory_records = max_memory_records
|
||
self._lock = threading.Lock()
|
||
# 内存中的会话记录 {pid: record}
|
||
self._memory_cache: Dict[int, Dict[str, Any]] = {}
|
||
# 记录创建时间,用于清理过期记录
|
||
self._cache_timestamps: Dict[int, datetime] = {}
|
||
|
||
if enable_log:
|
||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||
|
||
def _write_log(self, action: str, record: Dict[str, Any]) -> None:
|
||
"""
|
||
写入日志文件(追加模式,性能好)
|
||
|
||
Args:
|
||
action (str): 操作类型(CREATE/UPDATE)
|
||
record (Dict[str, Any]): 会话记录
|
||
"""
|
||
if not self.enable_log:
|
||
return
|
||
|
||
try:
|
||
with self._lock:
|
||
log_line = json.dumps({
|
||
'action': action,
|
||
'timestamp': datetime.now().isoformat(),
|
||
'data': record
|
||
}, ensure_ascii=False)
|
||
with open(self.file_path, 'a', encoding='utf-8') as f:
|
||
f.write(log_line + '\n')
|
||
except Exception as e:
|
||
# 静默处理日志写入错误,避免影响主流程
|
||
logger.debug(f"写入会话日志失败: {e}")
|
||
|
||
def _cleanup_old_cache(self) -> None:
|
||
"""
|
||
清理过期的内存缓存记录
|
||
- 保留最近1小时的记录
|
||
- 最多保留 max_memory_records 条记录
|
||
"""
|
||
now = datetime.now()
|
||
expire_time = now - timedelta(hours=1)
|
||
|
||
# 清理过期记录
|
||
expired_pids = [
|
||
pid for pid, timestamp in self._cache_timestamps.items()
|
||
if timestamp < expire_time
|
||
]
|
||
for pid in expired_pids:
|
||
self._memory_cache.pop(pid, None)
|
||
self._cache_timestamps.pop(pid, None)
|
||
|
||
# 如果记录数仍然超过限制,删除最旧的记录
|
||
if len(self._memory_cache) > self.max_memory_records:
|
||
# 按时间戳排序,删除最旧的
|
||
sorted_pids = sorted(
|
||
self._cache_timestamps.items(),
|
||
key=lambda x: x[1]
|
||
)
|
||
# 计算需要删除的数量
|
||
to_remove = len(self._memory_cache) - self.max_memory_records
|
||
for pid, _ in sorted_pids[:to_remove]:
|
||
self._memory_cache.pop(pid, None)
|
||
self._cache_timestamps.pop(pid, None)
|
||
|
||
def create_session(self, record: Dict[str, Any]) -> None:
|
||
"""
|
||
创建新会话记录。
|
||
|
||
Args:
|
||
record (Dict[str, Any]): 会话信息字典
|
||
"""
|
||
record = dict(record)
|
||
record.setdefault('created_at', datetime.now().isoformat())
|
||
pid = record.get('pid')
|
||
|
||
if pid is not None:
|
||
with self._lock:
|
||
# 保存到内存缓存
|
||
self._memory_cache[pid] = record
|
||
self._cache_timestamps[pid] = datetime.now()
|
||
# 清理过期记录
|
||
self._cleanup_old_cache()
|
||
|
||
# 写入日志文件(追加模式,性能好)
|
||
self._write_log('CREATE', record)
|
||
|
||
def update_session(self, pid: int, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
按 PID 更新会话记录。
|
||
|
||
Args:
|
||
pid (int): 进程ID
|
||
updates (Dict[str, Any]): 更新字段字典
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 更新后的会话记录
|
||
"""
|
||
with self._lock:
|
||
# 从内存缓存获取
|
||
record = self._memory_cache.get(pid)
|
||
if record:
|
||
record.update(updates)
|
||
record.setdefault('updated_at', datetime.now().isoformat())
|
||
self._cache_timestamps[pid] = datetime.now()
|
||
else:
|
||
# 如果内存中没有,创建一个新记录
|
||
record = {'pid': pid}
|
||
record.update(updates)
|
||
record.setdefault('created_at', datetime.now().isoformat())
|
||
record.setdefault('updated_at', datetime.now().isoformat())
|
||
self._memory_cache[pid] = record
|
||
self._cache_timestamps[pid] = datetime.now()
|
||
|
||
if record:
|
||
# 写入日志文件
|
||
self._write_log('UPDATE', record)
|
||
|
||
return record
|
||
|
||
def get_session_by_pid(self, pid: int) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
按 PID 查询会话记录(仅从内存缓存查询,性能好)
|
||
|
||
Args:
|
||
pid (int): 进程ID
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 会话记录
|
||
"""
|
||
with self._lock:
|
||
return self._memory_cache.get(pid)
|
||
|
||
def list_sessions(self, status: Optional[int] = None) -> List[Dict[str, Any]]:
|
||
"""
|
||
列出会话记录,可按状态过滤(仅从内存缓存查询)
|
||
|
||
Args:
|
||
status (Optional[int]): 状态码过滤(如 100 运行中、200 已结束、500 失败)
|
||
|
||
Returns:
|
||
List[Dict[str, Any]]: 会话记录列表
|
||
"""
|
||
with self._lock:
|
||
records = list(self._memory_cache.values())
|
||
if status is None:
|
||
return records
|
||
return [r for r in records if r.get('status') == status]
|