/*
 * Dynamic thread pool fed by a bounded producer/consumer buffer.
 *
 * The main thread reads "<task_count> <seconds>" commands from stdin and
 * queues one integer per task. Worker threads dequeue integers and sleep for
 * that many seconds. The pool is doubled when the buffer is observed full and
 * halved (never below MIN_WORKERS) when it is observed empty.
 *
 * NOTE(review): the header names of the six #include directives were missing
 * from the original text; they are reconstructed here from the identifiers
 * the code uses. <errno.h>, <limits.h> and <stdatomic.h> are new.
 */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

enum {
    POISON_PILL = -1,     /* queue entry that tells exactly one worker to exit */
    INITIAL_WORKERS = 5,  /* pool size at startup */
    MIN_WORKERS = 2       /* the "buffer empty" shrink never goes below this */
};

/* Bounded circular buffer of ints guarded by three semaphores. */
typedef struct {
    int *buf;     /* storage for n items */
    int n;        /* capacity */
    int outpos;   /* index of the next item to remove */
    int inpos;    /* index of the next free slot */
    sem_t mutex;  /* binary semaphore protecting buf/outpos/inpos */
    sem_t slots;  /* counts free slots */
    sem_t items;  /* counts queued items */
} sbuf_t;

int num_workers = INITIAL_WORKERS;
int target_workers = INITIAL_WORKERS;
pthread_mutex_t worker_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_t *workers = NULL;
sbuf_t *sbuf_ptr = NULL;
/* Cleared by main at shutdown; atomic because workers read it concurrently. */
atomic_int running = 1;

/*
 * One-slot mailbox through which a retiring worker tells the reaper which
 * thread to join. Whichever worker dequeues a poison pill is the one that
 * exits, so the reaper cannot know the thread's array index in advance.
 */
static pthread_mutex_t retire_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t retire_cond = PTHREAD_COND_INITIALIZER;
static pthread_t retire_tid;
static int retire_pending = 0;

/* Source of worker ids for log output; only touched under worker_mutex. */
static int next_worker_id = 1;

/* Waits on sem, retrying when the wait is interrupted by a signal. */
static void sem_wait_retry(sem_t *sem)
{
    while (sem_wait(sem) != 0 && errno == EINTR) {
        /* interrupted before the semaphore was acquired; try again */
    }
}

/*
 * Initializes sp as an empty buffer with room for n items.
 * Terminates the process if n is not positive or a resource cannot be set up,
 * because the rest of the program cannot run without the buffer.
 */
void sbuf_init(sbuf_t *sp, int n)
{
    if (n <= 0) {
        fprintf(stderr, "sbuf_init: capacity must be positive (got %d)\n", n);
        exit(EXIT_FAILURE);
    }

    sp->buf = calloc((size_t)n, sizeof *sp->buf);
    if (sp->buf == NULL) {
        perror("sbuf_init: calloc");
        exit(EXIT_FAILURE);
    }
    sp->n = n;
    sp->outpos = 0;
    sp->inpos = 0;

    if (sem_init(&sp->mutex, 0, 1) != 0 ||
        sem_init(&sp->slots, 0, (unsigned int)n) != 0 ||
        sem_init(&sp->items, 0, 0) != 0) {
        perror("sbuf_init: sem_init");
        exit(EXIT_FAILURE);
    }
}

/* Releases the storage and semaphores of sp. No thread may still be using it. */
void sbuf_deinit(sbuf_t *sp)
{
    free(sp->buf);
    sp->buf = NULL;
    sem_destroy(&sp->mutex);
    sem_destroy(&sp->slots);
    sem_destroy(&sp->items);
}

/* Appends item to sp, blocking while the buffer is full. */
void sbuf_insert(sbuf_t *sp, int item)
{
    sem_wait_retry(&sp->slots);
    sem_wait_retry(&sp->mutex);
    sp->buf[sp->inpos] = item;
    sp->inpos = (sp->inpos + 1) % sp->n;
    sem_post(&sp->mutex);
    sem_post(&sp->items);
}

/* Removes and returns the oldest item of sp, blocking while it is empty. */
int sbuf_remove(sbuf_t *sp)
{
    sem_wait_retry(&sp->items);
    sem_wait_retry(&sp->mutex);
    int item = sp->buf[sp->outpos];
    sp->outpos = (sp->outpos + 1) % sp->n;
    sem_post(&sp->mutex);
    sem_post(&sp->slots);
    return item;
}

/* Publishes the calling thread's id in the retirement mailbox. */
static void announce_retirement(void)
{
    pthread_mutex_lock(&retire_mutex);
    while (retire_pending) {
        pthread_cond_wait(&retire_cond, &retire_mutex);
    }
    retire_tid = pthread_self();
    retire_pending = 1;
    pthread_cond_broadcast(&retire_cond);
    pthread_mutex_unlock(&retire_mutex);
}

/*
 * Worker entry point. arg is a heap-allocated int holding the worker's id;
 * ownership passes to the worker, which frees it.
 *
 * A worker leaves only after dequeuing a poison pill, so the number of pills
 * queued always equals the number of workers that exit. Once `running` has
 * been cleared, ordinary tasks are discarded instead of executed so the
 * worker reaches its pill quickly.
 */
void *worker_thread(void *arg)
{
    int id = *(int *)arg;
    free(arg);
    sbuf_t *sp = sbuf_ptr;

    for (;;) {
        int seconds = sbuf_remove(sp);
        if (seconds == POISON_PILL) {
            break;
        }
        if (!atomic_load(&running)) {
            continue;
        }
        printf("[Worker %d] executing task: sleep %d seconds\n", id, seconds);
        sleep((unsigned int)seconds);
        printf("[Worker %d] task completed\n", id);
    }

    announce_retirement();
    return NULL;
}

/*
 * Starts workers in slots [first, last) of the workers array, growing the
 * array as needed. Returns the number of occupied slots afterwards, which is
 * less than `last` if an allocation or pthread_create failed part-way.
 * Caller must hold worker_mutex.
 */
static int spawn_workers(int first, int last)
{
    if (last <= first) {
        return first;
    }

    /* Keep the old array reachable if realloc fails. */
    pthread_t *grown = realloc(workers, (size_t)last * sizeof *grown);
    if (grown == NULL) {
        perror("spawn_workers: realloc");
        return first;
    }
    workers = grown;

    int slot = first;
    for (; slot < last; slot++) {
        int *id = malloc(sizeof *id);
        if (id == NULL) {
            perror("spawn_workers: malloc");
            break;
        }
        *id = next_worker_id;

        int rc = pthread_create(&workers[slot], NULL, worker_thread, id);
        if (rc != 0) {
            fprintf(stderr, "spawn_workers: pthread_create: %s\n", strerror(rc));
            free(id);
            break;
        }
        next_worker_id++;
    }
    return slot;
}

/*
 * Waits for one worker to announce its retirement, removes its handle from
 * workers[0, live_count) by moving the last live handle into its place, and
 * joins it. Caller must hold worker_mutex.
 */
static void reap_one_worker(int live_count)
{
    pthread_mutex_lock(&retire_mutex);
    while (!retire_pending) {
        pthread_cond_wait(&retire_cond, &retire_mutex);
    }
    pthread_t tid = retire_tid;
    retire_pending = 0;
    pthread_cond_broadcast(&retire_cond);
    pthread_mutex_unlock(&retire_mutex);

    /* Compare ids before joining: a joined thread's id is no longer valid. */
    for (int i = 0; i < live_count; i++) {
        if (pthread_equal(workers[i], tid)) {
            workers[i] = workers[live_count - 1];
            break;
        }
    }
    pthread_join(tid, NULL);
}

/*
 * Brings the pool from num_workers to target_workers threads.
 *
 * Growing starts new threads; if some cannot be started, num_workers reflects
 * the number actually running. Shrinking queues one poison pill per surplus
 * worker and then joins whichever workers consumed them. Pills are queued
 * behind any tasks already in the buffer, so a shrink blocks until those
 * tasks have been dequeued.
 */
void adjust_workers(void)
{
    pthread_mutex_lock(&worker_mutex);

    int current = num_workers;
    int target = target_workers < 0 ? 0 : target_workers;

    if (target > current) {
        num_workers = spawn_workers(current, target);
    } else if (target < current) {
        for (int i = target; i < current; i++) {
            sbuf_insert(sbuf_ptr, POISON_PILL);
        }
        for (int live = current; live > target; live--) {
            reap_one_worker(live);
        }
        num_workers = target;
    }

    pthread_mutex_unlock(&worker_mutex);
}

/*
 * Parses text as a base-10 integer in [1, INT_MAX].
 * Returns 0 and stores the value in *out on success, -1 otherwise.
 */
static int parse_positive_int(const char *text, int *out)
{
    char *end = NULL;
    errno = 0;
    long value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE ||
        value < 1 || value > INT_MAX) {
        return -1;
    }
    *out = (int)value;
    return 0;
}

/*
 * Entry point: `prog <buffer_size>`.
 * Reads commands until "quit" or end of input, then stops all workers.
 */
int main(int argc, char *argv[])
{
    if (argc != 2) {
        fprintf(stderr, "Usage: %s <buffer_size>\n", argv[0]);
        return 1;
    }

    int buf_size = 0;
    if (parse_positive_int(argv[1], &buf_size) != 0) {
        fprintf(stderr, "Invalid buffer size: %s\n", argv[1]);
        fprintf(stderr, "Usage: %s <buffer_size>\n", argv[0]);
        return 1;
    }

    sbuf_t buf;
    sbuf_ptr = &buf;
    sbuf_init(&buf, buf_size);

    pthread_mutex_lock(&worker_mutex);
    num_workers = spawn_workers(0, INITIAL_WORKERS);
    pthread_mutex_unlock(&worker_mutex);
    if (num_workers == 0) {
        fprintf(stderr, "Could not start any worker thread\n");
        sbuf_deinit(&buf);
        free(workers);
        return 1;
    }

    printf("=== Dynamic Thread Pool ===\n");
    printf("Commands:\n");
    printf(" <task_count> <seconds> - Add tasks\n");
    printf(" quit - Exit\n");
    printf("Initial workers: %d, Buffer size: %d\n\n", num_workers, buf_size);

    char line[256];
    while (1) {
        printf("> ");
        fflush(stdout);
        if (!fgets(line, sizeof(line), stdin)) {
            break;
        }
        if (strncmp(line, "quit", 4) == 0) {
            break;
        }

        int task_count = 0;
        int seconds = 0;
        /* Negative durations are rejected: -1 is the poison pill value. */
        if (sscanf(line, "%d %d", &task_count, &seconds) != 2 ||
            task_count < 1 || seconds < 0) {
            printf("Invalid command. Usage: <task_count> <seconds>\n");
            continue;
        }

        printf("Adding %d tasks, each %d seconds...\n", task_count, seconds);
        for (int i = 0; i < task_count; i++) {
            int slots_avail = 1;
            sem_getvalue(&buf.slots, &slots_avail);
            if (slots_avail == 0 && num_workers <= INT_MAX / 2) {
                target_workers = num_workers * 2;
                printf("Buffer full, doubling workers to %d\n", target_workers);
                adjust_workers();
            }

            sbuf_insert(&buf, seconds);

            int items_avail = 1;
            sem_getvalue(&buf.items, &items_avail);
            if (items_avail == 0 && num_workers > MIN_WORKERS) {
                target_workers = num_workers / 2;
                if (target_workers < MIN_WORKERS) {
                    target_workers = MIN_WORKERS;
                }
                printf("Buffer empty, halving workers to %d\n", target_workers);
                adjust_workers();
            }
        }
    }

    /* Stop executing queued tasks, then retire every worker with a pill. */
    atomic_store(&running, 0);
    target_workers = 0;
    adjust_workers();

    sbuf_deinit(&buf);
    free(workers);
    workers = NULL;
    printf("Done.\n");
    return 0;
}