Files
C-exp-collection/exp1/task67.c
2026-06-09 06:43:13 +02:00

273 lines
7.4 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
任务7任选编写一个多线程并发应用程序task67.c。
1主线程预先创建5个工作线程然后通过缓冲区发放任务对等线程从缓冲区提取任务执行当缓冲区为空时等待任务。
2每个任务内容是以秒为单位的整数等待时间对等线程取得任务后调用sleep睡眠指定的秒数就算任务执行完成。
3主线程从终端读取命令发布任务命令格式为"<任务数> <秒数>",如输入"10 5"表示创建10个任务每个任务的运行时间是5秒。
4应用程序应支持动态地增加或减少工作线程的数目一个策略是当缓冲区变满时将线程数量翻倍而当缓冲区变为空时将线程数目减半。
5每次工作线程发送变动时应输出相关信息。
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#define DEFAULT_BUF_SIZE 20
#define INITIAL_THREADS 5
#define MAX_THREADS 256
typedef struct {
int *buf;
int n;
int inpos;
int outpos;
sem_t mutex;
sem_t slots;
sem_t items;
} sbuf_t;
/* 线程池管理 */
int current_thread_count = INITIAL_THREADS;
int thread_should_exit[MAX_THREADS];
pthread_t worker_tids[MAX_THREADS];
pthread_mutex_t pool_mutex = PTHREAD_MUTEX_INITIALIZER;
volatile int program_running = 1;
sbuf_t buf;
void sbuf_init(sbuf_t *sp, int n)
{
sp->buf = (int *)malloc(n * sizeof(int));
sp->n = n;
sp->inpos = 0;
sp->outpos = 0;
sem_init(&sp->mutex, 0, 1);
sem_init(&sp->slots, 0, n);
sem_init(&sp->items, 0, 0);
}
void sbuf_deinit(sbuf_t *sp)
{
free(sp->buf);
sem_destroy(&sp->mutex);
sem_destroy(&sp->slots);
sem_destroy(&sp->items);
}
void sbuf_insert(sbuf_t *sp, int item)
{
sem_wait(&sp->slots);
sem_wait(&sp->mutex);
sp->buf[sp->inpos] = item;
sp->inpos = (sp->inpos + 1) % sp->n;
sem_post(&sp->mutex);
sem_post(&sp->items);
}
int sbuf_remove(sbuf_t *sp)
{
sem_wait(&sp->items);
sem_wait(&sp->mutex);
int item = sp->buf[sp->outpos];
sp->outpos = (sp->outpos + 1) % sp->n;
sem_post(&sp->mutex);
sem_post(&sp->slots);
return item;
}
int sbuf_is_full(sbuf_t *sp)
{
int val;
sem_getvalue(&sp->slots, &val);
return val == 0;
}
int sbuf_is_empty(sbuf_t *sp)
{
int val;
sem_getvalue(&sp->items, &val);
return val == 0;
}
void *worker(void *arg)
{
int my_id = *((int *)arg);
while (1)
{
/* 检查是否应该退出 */
pthread_mutex_lock(&pool_mutex);
if (thread_should_exit[my_id])
{
pthread_mutex_unlock(&pool_mutex);
break;
}
pthread_mutex_unlock(&pool_mutex);
int task_seconds = sbuf_remove(&buf);
/* 再次检查退出标志(可能在等待期间被标记) */
pthread_mutex_lock(&pool_mutex);
if (thread_should_exit[my_id])
{
pthread_mutex_unlock(&pool_mutex);
/* 任务还在缓冲区中被取出了,需要重新放回去 */
sbuf_insert(&buf, task_seconds);
break;
}
pthread_mutex_unlock(&pool_mutex);
printf("[Worker %d] executing task: sleep %d seconds\n", my_id, task_seconds);
sleep(task_seconds);
printf("[Worker %d] task completed\n", my_id);
}
printf("[Worker %d] exiting\n", my_id);
return NULL;
}
void adjust_threads_up()
{
pthread_mutex_lock(&pool_mutex);
int new_count = current_thread_count * 2;
if (new_count > MAX_THREADS) new_count = MAX_THREADS;
if (new_count <= current_thread_count)
{
pthread_mutex_unlock(&pool_mutex);
return;
}
int added = new_count - current_thread_count;
for (int i = 0; i < added; i++)
{
int tid = current_thread_count + i;
thread_should_exit[tid] = 0;
int *arg = malloc(sizeof(int));
*arg = tid;
pthread_create(&worker_tids[tid], NULL, worker, arg);
}
current_thread_count = new_count;
printf("缓冲区变满,工作线程数翻倍,当前工作线程数为%d个\n", current_thread_count);
pthread_mutex_unlock(&pool_mutex);
}
void adjust_threads_down()
{
pthread_mutex_lock(&pool_mutex);
int new_count = current_thread_count / 2;
if (new_count < 1) new_count = 1;
if (new_count >= current_thread_count)
{
pthread_mutex_unlock(&pool_mutex);
return;
}
int removed = current_thread_count - new_count;
/* 标记尾部线程为退出 */
for (int i = 0; i < removed; i++)
{
int tid = current_thread_count - 1 - i;
thread_should_exit[tid] = 1;
}
current_thread_count = new_count;
printf("缓冲区变空,工作线程数减半,当前工作线程数为%d个\n", current_thread_count);
/* 插入额外的空槽通知以便标记线程能退出 */
for (int i = 0; i < removed; i++)
{
sbuf_insert(&buf, 0);
}
pthread_mutex_unlock(&pool_mutex);
}
int main(int argc, char *argv[])
{
int buf_size = DEFAULT_BUF_SIZE;
if (argc >= 2)
buf_size = atoi(argv[1]);
printf("Dynamic Thread Pool\n");
printf("Buffer size: %d, Initial threads: %d\n", buf_size, INITIAL_THREADS);
printf("Commands: <task_count> <seconds> | 'quit' to exit\n\n");
sbuf_init(&buf, buf_size);
/* 创建初始线程 */
for (int i = 0; i < INITIAL_THREADS; i++)
{
thread_should_exit[i] = 0;
int *arg = malloc(sizeof(int));
*arg = i;
pthread_create(&worker_tids[i], NULL, worker, arg);
}
/* 主线程命令循环 */
char line[256];
while (program_running)
{
printf("> ");
fflush(stdout);
if (fgets(line, sizeof(line), stdin) == NULL)
break;
if (strncmp(line, "quit", 4) == 0)
break;
int task_count, task_seconds;
if (sscanf(line, "%d %d", &task_count, &task_seconds) != 2)
{
printf("Invalid command. Usage: <task_count> <seconds>\n");
continue;
}
printf("Adding %d tasks, each %d seconds...\n", task_count, task_seconds);
for (int i = 0; i < task_count; i++)
{
/* 插入前检查是否满 */
if (sbuf_is_full(&buf))
{
adjust_threads_up();
}
sbuf_insert(&buf, task_seconds);
}
/* 检查插入后是否需要调整 */
int slots_val, items_val;
sem_getvalue(&buf.slots, &slots_val);
sem_getvalue(&buf.items, &items_val);
printf("Buffer status: %d/%d slots free, %d items pending\n",
slots_val, buf_size, items_val);
}
/* 等待所有任务完成并通知线程退出 */
printf("\nWaiting for pending tasks to complete...\n");
while (!sbuf_is_empty(&buf))
{
sleep(1);
}
/* 标记所有线程退出并唤醒它们 */
pthread_mutex_lock(&pool_mutex);
for (int i = 0; i < current_thread_count; i++)
{
thread_should_exit[i] = 1;
}
/* 插入足够多的信号量以唤醒所有等待的线程 */
for (int i = 0; i < current_thread_count; i++)
{
sbuf_insert(&buf, 0);
}
pthread_mutex_unlock(&pool_mutex);
/* 等待所有线程结束 */
for (int i = 0; i < current_thread_count; i++)
{
pthread_join(worker_tids[i], NULL);
}
printf("All workers exited. Program terminating.\n");
sbuf_deinit(&buf);
return 0;
}