13 KiB
13 KiB
实验03 Linux 多线程编程
实验目的
- 掌握 POSIX 线程(pthread)的创建、等待和终止
- 理解线程间共享地址空间的特性
- 掌握信号量(semaphore)在线程同步中的使用
- 理解竞态条件(race condition)的成因及修复方法
- 学会使用生产者-消费者模型解决缓冲区同步问题
- 了解并行计算中的加速比概念
涉及知识点
pthread_create/pthread_join/pthread_detach- POSIX 信号量:
sem_init/sem_wait(P 操作) /sem_post(V 操作) - 互斥锁(mutex)与信号量的区别
- 生产者-消费者问题
- 竞态条件与临界区保护
- 并行求和与加速比测量
forkvspthread_create的开销对比
任务一:task61.c —— 三线程交替打印
任务要求
创建 3 个线程 T1、T2、T3,每个线程执行 5 次 printf 打印自己的线程 ID 和当前是第几次打印,每次打印后随机等待 1~5 秒。
关键代码提示
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#define NUM_THREADS 3
#define PRINT_COUNT 5
void *thread_func(void *arg) {
int id = *(int *)arg;
for (int i = 0; i < PRINT_COUNT; i++) {
printf("T%d: 第%d次打印 (PID=%d, TID=%lu)\n",
id, i + 1, getpid(), (unsigned long)pthread_self());
int wait_time = rand() % 5 + 1; // 1~5 秒
sleep(wait_time);
}
return NULL;
}
int main() {
pthread_t threads[NUM_THREADS];
int ids[NUM_THREADS];
srand(time(NULL));
for (int i = 0; i < NUM_THREADS; i++) {
ids[i] = i + 1;
pthread_create(&threads[i], NULL, thread_func, &ids[i]);
}
for (int i = 0; i < NUM_THREADS; i++)
pthread_join(threads[i], NULL);
printf("所有线程执行完毕\n");
return 0;
}
// 编译:gcc -o task61 task61.c -lpthread
常见问题
| 问题 | 原因 | 解决方法 |
|---|---|---|
| 输出交错混乱 | 多线程并发写 stdout | 正常现象,可用互斥锁保护 printf |
| 传参错误 | 循环变量地址被覆盖 | 使用数组存储各线程参数,而非循环变量地址 |
| 编译报错 undefined reference | 未链接 pthread | 加 -lpthread |
任务二:task62.c —— 用信号量修复竞态条件
任务要求
给定一个存在竞态条件的 badcount.c 程序(多线程同时对共享计数器自增),使用信号量修复该问题,使最终结果正确。
关键代码提示
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#define NTHREADS 4
#define NITERS 1000000
volatile long counter = 0; // 共享计数器
sem_t mutex; // 信号量(用作互斥锁)
void *badcount(void *arg) {
for (int i = 0; i < NITERS; i++) {
sem_wait(&mutex); // P 操作:进入临界区
counter++;
sem_post(&mutex); // V 操作:离开临界区
}
return NULL;
}
int main() {
pthread_t threads[NTHREADS];
sem_init(&mutex, 0, 1); // 初始值为 1,相当于互斥锁
for (int i = 0; i < NTHREADS; i++)
pthread_create(&threads[i], NULL, badcount, NULL);
for (int i = 0; i < NTHREADS; i++)
pthread_join(threads[i], NULL);
printf("期望值: %d\n", NTHREADS * NITERS);
printf("实际值: %ld\n", counter);
printf("差值: %ld\n",
(long)(NTHREADS * NITERS) - counter);
sem_destroy(&mutex);
return 0;
}
修复原理
- 不加信号量时,
counter++不是原子操作(读-改-写三步),多线程交错执行导致丢失更新 sem_wait/sem_post将counter++包裹为临界区,保证同一时刻只有一个线程执行
常见问题
| 问题 | 原因 | 解决方法 |
|---|---|---|
| 修复后差值仍不为 0 | 信号量使用错误 | 确保 sem_wait/sem_post 配对,且初始值为 1 |
| 程序死锁 | sem_wait 多次但 sem_post 不足 |
检查每个 sem_wait 是否有对应的 sem_post |
| 性能下降严重 | 信号量粒度太大 | 可尝试减小临界区范围 |
任务三:task63.c —— 生产者-消费者问题
任务要求
实现 k 个生产者线程和 m 个消费者线程,共享一个大小为 N 的环形缓冲区。生产者向缓冲区放入数据,消费者从缓冲区取出数据,使用信号量实现同步。
关键代码提示
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#define N 10 // 缓冲区大小
#define K 3 // 生产者数量
#define M 2 // 消费者数量
#define ITERS 20 // 每个生产者生产数量
int buffer[N]; // 环形缓冲区
int in = 0; // 生产者写入位置
int out = 0; // 消费者读取位置
sem_t mutex; // 互斥访问缓冲区
sem_t slots; // 空闲槽位数(初始 N)
sem_t items; // 已有物品数(初始 0)
void *producer(void *arg) {
int id = *(int *)arg;
for (int i = 0; i < ITERS; i++) {
int item = id * 100 + i;
sem_wait(&slots); // 等待空闲槽位
sem_wait(&mutex); // 进入临界区
buffer[in] = item;
printf("生产者%d: 放入 buffer[%d] = %d\n", id, in, item);
in = (in + 1) % N;
sem_post(&mutex); // 离开临界区
sem_post(&items); // 增加物品计数
}
return NULL;
}
void *consumer(void *arg) {
int id = *(int *)arg;
int total = (K * ITERS) / M; // 每个消费者消费数量
for (int i = 0; i < total; i++) {
sem_wait(&items); // 等待物品
sem_wait(&mutex); // 进入临界区
int item = buffer[out];
printf("消费者%d: 取出 buffer[%d] = %d\n", id, out, item);
out = (out + 1) % N;
sem_post(&mutex); // 离开临界区
sem_post(&slots); // 增加空闲槽位
}
return NULL;
}
int main() {
pthread_t ptid[K], ctid[M];
int pids[K], cids[M];
sem_init(&mutex, 0, 1);
sem_init(&slots, 0, N);
sem_init(&items, 0, 0);
for (int i = 0; i < K; i++) {
pids[i] = i;
pthread_create(&ptid[i], NULL, producer, &pids[i]);
}
for (int i = 0; i < M; i++) {
cids[i] = i;
pthread_create(&ctid[i], NULL, consumer, &cids[i]);
}
for (int i = 0; i < K; i++) pthread_join(ptid[i], NULL);
for (int i = 0; i < M; i++) pthread_join(ctid[i], NULL);
sem_destroy(&mutex);
sem_destroy(&slots);
sem_destroy(&items);
printf("所有生产者和消费者完成\n");
return 0;
}
信号量使用要点
| 信号量 | 初始值 | 含义 |
|---|---|---|
mutex |
1 | 互斥锁,保护临界区 |
slots |
N | 空闲槽位数,生产者 P(slots)、消费者 V(slots) |
items |
0 | 已有物品数,消费者 P(items)、生产者 V(items) |
关键顺序: P 操作时先 P(slots/items) 再 P(mutex),否则可能死锁。
常见问题
| 问题 | 原因 | 解决方法 |
|---|---|---|
| 死锁 | P 操作顺序不对 | 先 P(slots) 再 P(mutex) |
| 缓冲区越界 | 环形索引计算错误 | 使用 % N 取模 |
| 生产者/消费者数量不匹配 | 总生产量 != 总消费量 | 合理分配每个线程的生产/消费数量 |
任务四:task64.c —— 并行求和与加速比
任务要求
实现 psum64.c:将长度为 n 的数组分成若干段,每个线程计算一段的部分和,最后汇总得到总和。分别测试 1、2、4、8、16 个线程的执行时间,计算加速比。
关键代码提示
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#define MAXN 100000000 // 1 亿元素
#define MAX_THREADS 16
long a[MAXN]; // 全局数组
long psum[MAX_THREADS]; // 各线程的部分和
int n, num_threads;
void *sum_thread(void *arg) {
int id = *(int *)arg;
long chunk = n / num_threads;
long start = id * chunk;
long end = (id == num_threads - 1) ? n : start + chunk;
psum[id] = 0;
for (long i = start; i < end; i++)
psum[id] += a[i];
return NULL;
}
long get_time_us() {
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000000L + tv.tv_usec;
}
int main() {
n = MAXN;
for (int i = 0; i < n; i++)
a[i] = i + 1; // 初始化数组
int thread_counts[] = {1, 2, 4, 8, 16};
long base_time = 0;
for (int t = 0; t < 5; t++) {
num_threads = thread_counts[t];
pthread_t threads[MAX_THREADS];
int ids[MAX_THREADS];
long start = get_time_us();
for (int i = 0; i < num_threads; i++) {
ids[i] = i;
pthread_create(&threads[i], NULL, sum_thread, &ids[i]);
}
for (int i = 0; i < num_threads; i++)
pthread_join(threads[i], NULL);
long total = 0;
for (int i = 0; i < num_threads; i++)
total += psum[i];
long elapsed = get_time_us() - start;
if (t == 0) base_time = elapsed;
printf("线程数=%2d, 总和=%ld, 耗时=%ldus, 加速比=%.2f\n",
num_threads, total, elapsed,
(double)base_time / elapsed);
}
return 0;
}
常见问题
| 问题 | 原因 | 解决方法 |
|---|---|---|
| 加速比不理想 | 线程创建开销、缓存竞争 | 正常现象,线程数超过核心数后收益递减 |
| 总和不对 | 分段边界计算错误 | 注意最后一段包含剩余元素 |
| 加速比超过线程数 | 计时误差 | 多次测量取平均值 |
任务五:task66.c —— fork 与 pthread_create 开销对比(选做)
任务要求
分别测量 fork() 和 pthread_create() 的执行时间,对比进程创建与线程创建的开销差异。
关键代码提示
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/time.h>
#define N 10000
int main() {
struct timeval start, end;
// 测量 fork
gettimeofday(&start, NULL);
for (int i = 0; i < N; i++) {
pid_t pid = fork();
if (pid == 0) _exit(0);
wait(NULL);
}
gettimeofday(&end, NULL);
printf("fork x%d: %ld us (avg %.1f us)\n", N,
end.tv_sec * 1000000L + end.tv_usec
- start.tv_sec * 1000000L - start.tv_usec,
(double)(end.tv_sec * 1000000L + end.tv_usec
- start.tv_sec * 1000000L - start.tv_usec) / N);
// 测量 pthread_create
pthread_t tid;
void *dummy(void *a) { return NULL; }
gettimeofday(&start, NULL);
for (int i = 0; i < N; i++) {
pthread_create(&tid, NULL, dummy, NULL);
pthread_join(tid, NULL);
}
gettimeofday(&end, NULL);
printf("pthread_create x%d: %ld us (avg %.1f us)\n", N,
end.tv_sec * 1000000L + end.tv_usec
- start.tv_sec * 1000000L - start.tv_usec,
(double)(end.tv_sec * 1000000L + end.tv_usec
- start.tv_sec * 1000000L - start.tv_usec) / N);
return 0;
}
任务六:task67.c —— 动态线程池(选做)
任务要求
实现一个动态线程池 sbuf_t,当缓冲区满时容量翻倍,当缓冲区空且线程数过多时容量减半。
关键代码提示
typedef struct {
int *buf; // 缓冲区
int n; // 容量
int front; // 队头
int rear; // 队尾
sem_t mutex; // 互斥
sem_t slots; // 空闲槽位
sem_t items; // 已有物品
} sbuf_t;
void sbuf_init(sbuf_t *sp, int n) {
sp->buf = malloc(n * sizeof(int));
sp->n = n;
sp->front = sp->rear = 0;
sem_init(&sp->mutex, 0, 1);
sem_init(&sp->slots, 0, n);
sem_init(&sp->items, 0, 0);
}
// 动态扩容:满时翻倍
void sbuf_insert(sbuf_t *sp, int item) {
sem_wait(&sp->slots);
sem_wait(&sp->mutex);
sp->buf[sp->rear] = item;
sp->rear = (sp->rear + 1) % sp->n;
// 检查是否需要扩容(简化判断)
// 实际实现需要更精细的逻辑
sem_post(&sp->mutex);
sem_post(&sp->items);
}
int sbuf_remove(sbuf_t *sp) {
sem_wait(&sp->items);
sem_wait(&sp->mutex);
int item = sp->buf[sp->front];
sp->front = (sp->front + 1) % sp->n;
sem_post(&sp->mutex);
sem_post(&sp->slots);
return item;
}
实验总结
通过本实验,应掌握以下能力:
- 使用
pthread_create/pthread_join创建和管理线程 - 使用 POSIX 信号量实现线程同步
- 理解竞态条件的成因,学会用信号量/互斥锁保护临界区
- 实现经典的生产者-消费者同步模型
- 理解并行计算中的加速比及其限制因素
- 了解
fork与pthread_create的开销差异