This commit is contained in:
2026-05-14 09:27:48 +08:00
parent 1100043d73
commit f8ae30583d
17 changed files with 1007 additions and 0 deletions

View File

@@ -0,0 +1,177 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <unistd.h>
typedef struct {
int *buf;
int n;
int outpos;
int inpos;
sem_t mutex;
sem_t slots;
sem_t items;
} sbuf_t;
int num_workers = 5;
int target_workers = 5;
pthread_mutex_t worker_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_t *workers = NULL;
sbuf_t *sbuf_ptr = NULL;
volatile int running = 1;
void sbuf_init(sbuf_t *sp, int n) {
sp->buf = (int *)malloc(n * sizeof(int));
sp->n = n;
sp->outpos = 0;
sp->inpos = 0;
sem_init(&sp->mutex, 0, 1);
sem_init(&sp->slots, 0, n);
sem_init(&sp->items, 0, 0);
}
void sbuf_deinit(sbuf_t *sp) {
free(sp->buf);
sem_destroy(&sp->mutex);
sem_destroy(&sp->slots);
sem_destroy(&sp->items);
}
void sbuf_insert(sbuf_t *sp, int item) {
sem_wait(&sp->slots);
sem_wait(&sp->mutex);
sp->buf[sp->inpos] = item;
sp->inpos = (sp->inpos + 1) % sp->n;
sem_post(&sp->mutex);
sem_post(&sp->items);
}
int sbuf_remove(sbuf_t *sp) {
sem_wait(&sp->items);
sem_wait(&sp->mutex);
int item = sp->buf[sp->outpos];
sp->outpos = (sp->outpos + 1) % sp->n;
sem_post(&sp->mutex);
sem_post(&sp->slots);
return item;
}
void *worker_thread(void *arg) {
int id = *(int *)arg;
free(arg);
sbuf_t *sp = sbuf_ptr;
while (running) {
int seconds = sbuf_remove(sp);
if (seconds == -1) break;
printf("[Worker %d] executing task: sleep %d seconds\n", id, seconds);
sleep(seconds);
printf("[Worker %d] task completed\n", id);
}
return NULL;
}
void adjust_workers() {
pthread_mutex_lock(&worker_mutex);
int current = num_workers;
int target = target_workers;
if (target > current) {
/* Increase workers */
workers = realloc(workers, target * sizeof(pthread_t));
for (int i = current; i < target; i++) {
int *id = malloc(sizeof(int));
*id = i + 1;
pthread_create(&workers[i], NULL, worker_thread, id);
}
num_workers = target;
} else if (target < current) {
/* Decrease workers - insert poison pills */
int to_remove = current - target;
for (int i = 0; i < to_remove; i++) {
sbuf_insert(sbuf_ptr, -1);
}
for (int i = target; i < current; i++) {
pthread_join(workers[i], NULL);
}
num_workers = target;
}
pthread_mutex_unlock(&worker_mutex);
}
int main(int argc, char *argv[]) {
if (argc != 2) {
fprintf(stderr, "Usage: %s <buffer_size>\n", argv[0]);
return 1;
}
int buf_size = atoi(argv[1]);
sbuf_t buf;
sbuf_ptr = &buf;
sbuf_init(&buf, buf_size);
/* Create initial 5 workers */
workers = malloc(5 * sizeof(pthread_t));
for (int i = 0; i < 5; i++) {
int *id = malloc(sizeof(int));
*id = i + 1;
pthread_create(&workers[i], NULL, worker_thread, id);
}
printf("=== Dynamic Thread Pool ===\n");
printf("Commands:\n");
printf(" <task_count> <seconds> - Add tasks\n");
printf(" quit - Exit\n");
printf("Initial workers: %d, Buffer size: %d\n\n", num_workers, buf_size);
char line[256];
while (1) {
printf("> ");
fflush(stdout);
if (!fgets(line, sizeof(line), stdin)) break;
if (strncmp(line, "quit", 4) == 0) break;
int task_count, seconds;
if (sscanf(line, "%d %d", &task_count, &seconds) == 2) {
printf("Adding %d tasks, each %d seconds...\n", task_count, seconds);
for (int i = 0; i < task_count; i++) {
/* Check if buffer is full - double workers */
int slots_avail;
sem_getvalue(&buf.slots, &slots_avail);
if (slots_avail == 0) {
target_workers = num_workers * 2;
printf("Buffer full, doubling workers to %d\n", target_workers);
adjust_workers();
}
sbuf_insert(&buf, seconds);
/* Check if buffer is empty - halve workers */
int items_avail;
sem_getvalue(&buf.items, &items_avail);
if (items_avail == 0 && num_workers > 2) {
target_workers = num_workers / 2;
if (target_workers < 2) target_workers = 2;
printf("Buffer empty, halving workers to %d\n", target_workers);
adjust_workers();
}
}
} else {
printf("Invalid command. Usage: <task_count> <seconds>\n");
}
}
/* Shutdown */
running = 0;
target_workers = 0;
adjust_workers();
sbuf_deinit(&buf);
free(workers);
printf("Done.\n");
return 0;
}