#include "threadpool.h" #include "list.h" #include <stdlib.h> #include <stdio.h> #include <assert.h> #include <pthread.h> struct thread_pool { pthread_mutex_t lock; pthread_cond_t cond; struct list global_queue; bool shutdown; int threads; }; struct future { fork_join_task_t task; void* args; void* result; struct list_elem elem; }; struct worker { struct thread_pool* pool; pthread_t internal; }; static __thread struct worker* current_worker = NULL; struct thread_pool * thread_pool_new(int nthreads) { struct thread_pool *threadpool = malloc(sizeof(struct thread_pool)); threadpool->threads = nthreads; pthread_cond_init(&threadpool->cond, NULL); pthread_mutex_init(&threadpool->lock, NULL); list_init(&threadpool->global_queue); for (int i = 0; i < nthreads; i++) { pool->workers[i].pool = pool; pool->workers[i].index = i; list_init(&pool->workers[i].deque); pthread_create(&pool->workers[i].internal, NULL, start_routine, &pool->workers[i]); } return threadpool; } void thread_pool_shutdown_and_destroy(struct thread_pool *threadpool) { threadpool->shutdown = true; return; } void * future_get(struct future *future) { pthread_mutex_unlock(&pool->lock); future->result = future->task(pool, future->args); pthread_mutex_lock(&pool->lock); return future; } void future_free(struct future *future) { free(future); } struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t task, void * data) { struct future *future = malloc(sizeof(struct future)); future->args = data; future->task = task; list_push_back(&pool->global_queue, &future->elem); pthread_cond_signal(&pool->cond); pthread_mutex_unlock(&pool->lock); free(pool); return future; }