今天上午买了机票,因为周六我要去上海晃一圈,所以买的周末中午的票,从上海虹桥到重庆江北,700多啊,肉疼~

之后就一边在做webai-lite的局部和全局屏蔽区,一边看推理客户端的C++代码,主要看了agnetserver调度模块,了解了推理部分的三个线程是如何工作的,如何用队列通信的。

我chovy,这个代码缩进都是错的,写代码给我写好的啊!

下午也一直在看这边的代码,后面的一长串 webai2.0-device AgentServer 代码分析webai2.0-device Watchdog 与升级机制分析都是我让AI根据我和它的对话总结的

今天感觉差不多了,剩下的时间想做自己的事,比如做一个实时监控电脑资源的小插件,实时监测网络、内存、CPU、GPU资源,内存占用低,曲线平滑
后来我让GPT帮我找了一下,发现一个开源项目还挺好用的:LiteMonitor 除了没有曲线,完全满足我的要求~

接下来就都在更新简历了,在GPT的帮助下也是成功更新出一版,但是我觉得这版简历上的技术名词有点少

先这样,后续再改吧,最后复习两道算法题就完事~


今日工作内容

  1. 完善webai-lite局部、全局屏蔽区画笔
  2. 整理交接代码、文件

下阶段计划

  1. 做好交接相关事宜

webai2.0-device AgentServer 代码分析

分析日期:2026-06-25
基于 d:\webai2.0-device 源码阅读,涵盖 agentserver.h/cpp、trainingthread.h/cpp、inferencethread.h/cpp、resultmanager.h/cpp、httpserver.h 等核心文件。


一、架构概览:C++/Qt 设备管理守护进程

webai2.0-device 是一个设备端 Agent 守护进程,以 C++/Qt 编写,负责设备与云端管理端之间的通信、任务调度和 ML 训练/推理编排。

核心定位:C++ 是管家(设备管理),Python 是厨师(ML 计算)

类组织结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
MainWindow(Qt 主窗口,根对象)
├─ AgentServer(任务调度中枢,类工厂)
│ ├─ QTcpSocket m_socket ← 主命令通道(自定义二进制协议)
│ ├─ QTcpSocket m_msg_socket ← 消息推送通道(裸 JSON)
│ ├─ std::map<QString, TaskInfo> ← 任务仓库
│ ├─ SendResultManager(单例) ← 结果回传串行调度
│ ├─ CleanThread ← 数据清理
│ ├─ TrainingThread(动态创建) ← 训练任务
│ └─ InferenceThread(动态创建) ← 推理任务

├─ HTTPServerThread(独立线程)
│ └─ HTTPServer(QTcpServer) ← HTTP 状态查询 API

├─ DatabaseHelper × 2 ← SQLite 缓存/RES 库
└─ InferenceTestThread / InferenceBatchThread ← 本地测试推理

二、三通道通信协议

① m_socket — 自定义二进制协议

管理端主动连接,承载命令+文件+心跳+结果回传。

上行(管理端 → Agent):

阶段 格式 说明
命令 16 字节定长,右补 \0 start_train / start_infer / start_upgrade / heartbeat
Header 训练/推理/升级各自定长 UUID + 模型类型 + 各文件元数据(文件名长度+文件大小)
文件名列表 变长拼接 按 Header 中声明的顺序排列
文件数据 CHUNK_SIZE 分片 裸文件写入

下行(Agent → 管理端):

内容 格式
心跳 HEARTBEAT 命令
结果回传 命令(train_res / infer_res) + UUID + 文件名长度(8B BE) + 文件大小(8B BE) + 文件名 + 文件分片

所有整型字段使用 QDataStream + BigEndian

② m_msg_socket — 裸 JSON 文本

设备主动连接管理端的 MSG_SERVER_PORT,推送进度和错误消息。每条消息是一个完整 JSON,无额外协议封装:

1
2
{"task_id":"<uuid>", "type":"info", "message":"训练进度: iter: 0, epoch: 4, ..."}
{"task_id":"<uuid>", "type":"error", "message":"training process start fail"}

③ HTTP Server — HTTP 1.0

HTTPServer 跑在独立线程,监听本地端口(可读配置),提供只读查询 API 供本地 Web UI 使用:

  • GET /status?uuid=xxx → 任务状态快照
  • GET /devicefree / using
  • GET /upgradecanUpgrade / 升级中
  • POST /cancel → 取消任务

HTTP 读的是 m_httpTaskStatusSnapshot 快照哈希表,不直接读 m_taskInfo,避免锁竞争。


三、任务生命周期(11 态)

1
2
3
NONE → TRAINING/INFERENCE → SEND_PENDING → SENDING → FINISHED
→ FAIL → SENDING_ERROR
→ CANCELLING → CANCELLED

训练结束后,C++ 把结果 zip 加入 SendResultManager 串行队列,通过主 TCP 连接发回管理端。


四、HTTP 快照机制(读写分离)

问题起源: HTTP 线程直接读 m_taskInfo 时,如果工作线程正持有 QMutex(如写训练曲线),HTTP 线程会阻塞,导致前端请求超时。

解决方案:

1
2
3
4
5
工作线程更新状态 → publishHttpTaskStatusSnapshot()

写入快照哈希表(极小锁)

HTTP 线程读快照(几乎不阻塞)

两套接口分离:

读快照(推荐) 直接读(旧,保留兼容)
httpSnapshotGetTaskStatusJson httpGetTaskStatusJson
httpSnapshotGetTaskStatusInt httpGetTaskStatus
httpSnapshotGetDeviceState httpGetDeviceState
httpSnapshotGetCanUpgrade 无对应旧接口

五、推理多线程流水线

推理使用 Reader → Infer → Writer 三线程流水线实现图片批量推理,最大化 GPU/CPU 并发。

1
2
3
4
5
6
7
8
9
10
11
12
13
ReaderThread          InferThread           WriterThread
(读磁盘) (GPU 推理) (写结果 JSON)
│ │ │
│ m_imageQueue │ m_resultQueue │
│ (有界队列 cap=10) │ (有界队列 cap=20) │
│ m_imageMutex │ m_resultMutex │
│ │ │
├─ 读图片入队 │ │
│ ├─ 取图推理 │
│ ├─ 结果入队 ├─ 取结果写磁盘
│ │ │
└─────────────────────┴─────────────────────┘
互斥锁 QMutex 互斥锁 QMutex

存在问题

  • 轮询空转:三线程均使用 msleep(100) 轮询检查队列状态,空闲时每秒空唤醒 30 次
  • 复制粘贴的轮询逻辑:内外两层 while 检查几乎相同
  • 模板代码重复:4 个线程类各需声明 stop()/run()/析构清理,~400 行样板代码
  • 空参占位容错:空图也入队空 param 保持 count 对齐,而非真正处理错误
  • 锁窗口期unlock → 判断 → lock 之间状态可能已变化

优化方向

  • QWaitCondition 替代 msleep 轮询,实现事件驱动的有界阻塞队列
  • 用 3 个 std::thread + lambda 替代 4 个 QThread 子类
  • std::optional / std::variant<Result, Error> 处理异常路径

六、训练:QProcess 管理 Python 子进程

训练不包含 C++ 推理逻辑,而是启动 Python 子进程:

1
2
3
4
5
m_process->start(ClientConfig::GetPythonPath(),
QStringList() << scriptPath
<< "--work_root" << workPath
<< "--pyd_root" << trainEnvPath
<< "--pretrained_root" << pretrainedPath);

准备阶段(TrainPrepareThread)

依次解压:代码 zip → 样本 zip(或使用缓存)→ 增量数据 → 预训练模型 → 配置文件。每步检查取消标志。

进度反馈

C++ 通过正则从 Python stdout/stderr 解析训练曲线:

1
2
3
QRegularExpression m_regexCurve{
R"(fsnb:\s*iter:\s(\d+),\sepoch:\s(\d+),\s(.)iter:\s*(\d+),\s*epoch:\s*(\d+),\s*(.*))"
};

解析结果通过 m_msg_socket 推送到管理端。对应 Python 输出格式:

1
fsnb: [iter:0, epoch:4, train_loss:0.1790, train_acc:0.9843, val_loss:0.2770, val_acc:0.9693]

后处理(processResult)

  1. 收集 .fsnb / .pb / .json 模型文件,打包 zip
  2. 序列化训练曲线到 CSV 文件
  3. 启动 Python 子进程生成报表(xlsx)
  4. 打包增量数据

七、结果回传(SendResultManager)

单例 + QThreadPool(1),强制训练/推理结果串行发送,避免争抢主 TCP 连接。

1
2
3
4
5
6
7
8
SendTask::run() {
emit taskStarted(); // 通知主线程停心跳
emit sendData(command+uuid); // 发命令
emit sendFile(uuid, files); // 发文件
m_condition->wait(m_mutex); // 阻塞等文件发完
// 状态更新
emit taskFinished(); // 通知主线程重启心跳
}

等待机制: SendTask 通过 QWaitCondition 阻塞自己,等待主线程的 sendResFile 发完文件后 wakeOne。原因是 QTcpSocket 在主线程创建,不能跨线程使用。

存在问题

  • wait() 无超时机制,发文件异常时永久阻塞
  • sendFile 信号是异步队列调用,taskFinished 信号也得等主线程事件循环处理
  • file_mutexm_taskInfo.mutex 的加锁顺序不固定,存在潜在死锁风险

八、为什么用 C++ 而非纯 Python

真正无法被纯 Python 替代的(3 项)

功能 文件 原因
SafeNet HASP 硬件加密锁 main.cpp:112-138 SDK 只有 C 头文件 + 二进制库
崩溃 Minidump 生成 main.cpp:77-98 需要 SetUnhandledExceptionFilter 在 C 层拦截 SEH 异常
__cpuid CPU 指令 devinfo.cpp:26-36 汇编级硬件指纹采集

可以替代但代价大的

功能 Python 方案 代价
CUDA 设备查询 pynvml 多一个 pip 依赖
加密锁 ctypes 调 hasp API 声明结构体、回调指针、排错困难
Minidump 无法获得 C 层 dump 需要 C wrapper
GPU 调用 TensorRT Python API 与 C++ 推理引擎接口不对齐

根本原因

这个项目最初是一个 Qt 桌面客户端 GUIMainWindow + 数据库管理 + 本地推理测试),后来附着上云管理功能(AgentServer + HTTPServer)。从 C++/Qt 生长出 TCP 管理功能是自然的演化,而非从零做技术选型。如果今天只写 Agent 守护进程,Python 是合理选择。


九、设备信息采集

devinfo.cpp 采集以下系统信息:

  • CPU__cpuid 指令取品牌名 + 核心数(Win)/ /proc/cpuinfo(Linux)
  • 内存GlobalMemoryStatusEx(Win)/ /proc/meminfo(Linux)
  • 磁盘QStorageInfo 取数据目录所在盘的总容量/剩余空间
  • 网络GetAdaptersAddresses + GetIfEntry2(Win)/ /sys/class/net/(Linux)
  • GPUcudaGetDeviceCount + cudaGetDeviceProperties + cudaMemGetInfo

十、缓存策略

用同一样本多次训练,不用反复传样本
用同一样本/模型多次推理,不用反复传样本/模型

服务端把 file_len 设为 0,把 file_name 字段复用为 hash 值传过来。设备收到后查本地 SQLite,找到上次任务留下的文件路径,直接在本地解压/拷贝,不需要通过网络接收这个大文件。

通过 SQLite 数据库实现两级缓存,避免重复传输大文件:

缓存库 用途
webai.db training 训练样本缓存(按 hash 查找)
webai.db inference 推理模型/样本缓存
res.db resource 大模型资源(OCR/GD/SAM)版本管理

缓存命中时,服务端只下发 hash,客户端从历史任务目录拷贝,跳过文件传输。


十一、数据清理(CleanThread)

  • 任务执行完成后在 AgentServer 回调中触发 cleanData()
  • 按保留天数删除过期的工作目录和日志文件
  • 记录上次清理时间到 lastclean 文件,避免每次 GC 操作

十二、关键风险管理

风险 现状 建议
TCP 粘包 命令定长 + header 定长 + 协议状态机区分,能正确处理 无需改动
磁盘写满 receviceFilewrite 失败时设 FAIL 状态 可考虑预留空间检测
Python 子进程崩溃 QProcess 信号 handleProcessFinished → 读 error.json 无超时保护,可能挂起
取消未响应 主线程置 cancel_requested → 工作线程 200ms 轮询 kill 对阻塞的系统调用无效
socket 断线 socketDisconnected → 延迟重连 重连期间任务正常继续
加密狗失效 启动时弹框退出 运行时无法检测

webai2.0-device Watchdog 与升级机制分析

分析日期:2026-06-25
基于 watchdog/watchdog.cppsrc/agentserver.cppsrc/mainwindow.cpp 等源码阅读。


一、双进程保活架构

WebAIClient 由两个独立的 .exe 组成,构成双进程保活系统

1
2
3
4
5
6
7
8
Watchdog.exe(看守进程)                    WebAIClient.exe(主进程)
┌──────────────────────────┐ ┌──────────────────────────┐
│ main(): │ │ MainWindow │
│ ① 系统托盘图标 │ │ ├─ AgentServer │
│ ② 定时器每 5 秒检查心跳 │ 共享内存 │ ├─ HTTPServerThread │
│ ③ 主进程挂了 → 自动重启 │ ◄─────────► │ ├─ updateSharedMemory() │
│ ④ 升级指令 → 接管升级流程 │ webai_shm │ └─ upgrade() │
└──────────────────────────┘ └──────────────────────────┘

关键特性

特性 说明
解耦 QProcess::startDetached 启动,两进程完全独立,谁挂了都不影响对方
唯一实例 QSharedMemory("watchdog_instance") + QSystemSemaphore 防重复启动
系统托盘 Watchdog 显示托盘图标,提示语”webai 监控程序,请勿退出”
文件锁 主进程运行时 .exe 被锁定,无法覆盖,所以升级必须由 Watchdog 执行

二、心跳协议(共享内存)

两进程通过 QSharedMemory("webai_shm") 通信,共享内存布局:

1
2
3
4
5
6
7
偏移 0: type (1 字节)
- 0 = 正常心跳
- 1 = 升级指令

偏移 1+: timestamp (8 字节,Unix 纪元秒,仅 type=0 时有效)
- 主进程定时写入当前时间
- Watchdog 检查时间差是否超过 MAX_WAIT_HEARTBEAT(15 秒)

Watchdog 侧(读)

watchdog.cpp:166-198

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void MonitorSharedMemory() {
sharedMemory.lock();
memcpy(&type, data, sizeof(type));

if (type == 0) {
// 检查心跳是否超时
memcpy(&timestamp, data + sizeof(type), sizeof(timestamp));
if (interval > MAX_WAIT_HEARTBEAT && !checkWebaiclientRunning()) {
startWebaiclient(); // 重启主进程
}
} else if (type == 1) {
// 升级指令
timer.stop();
upgradeWebaiclient();
timer.start();
}
sharedMemory.unlock();
}

主进程侧(写)

通过 QTimer 周期性调用 MainWindow::updateSharedMemory(),写入当前时间戳到共享内存。为什么用共享内存而不是 socket/pipe?

  • 共享内存无连接建立开销
  • 写入只是一个 memcpy + lock/unlock,极轻量
  • Watchdog 可以随时读取,不需要主进程主动推送

三、QProcess::startDetached vs fork

fork startDetached
关系 父子进程,树形结构 两个独立进程,平级
内存 写时复制,共享地址空间 完全独立
子进程死 父进程收到 SIGCHLD 无法感知
父进程死 子进程变孤儿(init 收养) 无影响,子进程继续运行
跨平台 Unix only Windows/Linux/macOS
用途 自身分裂 启动一个完全不同的 exe

Watchdog 用 startDetached 的原因:

  1. Windows 没有 fork
  2. 要启动的是不同的 exe(WebAIClient.exe),不是自身副本
  3. 刻意解耦——Watchdog 崩溃不影响主进程,主进程崩溃 Watchdog 能通过 tasklist 发现并重启

四、共享内存:线程间还是进程间?

维度 线程间通信 进程间通信(IPC)
地址空间 共享(同一进程) 隔离(不同进程)
数据可见性 全局变量直接读写 需要操作系统映射
同步机制 QMutex / std::atomic QSharedMemory::lock/unlock
适用场景 线程池任务队列 跨进程心跳/数据交换

共享内存是进程间通信(IPC)的核心机制。 同一进程的线程不需要共享内存——它们共享地址空间,直接读写全局变量即可。不同进程的地址空间隔离,才需要操作系统帮忙映射一块双方都能访问的物理内存。

Watchdog 场景:Watchdog.exeWebAIClient.exe 是完全独立的两个进程,走共享内存 IPC。


五、升级完整链路

升级涉及三个角色:管理端 → AgentServer → 主进程 → Watchdog

5.1 AgentServer 接收升级包

管理端通过 m_socket 发送 start_upgrade 命令,随后传输 webai_update.zip

阶段一:解析命令 agentserver.cpp:898-930

1
2
3
4
5
6
7
void AgentServer::parseCommand() {
if (commandstr == START_UPGRADE) {
m_type = TaskType::UPGRADE;
buffer.remove(0, COMMAND_LEN);
publishHttpUpgradeReceiving(true); // HTTP 标记:升级接收中
}
}

阶段二:解析 Header agentserver.cpp:1112-1118

1
2
3
4
5
6
7
} else if (m_type == TaskType::UPGRADE) {
QDataStream stream(buffer);
stream.setByteOrder(QDataStream::BigEndian);
stream >> upgrade_header.file_len; // 读取升级包总大小
buffer.remove(0, UPGRADE_HEADER_SIZE);
upgrade_header.status = ParsedStatus::FIXEDPARSED;
}

升级包的 header 只有 8 字节(一个 quint64):file_len,表示 webai_update.zip 的大小。

阶段三:接收文件 agentserver.cpp:1450-1496

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
} else if (m_type == TaskType::UPGRADE) {
// 首次打开文件:当前目录/webai_update.zip
if (!m_file.isOpen()) {
m_file.setFileName(QDir::currentPath() + "/webai_update.zip");
m_file.open(QIODevice::WriteOnly);
}
// 分片写入,与训练/推理文件写入方式一致
quint64 left = upgrade_header.file_len - upgrade_header.write_size;
written_size = m_file.write(buffer.left(safe_left));
buffer.remove(0, written_size);
upgrade_header.write_size += written_size;

// 写入完成 → 通知 Watchdog
if (upgrade_header.write_size == upgrade_header.file_len) {
m_file.close();
m_type = TaskType::NONETYPE;
upgrade_header = {};
publishHttpUpgradeReceiving(false);
Q_EMIT UpgradeReceiveFinished(); // → 触发写共享内存 type=1
}
}

5.2 主进程触发 Watchdog

UpgradeReceiveFinished 信号触发的链路:

  1. 信号发射
  2. 主进程将 type=1 写入共享内存 webai_shm
  3. Watchdog 的 5 秒定时器触发 MonitorSharedMemory()
  4. 读到 type=1 → 执行 upgradeWebaiclient()

5.3 Watchdog 执行升级

watchdog.cpp:119-163

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void upgradeWebaiclient() {
// 1. 等主进程正常退出(最多 60 秒,每秒检查一次)
while (checkWebaiclientRunning() && sleepTime < MAX_WAIT_CLOSE) {
QThread::sleep(1);
}

// 2. 60 秒后还不退就强杀
if (checkWebaiclientRunning())
killProcess("WebAIClient.exe"); // taskkill /F /IM WebAIClient.exe

// 3. 解压更新包覆盖当前目录
JlCompress::extractDir(upgrade_zip, QDir::currentPath());

// 4. 记录本次更新的 MD5 到 lastupdate 文件
QString hash = calculateFileHash(upgrade_zip, QCryptographicHash::Md5);

// 5. 删除更新包
QFile::remove(upgrade_zip);

// 6. 启动新版本
startWebaiclient(); // startDetached
}

为什么升级必须由 Watchdog 做?

主进程运行时 Windows 锁定了 WebAIClient.exe,自己无法覆盖自己。Watchdog 先把主进程杀了(释放文件锁),解压覆盖,再启动。这是 Windows 平台应用自更新的标准做法。


六、TCP 协议解析状态机(parseData

AgentServer 的 receiveData() 是一个四阶段状态机,处理管理端发来的二进制数据流:

1
2
3
4
5
6
7
8
9
10
11
12
buffer 中的原始 TCP 数据流:

┌──────────┬──────────────┬──────────────────┬──────────────────────────┐
│ Command │ Header │ Filenames[] │ File Data │
│ (16B) │ (定长) │ (变长拼接) │ (变长,多个文件) │
└──────────┴──────────────┴──────────────────┴──────────────────────────┘
↑ ↑ ↑ ↑
parseCommand parseHeader parseFilename parseData
│ │ │ │
m_type 从 header.status header.status header.status
NONETYPE → FIXEDPARSED → FILENAMEPARSED → 按 receive_status
变成具体类型 写文件

各阶段职责

阶段 函数 触发条件 产出
① 解析命令 parseCommand() buffer 中有 ‘s’ 开头 + 满 16 字节 设置 m_type(TRAINING/INFERENCE/UPGRADE)
② 解析 Header parseHeader() header 数据到齐(各类型定长) 填充 UUID、模型类型、各文件元数据
③ 解析文件名 parseFilename() header 完成后文件名数据到齐 解析 6 个文件名,创建 UUID 工作目录
写文件 parseData() 文件名解析完,buffer 有数据 按 receive_status 状态机逐个写文件到磁盘

parseData 内部状态机

训练数据接收有 6 个文件,按顺序流转:

1
RECEIVE_CODE → RECEIVE_CONFIG → RECEIVE_SAMPLE → RECEIVE_PREMODEL → RECEIVE_ENV → RECEIVE_INCREMENT → NONE

推理数据接收也有 6 个文件:

1
RECEIVE_FLOW → RECEIVE_FLOW_RUN → RECEIVE_MODEL → RECEIVE_SAMPLE → RECEIVE_ENV → RECEIVE_SAM → NONE

每个文件完整写入后,receive_status 推进到下一状态。每写完一个文件调用 checkRecvStatus() 检查是否全部完成,完成后发射 TrainReceiveFinished / InferReceiveFinished 信号触发任务线程。

四个阶段 vs 四层保活

层级 机制 守护谁
进程级 Watchdog 共享内存心跳 + tasklist WebAIClient.exe
线程级 TrainingWorker waitForFinished(200) + kill Python 训练子进程
线程级 InferenceWorker shouldStopPostProcess() 轮询 Reader/Infer/Writer 三线程
TCP 级 sendHeartbeat() + 断线重连(延迟 5 秒) 管理端连接

七、WATCHog 存在的意义

Watchdog 只做 4 件事:

  1. 保活 — 主进程崩溃后自动拉起,保证设备持续在线
  2. 升级 — 代理升级流程,解决”自己不能覆盖自己”的问题
  3. 可视化 — 系统托盘图标提示用户”此进程不可关闭”
  4. 自身唯一 — 共享内存 + 信号量防多开

它不负责

  • 任务调度(那是 AgentServer 的事)
  • 网络通信(那是 m_socket/m_msg_socket 的事)
  • ML 计算(那是 Python 子进程的事)

Watchdog 的职责范围非常窄,代码仅 261 行,但它处于整个系统的最底层安全网位置。