/*
 * Fork-join thread pool with a single global FIFO queue.
 *
 * Locking discipline: pool->lock protects the queue, the shutdown flag and
 * the status/result fields of every future submitted to that pool.
 */
#include "threadpool.h"
#include "list.h"

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Lifecycle of a submitted task. */
typedef enum {
    NOT_STARTED, /* queued; no thread has picked it up yet */
    WORKING,     /* some thread is currently executing the task */
    DONE,        /* the task has returned and `result` is valid */
} state;

struct thread_pool {
    pthread_mutex_t lock;     /* guards all fields below and all futures */
    pthread_cond_t cond;      /* signalled on new work or on shutdown */
    struct list global_queue; /* futures in NOT_STARTED state, FIFO */
    bool shutdown;            /* set once; tells workers to exit */
    int threads;              /* number of successfully created workers */
    struct worker *workers;
};

struct future {
    fork_join_task_t task;
    void *args;
    void *result;
    struct list_elem elem;          /* link in pool->global_queue */
    struct thread_pool *threadpool; /* pool this future was submitted to */
    state status;
    pthread_cond_t cond;            /* broadcast when status becomes DONE */
};

struct worker {
    struct thread_pool *worker_threadpool;
    pthread_t internal;
};

/*
 * Run `fut` on the calling thread and publish its result.
 *
 * Precondition: pool->lock is held and `fut` is no longer in the queue.
 * The lock is released while the task body runs (so the task may itself
 * submit work or call future_get) and is held again when this returns.
 */
static void run_future(struct thread_pool *pool, struct future *fut)
{
    fut->status = WORKING;
    pthread_mutex_unlock(&pool->lock);

    void *result = fut->task(pool, fut->args);

    pthread_mutex_lock(&pool->lock);
    fut->result = result;
    fut->status = DONE;
    /* Broadcast while still holding the lock: a waiter cannot return from
     * future_get (and possibly free the future) until we release it. */
    pthread_cond_broadcast(&fut->cond);
}

/*
 * Worker thread body: repeatedly take the oldest queued future and run it
 * until the pool is shut down. `arg` is the worker's own `struct worker`.
 */
static void *start_routine(void *arg)
{
    struct worker *self = arg;
    struct thread_pool *pool = self->worker_threadpool;

    pthread_mutex_lock(&pool->lock);
    for (;;) {
        /* Predicate loop: condition variables may wake spuriously. */
        while (!pool->shutdown && list_empty(&pool->global_queue)) {
            pthread_cond_wait(&pool->cond, &pool->lock);
        }
        if (pool->shutdown) {
            break;
        }

        struct future *fut = list_entry(list_pop_front(&pool->global_queue),
                                        struct future, elem);
        run_future(pool, fut); /* returns with the lock held */
    }
    pthread_mutex_unlock(&pool->lock);
    return NULL;
}

/*
 * Stop all workers and release every resource owned by the pool.
 *
 * Workers finish the task they are currently running and then exit; tasks
 * still waiting in the queue are not executed. Futures remain owned by the
 * caller (release them with future_free), but future_get must not be called
 * on them once the pool has been destroyed. Passing NULL is a no-op.
 */
void thread_pool_shutdown_and_destroy(struct thread_pool *threadpool)
{
    if (threadpool == NULL) {
        return;
    }

    pthread_mutex_lock(&threadpool->lock);
    threadpool->shutdown = true;
    pthread_cond_broadcast(&threadpool->cond);
    pthread_mutex_unlock(&threadpool->lock);

    for (int i = 0; i < threadpool->threads; i++) {
        pthread_join(threadpool->workers[i].internal, NULL);
    }

    pthread_cond_destroy(&threadpool->cond);
    pthread_mutex_destroy(&threadpool->lock);
    free(threadpool->workers);
    free(threadpool);
}

/*
 * Create a pool with `nthreads` worker threads.
 *
 * Returns the new pool, or NULL if `nthreads` is negative, memory cannot be
 * allocated, or a worker thread cannot be created (any workers that were
 * already started are shut down again before returning).
 */
struct thread_pool *thread_pool_new(int nthreads)
{
    if (nthreads < 0) {
        return NULL;
    }

    struct thread_pool *threadpool = malloc(sizeof *threadpool);
    if (threadpool == NULL) {
        return NULL;
    }

    /* Allocate at least one slot so a zero-worker pool still gets a valid
     * (non-NULL) array regardless of how calloc treats a zero count. */
    size_t slots = nthreads > 0 ? (size_t)nthreads : 1;
    threadpool->workers = calloc(slots, sizeof *threadpool->workers);
    if (threadpool->workers == NULL) {
        free(threadpool);
        return NULL;
    }

    pthread_mutex_init(&threadpool->lock, NULL);
    pthread_cond_init(&threadpool->cond, NULL);
    list_init(&threadpool->global_queue);
    threadpool->shutdown = false;
    threadpool->threads = 0;

    for (int i = 0; i < nthreads; i++) {
        struct worker *w = &threadpool->workers[i];
        w->worker_threadpool = threadpool;
        if (pthread_create(&w->internal, NULL, start_routine, w) != 0) {
            /* `threads` counts only the workers that really exist, so the
             * normal teardown path joins exactly those. */
            thread_pool_shutdown_and_destroy(threadpool);
            return NULL;
        }
        threadpool->threads++;
    }

    return threadpool;
}

/*
 * Wait for `future` to complete and return the value its task returned.
 *
 * If no thread has started the task yet, the caller removes it from the
 * queue and runs it itself. This keeps fork-join code from deadlocking when
 * every worker is blocked joining a subtask that is still queued.
 */
void *future_get(struct future *future)
{
    struct thread_pool *pool = future->threadpool;

    pthread_mutex_lock(&pool->lock);
    if (future->status == NOT_STARTED) {
        /* NOTE(review): assumes list.h provides list_remove(struct
         * list_elem *), as the Pintos-style list does — confirm. */
        list_remove(&future->elem);
        run_future(pool, future); /* returns with the lock held */
    } else {
        while (future->status != DONE) {
            pthread_cond_wait(&future->cond, &pool->lock);
        }
    }
    void *result = future->result;
    pthread_mutex_unlock(&pool->lock);

    return result;
}

/*
 * Release a future obtained from thread_pool_submit. The caller must make
 * sure the task is no longer queued or running (e.g. by calling future_get
 * first). Passing NULL is a no-op.
 */
void future_free(struct future *future)
{
    if (future == NULL) {
        return;
    }
    pthread_cond_destroy(&future->cond);
    free(future);
}

/*
 * Queue `task` for execution with argument `data`.
 *
 * Returns a future that the caller owns and must eventually pass to
 * future_free, or NULL if memory for it cannot be allocated.
 */
struct future *thread_pool_submit(struct thread_pool *pool,
                                  fork_join_task_t task, void *data)
{
    struct future *future = malloc(sizeof *future);
    if (future == NULL) {
        return NULL;
    }

    future->task = task;
    future->args = data;
    future->result = NULL;
    future->threadpool = pool;
    future->status = NOT_STARTED;
    pthread_cond_init(&future->cond, NULL);

    pthread_mutex_lock(&pool->lock);
    list_push_back(&pool->global_queue, &future->elem);
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->lock);

    return future;
}