Files
obsidian/操作系统/操作系统实践/实践03_线程池与业务分割.md

676 lines
21 KiB
Markdown
Raw Normal View History

2026-06-14 19:05:57 +08:00
# 实践03 Web服务器的线程池和业务分割模型
> [!abstract] 实验概要
> - **实验名称**: 实验三. Web服务器的线程池和业务分割模型
> - **实验目的**: 将webserver改造为预线程版本和业务分割模型并进行性能测试掌握并行网络服务器的设计、性能测试和优化方法
> - **前置知识**: [[07_多线程编程]]、[[10_并发服务器]]、[[实践02_多进程多线程服务器]]
> - **核心概念**: 线程池、生产者-消费者模型、业务分割、性能测试
---
## 一、实验目的
1. 理解预线程化prethreading服务器的工作原理
2. 掌握线程池的设计与实现方法
3. 理解业务分割模型的架构思想
4. 学会使用 `http_load` 进行服务器性能测试
5. 掌握使用 `vmstat``iostat``gprof` 等工具监测系统性能
6. 通过对比分析,理解不同并发模型的性能差异
---
## 二、涉及知识点
- POSIX 线程池设计(详见 [[07_多线程编程]]
- 生产者-消费者模型与信号量同步
- 预线程化服务器模型(详见 [[10_并发服务器]]
- 业务分割Pipeline并行模型
- 消息队列设计与实现
- 性能测试工具:`http_load``vmstat``iostat``gprof`
- 并行服务器的性能瓶颈分析与优化
---
## 三、源代码文件清单
| 文件名 | 说明 |
|--------|------|
| `taskline.c` / `taskline.h` | 预线程服务器任务管理程序 |
| `pool.c` / `pool.h` | 线程池服务器的线程管理和任务管理程序 |
| `urls` | `http_load` 测试 URL 列表 |
> [!tip] 编译命令
> 使用调试模式编译:
> ```bash
> make M="-DDEBUG" rebuild
> ```
---
## 四、实验任务
### 任务一:准备工作
将源代码复制到 Ubuntu 环境并编译:
```bash
# 复制源代码到工作目录
cp -r /path/to/source/* ./
# 使用调试模式编译
make M="-DDEBUG" rebuild
```
> [!warning] 注意事项
> 确保系统已安装 `http_load` 工具,如未安装:
> ```bash
> sudo apt-get install http-load
> ```
### 任务二:阅读和理解线程池实现
阅读并理解 `togglest_pool.c` 的线程池实现,编译运行验证其正确性。
#### 线程池核心数据结构
```c
/* pool.h - 线程池定义 */
typedef struct {
pthread_t *threads; /* 线程数组 */
int thread_count; /* 线程数量 */
int *task_queue; /* 任务队列socket fd */
int queue_size; /* 队列容量 */
int front; /* 队头指针 */
int rear; /* 队尾指针 */
int count; /* 当前队列中任务数 */
pthread_mutex_t mutex; /* 互斥锁 */
sem_t slots; /* 空闲槽位信号量 */
sem_t items; /* 可用任务信号量 */
int shutdown; /* 关闭标志 */
} thread_pool_t;
```
#### 线程池核心操作
```c
/* pool.c - 线程池操作实现 */
/* 初始化线程池 */
void pool_init(thread_pool_t *pool, int threads, int queue_size) {
pool->threads = (pthread_t *)malloc(threads * sizeof(pthread_t));
pool->task_queue = (int *)malloc(queue_size * sizeof(int));
pool->thread_count = threads;
pool->queue_size = queue_size;
pool->front = pool->rear = pool->count = 0;
pool->shutdown = 0;
pthread_mutex_init(&pool->mutex, NULL);
sem_init(&pool->slots, 0, queue_size);
sem_init(&pool->items, 0, 0);
/* 预创建所有工作线程 */
for (int i = 0; i < threads; i++) {
pthread_create(&pool->threads[i], NULL, worker, (void *)pool);
}
}
/* 向任务队列添加任务(生产者) */
void pool_enqueue(thread_pool_t *pool, int conn_fd) {
sem_wait(&pool->slots); /* 等待空闲槽位 */
pthread_mutex_lock(&pool->mutex); /* 进入临界区 */
pool->task_queue[pool->rear] = conn_fd;
pool->rear = (pool->rear + 1) % pool->queue_size;
pool->count++;
pthread_mutex_unlock(&pool->mutex);
sem_post(&pool->items); /* 通知有新任务 */
}
/* 从任务队列取出任务(消费者) */
int pool_dequeue(thread_pool_t *pool) {
sem_wait(&pool->items); /* 等待可用任务 */
pthread_mutex_lock(&pool->mutex); /* 进入临界区 */
int conn_fd = pool->task_queue[pool->front];
pool->front = (pool->front + 1) % pool->queue_size;
pool->count--;
pthread_mutex_unlock(&pool->mutex);
sem_post(&pool->slots); /* 释放槽位 */
return conn_fd;
}
/* 工作线程主函数 */
void *worker(void *arg) {
thread_pool_t *pool = (thread_pool_t *)arg;
while (1) {
int conn_fd = pool_dequeue(pool); /* 取出任务 */
if (conn_fd < 0) break; /* 收到关闭信号 */
handle_request(conn_fd); /* 处理 HTTP 请求 */
close(conn_fd); /* 关闭连接 */
}
return NULL;
}
```
#### 线程池工作流程
```
主线程(Accept) --> 任务队列 --> 工作线程1 (处理请求)
[conn_fd] --> 工作线程2 (处理请求)
--> 工作线程N (处理请求)
```
> [!info] 生产者-消费者模型
> 线程池本质上是一个 **多生产者-多消费者** 模型:
> - **生产者**:主线程不断 `accept` 新连接,将 `conn_fd` 放入队列
> - **消费者**:工作线程从队列取出 `conn_fd`,处理 HTTP 请求
> - 信号量 `slots` 和 `items` 实现了线程间的同步
>
> 该模型在 [[07_多线程编程]] 中已有详细讨论,此处是其在网络服务器中的实际应用。
### 任务三:实现 Web 服务器线程池版本
仿照 `togglest_pool.c` 的设计,将 webserver 改造为线程池版本,并使用 `http_load` 测试性能。
#### 编译与运行
```bash
# 编译线程池版 webserver
make M="-DDEBUG" rebuild
# 启动服务器
./webserver 8080
# 另一终端http_load 性能测试
http_load -parallel 5 -fetches 50 -seconds 20 urls
```
#### urls 文件示例
```
http://localhost:8080/index.html
http://localhost:8080/test.html
http://localhost:8080/image.jpg
```
> [!example] http_load 参数说明
> | 参数 | 含义 |
> |------|------|
> | `-parallel N` | 并发连接数为 N |
> | `-fetches N` | 总共发起 N 次请求 |
> | `-seconds N` | 测试持续 N 秒 |
>
> 两参数同时指定时,先满足的条件生效。
### 任务四:设计实现业务分割模型
将 webserver 改造为业务分割模型,将 HTTP 请求处理拆分为三个阶段,每个阶段由独立的线程池负责。
#### 业务分割架构图
```mermaid
graph LR
C[客户端请求] --> A[read msg<br/>threadpool]
A -->|文件名 + socket| Q1[filename<br/>queue]
Q1 --> B[read file<br/>threadpool]
B -->|内容 + socket| Q2[msg<br/>queue]
Q2 --> D[send msg<br/>threadpool]
D --> R[客户端响应]
style A fill:#4CAF50,color:#fff
style B fill:#2196F3,color:#fff
style D fill:#FF9800,color:#fff
style Q1 fill:#E91E63,color:#fff
style Q2 fill:#9C27B0,color:#fff
```
#### 业务分割详细流程
```mermaid
graph TB
subgraph "阶段一:消息读取与解析"
A1[线程从 socket 读取 HTTP 请求] --> A2[解析请求行<br/>GET /path HTTP/1.1]
A2 --> A3[提取文件名]
A3 --> A4[将 filename + conn_fd<br/>加入 filename queue]
end
subgraph "阶段二:文件读取"
B1[线程从 filename queue<br/>取出 filename + conn_fd] --> B2[打开并读取文件内容]
B2 --> B3[构造 HTTP 响应]
B3 --> B4[将 response + conn_fd<br/>加入 msg queue]
end
subgraph "阶段三:响应发送"
C1[线程从 msg queue<br/>取出 response + conn_fd] --> C2[通过 socket<br/>发送 HTTP 响应]
C2 --> C3[关闭连接]
end
A4 --> B1
B4 --> C1
style A1 fill:#4CAF50,color:#fff
style B1 fill:#2196F3,color:#fff
style C1 fill:#FF9800,color:#fff
```
#### 消息传递时序图
```mermaid
sequenceDiagram
participant Client as 客户端
participant RM as read msg<br/>threadpool
participant FQ as filename<br/>queue
participant RF as read file<br/>threadpool
participant MQ as msg<br/>queue
participant SM as send msg<br/>threadpool
Client->>RM: HTTP 请求 (socket)
activate RM
RM->>RM: 读取 socket 数据
RM->>RM: 解析 HTTP 请求<br/>提取 filename
RM->>FQ: enqueue(filename, socket)
deactivate RM
FQ->>RF: dequeue(filename, socket)
activate RF
RF->>RF: 打开文件
RF->>RF: 读取文件内容
RF->>RF: 构造 HTTP 响应
RF->>MQ: enqueue(response, socket)
deactivate RF
MQ->>SM: dequeue(response, socket)
activate SM
SM->>Client: 发送 HTTP 响应
SM->>SM: 关闭连接
deactivate SM
```
#### 三个线程池的职责
> [!note] 线程池一read msg threadpool
> - **职责**: 从 socket 读取 HTTP 请求消息并解析
> - **输入**: 客户端的 socket 连接
> - **处理**: 调用 `recv()` 读取数据,解析请求行获取文件名
> - **输出**: 将 `filename + conn_fd` 加入 filename queue
> - **关键点**: I/O 密集型操作,需要处理不完整读取
> [!note] 线程池二read file threadpool
> - **职责**: 根据文件名读取文件内容
> - **输入**: 从 filename queue 取出的 `filename + conn_fd`
> - **处理**: 打开文件、读取内容、构造 HTTP 响应头和正文
> - **输出**: 将 `response + conn_fd` 加入 msg queue
> - **关键点**: 磁盘 I/O 密集型操作,是性能瓶颈之一
> [!note] 线程池三send msg threadpool
> - **职责**: 将 HTTP 响应发送回客户端
> - **输入**: 从 msg queue 取出的 `response + conn_fd`
> - **处理**: 调用 `send()` 发送响应数据
> - **输出**: 关闭 socket 连接
> - **关键点**: 网络 I/O 密集型操作
#### 两个消息队列
> [!important] 消息队列设计
> 消息队列是线程池之间通信的桥梁,采用生产者-消费者模型实现:
>
> **filename queue**:
> - 生产者: read msg threadpool
> - 消费者: read file threadpool
> - 数据: `{char filename[256], int conn_fd}`
>
> **msg queue**:
> - 生产者: read file threadpool
> - 消费者: send msg threadpool
> - 数据: `{char *response, int length, int conn_fd}`
#### 消息队列核心实现
```c
/* 消息队列数据结构 */
typedef struct {
int conn_fd;
char filename[256]; /* filename queue 使用 */
char *response; /* msg queue 使用 */
int response_len; /* 响应长度 */
} queue_item_t;
typedef struct {
queue_item_t *items;
int capacity;
int front;
int rear;
int count;
pthread_mutex_t mutex;
sem_t slots;
sem_t items_sem;
} message_queue_t;
/* 初始化消息队列 */
void mq_init(message_queue_t *mq, int capacity) {
mq->items = (queue_item_t *)calloc(capacity, sizeof(queue_item_t));
mq->capacity = capacity;
mq->front = mq->rear = mq->count = 0;
pthread_mutex_init(&mq->mutex, NULL);
sem_init(&mq->slots, 0, capacity);
sem_init(&mq->items_sem, 0, 0);
}
/* 入队操作 */
void mq_enqueue(message_queue_t *mq, queue_item_t *item) {
sem_wait(&mq->slots);
pthread_mutex_lock(&mq->mutex);
mq->items[mq->rear] = *item;
mq->rear = (mq->rear + 1) % mq->capacity;
mq->count++;
pthread_mutex_unlock(&mq->mutex);
sem_post(&mq->items_sem);
}
/* 出队操作 */
void mq_dequeue(message_queue_t *mq, queue_item_t *item) {
sem_wait(&mq->items_sem);
pthread_mutex_lock(&mq->mutex);
*item = mq->items[mq->front];
mq->front = (mq->front + 1) % mq->capacity;
mq->count--;
pthread_mutex_unlock(&mq->mutex);
sem_post(&mq->slots);
}
```
#### 业务分割服务器主框架
```c
/* taskline.c - 业务分割 webserver 主框架 */
#include "taskline.h"
#include "pool.h"
#define READ_MSG_THREADS 4 /* 读消息线程池大小 */
#define READ_FILE_THREADS 4 /* 读文件线程池大小 */
#define SEND_MSG_THREADS 4 /* 发送消息线程池大小 */
#define QUEUE_CAPACITY 64 /* 消息队列容量 */
/* 两个消息队列 */
message_queue_t filename_queue; /* 文件名队列 */
message_queue_t msg_queue; /* 响应消息队列 */
/* 阶段一:读取并解析 HTTP 请求 */
void *read_msg_worker(void *arg) {
while (1) {
int conn_fd = pool_dequeue(&read_msg_pool);
char buf[8192];
int n = recv(conn_fd, buf, sizeof(buf) - 1, 0);
if (n <= 0) { close(conn_fd); continue; }
buf[n] = '\0';
/* 解析 HTTP 请求 */
char method[16], path[256], version[16];
sscanf(buf, "%s %s %s", method, path, version);
/* 构造文件路径 */
char filename[256];
snprintf(filename, sizeof(filename), ".%s", path);
/* 将 filename + conn_fd 加入 filename queue */
queue_item_t item;
item.conn_fd = conn_fd;
strncpy(item.filename, filename, sizeof(item.filename));
mq_enqueue(&filename_queue, &item);
}
return NULL;
}
/* 阶段二:读取文件内容 */
void *read_file_worker(void *arg) {
while (1) {
queue_item_t item;
mq_dequeue(&filename_queue, &item);
/* 打开并读取文件 */
int fd = open(item.filename, O_RDONLY);
if (fd < 0) {
/* 文件不存在,构造 404 响应 */
item.response = build_404_response(&item.response_len);
} else {
/* 读取文件,构造 200 响应 */
item.response = build_200_response(fd, &item.response_len);
close(fd);
}
/* 将 response + conn_fd 加入 msg queue */
mq_enqueue(&msg_queue, &item);
}
return NULL;
}
/* 阶段三:发送 HTTP 响应 */
void *send_msg_worker(void *arg) {
while (1) {
queue_item_t item;
mq_dequeue(&msg_queue, &item);
/* 发送响应 */
send(item.conn_fd, item.response, item.response_len, 0);
free(item.response);
close(item.conn_fd);
}
return NULL;
}
int main(int argc, char **argv) {
if (argc != 2) {
fprintf(stderr, "Usage: %s <port>\n", argv[0]);
exit(1);
}
/* 初始化两个消息队列 */
mq_init(&filename_queue, QUEUE_CAPACITY);
mq_init(&msg_queue, QUEUE_CAPACITY);
/* 初始化三个线程池 */
pool_init(&read_msg_pool, READ_MSG_THREADS, QUEUE_CAPACITY);
pool_init(&read_file_pool, READ_FILE_THREADS, QUEUE_CAPACITY);
pool_init(&send_msg_pool, SEND_MSG_THREADS, QUEUE_CAPACITY);
/* 启动三个阶段的工作线程 */
pthread_t tid;
for (int i = 0; i < READ_MSG_THREADS; i++)
pthread_create(&tid, NULL, read_msg_worker, NULL);
for (int i = 0; i < READ_FILE_THREADS; i++)
pthread_create(&tid, NULL, read_file_worker, NULL);
for (int i = 0; i < SEND_MSG_THREADS; i++)
pthread_create(&tid, NULL, send_msg_worker, NULL);
/* 主线程接受连接 */
int listen_fd = open_listen_sock(atoi(argv[1]));
printf("业务分割 webserver 启动,端口 %s\n", argv[1]);
while (1) {
struct sockaddr_in cliaddr;
socklen_t clien = sizeof(cliaddr);
int conn_fd = accept(listen_fd,
(struct sockaddr *)&cliaddr, &clien);
if (conn_fd < 0) continue;
/* 将新连接放入 read msg 线程池的任务队列 */
pool_enqueue(&read_msg_pool, conn_fd);
}
return 0;
}
```
> [!tip] 与 [[实践02_多进程多线程服务器]] 的对比
> - 实践02 中每个请求由单个线程串行完成「读消息 -> 读文件 -> 发送响应」全部工作
> - 本实验将三个阶段拆分到不同的线程池,实现了 **流水线并行**
> - 当一个线程在发送响应时,其他线程已经在处理新请求的文件读取,提高了吞吐量
---
## 五、性能测试
### 5.1 http_load 测试
```bash
# 低并发测试
http_load -parallel 5 -fetches 50 -seconds 20 urls
# 中并发测试
http_load -parallel 20 -fetches 200 -seconds 20 urls
# 高并发测试
http_load -parallel 50 -fetches 500 -seconds 20 urls
```
> [!example] http_load 输出指标
> | 指标 | 含义 |
> |------|------|
> | fetches | 成功完成的请求总数 |
> | elapsed | 测试耗时(秒) |
> | mean bytes/transfer | 平均每次传输字节数 |
> | fetches/sec | 每秒完成的请求数(吞吐量) |
> | msecs/connect | 平均连接建立时间 |
> | msecs/first-response | 平均首字节响应时间 |
> | HTTP response codes | 各状态码的出现次数 |
### 5.2 系统性能监测
#### vmstat 监控
```bash
# 每 2 秒采集一次,共 10 次
vmstat 2 10
```
> [!info] vmstat 关键字段
> | 字段 | 含义 |
> |------|------|
> | `r` | 运行队列中的进程数 |
> | `b` | 阻塞等待 I/O 的进程数 |
> | `us` | 用户态 CPU 占用百分比 |
> | `sy` | 内核态 CPU 占用百分比 |
> | `wa` | I/O 等待 CPU 时间百分比 |
> | `free` | 空闲内存KB |
> | `buff` | 缓冲区大小KB |
#### iostat 监控
```bash
# 每 2 秒采集一次,共 10 次
iostat -k 2 10
```
> [!info] iostat 关键字段
> | 字段 | 含义 |
> |------|------|
> | `tps` | 每秒传输次数 |
> | `kB_read/s` | 每秒读取数据量KB |
> | `kB_wrtn/s` | 每秒写入数据量KB |
> | `await` | 平均 I/O 等待时间ms |
> | `util` | 设备利用率百分比 |
#### gprof 性能分析
```bash
# 编译时加入性能分析选项
gcc -pg -o webserver webserver.c pool.c taskline.c -lpthread
# 运行服务器并进行测试
./webserver 8080 &
http_load -parallel 20 -fetches 200 -seconds 20 urls
# 生成性能分析报告
gprof ./webserver gmon.out > perf.txt
cat perf.txt
```
> [!warning] gprof 使用注意
> 1. 编译时必须加 `-pg` 选项
> 2. 程序需要正常退出(`Ctrl+C` 终止可能无法生成 `gmon.out`
> 3. 分析结果包括各函数的调用次数和执行时间占比
### 5.3 性能监测指标
> [!important] 实验需要监测的关键指标
>
> **线程池指标**:
> - 线程池中线程的 **平均活跃时间** 及 **阻塞时间**
> - 线程 **最高/最低/平均活跃数量**
>
> **消息队列指标**:
> - filename queue 中的消息长度(积压情况)
> - msg queue 中的消息长度(积压情况)
>
> **系统资源指标**:
> - 系统 I/O 使用率(`iostat`
> - 内存使用情况(`vmstat`
> - CPU 使用率和等待率(`vmstat`
---
## 六、性能对比分析
### 不同模型的性能对比
| 模型 | 优点 | 缺点 | 适用场景 |
|------|------|------|----------|
| 迭代式服务器 | 实现简单 | 无法并发处理 | 学习演示 |
| 多进程服务器([[实践02_多进程多线程服务器]] | 进程隔离,稳定性好 | fork 开销大 | 连接数少 |
| 多线程服务器([[实践02_多进程多线程服务器]] | 共享内存,开销小 | 线程数爆炸风险 | 通用场景 |
| 预线程化服务器 | 线程复用,无创建开销 | 固定线程数 | 高并发 |
| 业务分割模型 | 流水线并行,吞吐量高 | 复杂度高,队列延迟 | 极高并发 |
### 业务分割模型的优缺点
> [!success] 优势
> 1. **流水线并行**: 三个阶段可以同时处理不同的请求
> 2. **资源隔离**: I/O 密集操作和计算操作分别由不同线程池处理
> 3. **可独立扩展**: 每个阶段的线程数可以根据瓶颈独立调整
> 4. **缓存友好**: 文件读取线程可以实现文件内容缓存
> [!failure] 劣势
> 1. **复杂度高**: 需要管理多个线程池和消息队列
> 2. **队列延迟**: 消息在队列中排队等待会产生额外延迟
> 3. **内存开销**: 消息队列和中间数据结构占用额外内存
> 4. **调试困难**: 多线程池间的数据流转增加了调试难度
---
## 七、常见问题与解决
| 问题 | 原因 | 解决方法 |
|------|------|----------|
| 编译报错 `undefined reference to pthread` | 未链接 pthread 库 | 编译时加 `-lpthread` |
| 队列满导致线程阻塞 | 消息队列容量不足 | 增大 `QUEUE_CAPACITY` 或增加下游线程数 |
| 文件读取阶段成为瓶颈 | 磁盘 I/O 速度限制 | 增加 read file 线程数,或引入文件缓存 |
| 内存泄漏 | response 未 `free` | 确保 send 线程发送后释放内存 |
| 连接泄漏 | 异常路径未 `close(conn_fd)` | 在每个异常处理分支都关闭 socket |
| 线程安全问题 | 共享变量未加锁 | 使用互斥锁保护所有共享数据 |
| http_load 测试连接被拒 | 服务器未启动或端口错误 | 确认服务器运行状态和端口号 |
---
## 八、实验总结
通过本实验,应掌握以下能力:
1. **线程池设计与实现**: 理解预线程化技术,掌握基于生产者-消费者模型的线程池实现方法
2. **业务分割架构**: 理解将复杂业务拆分为多个阶段并行处理的设计思想
3. **消息队列设计**: 掌握线程间安全通信的消息队列实现
4. **性能测试方法**: 学会使用 `http_load``vmstat``iostat``gprof` 等工具进行全面的性能测试
5. **性能优化思路**: 通过对比分析不同模型的性能指标,理解并发服务器的优化方向
> [!question] 思考题
> 1. 如果文件读取阶段成为瓶颈,除了增加线程数外,还有哪些优化策略?
> 2. 业务分割模型中,如何确定每个线程池的最佳线程数量?
> 3. 与 [[10_并发服务器]] 中的预线程化模型相比,业务分割模型在什么场景下优势更明显?
> 4. 如果需要支持动态内容(如 CGI业务分割模型应如何调整
---
## 相关链接
- [[07_多线程编程]] - pthread 编程基础、生产者-消费者模型
- [[10_并发服务器]] - 预线程化服务器模型
- [[实践02_多进程多线程服务器]] - 多进程和多线程服务器实现