/* 任务7(任选):编写一个多线程并发应用程序task67.c。 1)主线程预先创建5个工作线程,然后通过缓冲区发放任务,对等线程从缓冲区提取任务执行,当缓冲区为空时,等待任务。 2)每个任务内容是以秒为单位的整数等待时间,对等线程取得任务后,调用sleep睡眠指定的秒数,就算任务执行完成。 3)主线程从终端读取命令,发布任务,命令格式为"<任务数> <秒数>",如输入"10 5"表示创建10个任务,每个任务的运行时间是5秒。 4)应用程序应支持动态地增加或减少工作线程的数目,一个策略是当缓冲区变满时,将线程数量翻倍,而当缓冲区变为空时,将线程数目减半。 5)每次工作线程发送变动时,应输出相关信息。 */ #include #include #include #include #include #include #define DEFAULT_BUF_SIZE 20 #define INITIAL_THREADS 5 #define MAX_THREADS 256 typedef struct { int *buf; int n; int inpos; int outpos; sem_t mutex; sem_t slots; sem_t items; } sbuf_t; /* 线程池管理 */ int current_thread_count = INITIAL_THREADS; int thread_should_exit[MAX_THREADS]; pthread_t worker_tids[MAX_THREADS]; pthread_mutex_t pool_mutex = PTHREAD_MUTEX_INITIALIZER; volatile int program_running = 1; sbuf_t buf; void sbuf_init(sbuf_t *sp, int n) { sp->buf = (int *)malloc(n * sizeof(int)); sp->n = n; sp->inpos = 0; sp->outpos = 0; sem_init(&sp->mutex, 0, 1); sem_init(&sp->slots, 0, n); sem_init(&sp->items, 0, 0); } void sbuf_deinit(sbuf_t *sp) { free(sp->buf); sem_destroy(&sp->mutex); sem_destroy(&sp->slots); sem_destroy(&sp->items); } void sbuf_insert(sbuf_t *sp, int item) { sem_wait(&sp->slots); sem_wait(&sp->mutex); sp->buf[sp->inpos] = item; sp->inpos = (sp->inpos + 1) % sp->n; sem_post(&sp->mutex); sem_post(&sp->items); } int sbuf_remove(sbuf_t *sp) { sem_wait(&sp->items); sem_wait(&sp->mutex); int item = sp->buf[sp->outpos]; sp->outpos = (sp->outpos + 1) % sp->n; sem_post(&sp->mutex); sem_post(&sp->slots); return item; } int sbuf_is_full(sbuf_t *sp) { int val; sem_getvalue(&sp->slots, &val); return val == 0; } int sbuf_is_empty(sbuf_t *sp) { int val; sem_getvalue(&sp->items, &val); return val == 0; } void *worker(void *arg) { int my_id = *((int *)arg); while (1) { /* 检查是否应该退出 */ pthread_mutex_lock(&pool_mutex); if (thread_should_exit[my_id]) { pthread_mutex_unlock(&pool_mutex); break; } pthread_mutex_unlock(&pool_mutex); int task_seconds = sbuf_remove(&buf); /* 再次检查退出标志(可能在等待期间被标记) */ pthread_mutex_lock(&pool_mutex); if (thread_should_exit[my_id]) { pthread_mutex_unlock(&pool_mutex); /* 任务还在缓冲区中被取出了,需要重新放回去 */ sbuf_insert(&buf, task_seconds); break; } pthread_mutex_unlock(&pool_mutex); printf("[Worker %d] executing task: sleep %d seconds\n", my_id, task_seconds); sleep(task_seconds); printf("[Worker %d] task completed\n", my_id); } printf("[Worker %d] exiting\n", my_id); return NULL; } void adjust_threads_up() { pthread_mutex_lock(&pool_mutex); int new_count = current_thread_count * 2; if (new_count > MAX_THREADS) new_count = MAX_THREADS; if (new_count <= current_thread_count) { pthread_mutex_unlock(&pool_mutex); return; } int added = new_count - current_thread_count; for (int i = 0; i < added; i++) { int tid = current_thread_count + i; thread_should_exit[tid] = 0; int *arg = malloc(sizeof(int)); *arg = tid; pthread_create(&worker_tids[tid], NULL, worker, arg); } current_thread_count = new_count; printf("缓冲区变满,工作线程数翻倍,当前工作线程数为%d个\n", current_thread_count); pthread_mutex_unlock(&pool_mutex); } void adjust_threads_down() { pthread_mutex_lock(&pool_mutex); int new_count = current_thread_count / 2; if (new_count < 1) new_count = 1; if (new_count >= current_thread_count) { pthread_mutex_unlock(&pool_mutex); return; } int removed = current_thread_count - new_count; /* 标记尾部线程为退出 */ for (int i = 0; i < removed; i++) { int tid = current_thread_count - 1 - i; thread_should_exit[tid] = 1; } current_thread_count = new_count; printf("缓冲区变空,工作线程数减半,当前工作线程数为%d个\n", current_thread_count); /* 插入额外的空槽通知以便标记线程能退出 */ for (int i = 0; i < removed; i++) { sbuf_insert(&buf, 0); } pthread_mutex_unlock(&pool_mutex); } int main(int argc, char *argv[]) { int buf_size = DEFAULT_BUF_SIZE; if (argc >= 2) buf_size = atoi(argv[1]); printf("Dynamic Thread Pool\n"); printf("Buffer size: %d, Initial threads: %d\n", buf_size, INITIAL_THREADS); printf("Commands: | 'quit' to exit\n\n"); sbuf_init(&buf, buf_size); /* 创建初始线程 */ for (int i = 0; i < INITIAL_THREADS; i++) { thread_should_exit[i] = 0; int *arg = malloc(sizeof(int)); *arg = i; pthread_create(&worker_tids[i], NULL, worker, arg); } /* 主线程命令循环 */ char line[256]; while (program_running) { printf("> "); fflush(stdout); if (fgets(line, sizeof(line), stdin) == NULL) break; if (strncmp(line, "quit", 4) == 0) break; int task_count, task_seconds; if (sscanf(line, "%d %d", &task_count, &task_seconds) != 2) { printf("Invalid command. Usage: \n"); continue; } printf("Adding %d tasks, each %d seconds...\n", task_count, task_seconds); for (int i = 0; i < task_count; i++) { /* 插入前检查是否满 */ if (sbuf_is_full(&buf)) { adjust_threads_up(); } sbuf_insert(&buf, task_seconds); } /* 检查插入后是否需要调整 */ int slots_val, items_val; sem_getvalue(&buf.slots, &slots_val); sem_getvalue(&buf.items, &items_val); printf("Buffer status: %d/%d slots free, %d items pending\n", slots_val, buf_size, items_val); } /* 等待所有任务完成并通知线程退出 */ printf("\nWaiting for pending tasks to complete...\n"); while (!sbuf_is_empty(&buf)) { sleep(1); } /* 标记所有线程退出并唤醒它们 */ pthread_mutex_lock(&pool_mutex); for (int i = 0; i < current_thread_count; i++) { thread_should_exit[i] = 1; } /* 插入足够多的信号量以唤醒所有等待的线程 */ for (int i = 0; i < current_thread_count; i++) { sbuf_insert(&buf, 0); } pthread_mutex_unlock(&pool_mutex); /* 等待所有线程结束 */ for (int i = 0; i < current_thread_count; i++) { pthread_join(worker_tids[i], NULL); } printf("All workers exited. Program terminating.\n"); sbuf_deinit(&buf); return 0; }