Skip to content
Snippets Groups Projects
Commit df9db51d authored by Max Barrett's avatar Max Barrett
Browse files

Check discord

parent c40c5d48
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,11 @@
#include <assert.h>
#include <pthread.h>
typedef enum {
NOT_STARTED,
WORKING,
DONE,
}state;
struct thread_pool {
pthread_mutex_t lock;
......@@ -13,6 +18,7 @@ struct thread_pool {
bool shutdown;
int threads;
struct worker *workers;
pthread_barrier_t sync;
};
struct future {
......@@ -20,6 +26,9 @@ struct future {
void* args;
void* result;
struct list_elem elem;
struct thread_pool * threadpool;
state status;
pthread_cond_t cond;
};
struct worker {
......@@ -27,18 +36,27 @@ struct worker {
pthread_t internal;
};
static void *start_routine(void * arg);
static void *start_routine(void * arg) {
struct worker *worker = (struct worker *) arg;
while (!worker->worker_threadpool->shutdown) {
pthread_barrier_wait(&worker->worker_threadpool->sync);
while (1) {
pthread_mutex_lock(&worker->worker_threadpool->lock);
struct future *worker_future = list_entry(list_pop_front(&worker->worker_threadpool->global_queue), struct future, elem);
if (list_empty(&worker->worker_threadpool->global_queue)) {
pthread_cond_wait(&worker->worker_threadpool->cond, &worker->worker_threadpool->lock);
}
if (&worker->worker_threadpool->shutdown) {
break;
}
struct future *worker_future = list_entry(list_pop_front(&worker->worker_threadpool->global_queue), struct future, elem);
pthread_mutex_unlock(&worker->worker_threadpool->lock);
worker_future->result = worker_future->task(worker->worker_threadpool, worker_future->args);
pthread_mutex_lock(&worker->worker_threadpool->lock);
}
pthread_mutex_unlock(&worker->worker_threadpool->lock);
pthread_exit(NULL);
return NULL;
}
struct thread_pool * thread_pool_new(int nthreads) {
......@@ -46,12 +64,17 @@ struct thread_pool * thread_pool_new(int nthreads) {
threadpool->threads = nthreads;
pthread_cond_init(&threadpool->cond, NULL);
pthread_mutex_init(&threadpool->lock, NULL);
pthread_barrier_init(&threadpool->sync, NULL, nthreads + 1);
pthread_mutex_lock(&threadpool->lock);
list_init(&threadpool->global_queue);
threadpool->shutdown = false;
threadpool->workers = malloc(sizeof(struct worker) * nthreads);
for (int i = 0; i < nthreads; i++) {
threadpool->workers[i].worker_threadpool = threadpool;
pthread_create(&threadpool->workers[i].internal, NULL, start_routine, &threadpool->workers[i]);
}
pthread_mutex_unlock(&threadpool->lock);
pthread_barrier_wait(&threadpool->sync);
return threadpool;
}
......@@ -67,6 +90,8 @@ void thread_pool_shutdown_and_destroy(struct thread_pool *threadpool) {
}
void * future_get(struct future *future) {
pthread_mutex_lock(&future->threadpool->lock);
return future;
}
......@@ -79,8 +104,8 @@ struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t ta
future->args = data;
future->task = task;
list_push_back(&pool->global_queue, &future->elem);
future->threadpool = pool;
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->lock);
free(pool);
return future;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment