/* threadpool.c */
#include "threadpool.h"
#include "list.h"
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
Max Barrett's avatar
Max Barrett committed
/*
 * Tag type used for the `status` member of struct future.
 * The names suggest a task lifecycle (queued -> running -> finished);
 * NOTE(review): confirm against the code that updates `status`.
 */
typedef enum {
    NOT_STARTED = 0,
    WORKING     = 1,
    DONE        = 2
} state;
Max Barrett's avatar
Max Barrett committed

struct thread_pool {
    pthread_mutex_t lock;
    pthread_cond_t cond;
Max Barrett's avatar
Max Barrett committed
    struct list global_queue;
    bool shutdown;
    struct worker *workers;
Max Barrett's avatar
Max Barrett committed
    pthread_barrier_t sync;
Max Barrett's avatar
Max Barrett committed
};

struct future {
    fork_join_task_t task;
    void* args;
    void* result;
Max Barrett's avatar
Max Barrett committed
    struct thread_pool * threadpool;
    state status;
    pthread_cond_t cond;
    struct thread_pool* worker_threadpool;
Max Barrett's avatar
Max Barrett committed
};

Max Barrett's avatar
Max Barrett committed
static void *start_routine(void * arg);

static void *start_routine(void * arg) {
    struct worker *worker = (struct worker *) arg;
Max Barrett's avatar
Max Barrett committed
    pthread_barrier_wait(&worker->worker_threadpool->sync);
    while (1) {
        pthread_mutex_lock(&worker->worker_threadpool->lock);
        if (list_empty(&worker->worker_threadpool->global_queue)) {
            pthread_cond_wait(&worker->worker_threadpool->cond, &worker->worker_threadpool->lock);
        }
Max Barrett's avatar
Max Barrett committed
        if (&worker->worker_threadpool->shutdown) {
            break;
        }
        struct future *worker_future = list_entry(list_pop_front(&worker->worker_threadpool->global_queue), struct future, elem);
        pthread_mutex_unlock(&worker->worker_threadpool->lock);
        worker_future->result = worker_future->task(worker->worker_threadpool, worker_future->args);
Max Barrett's avatar
Max Barrett committed
        pthread_mutex_lock(&worker->worker_threadpool->lock);
    }
    pthread_mutex_unlock(&worker->worker_threadpool->lock);
Max Barrett's avatar
Max Barrett committed
    pthread_exit(NULL);
Max Barrett's avatar
Max Barrett committed
/*
 * Create a thread pool with `nthreads` worker threads.
 *
 * Initialises the pool's mutex, condition variable, barrier and queue, starts
 * the workers, and waits on the barrier until every worker has reached it.
 *
 * Returns the new pool, or NULL if nthreads is negative or an allocation
 * fails. The pool is released by thread_pool_shutdown_and_destroy.
 */
struct thread_pool * thread_pool_new(int nthreads) {
    if (nthreads < 0) {
        return NULL;
    }

    struct thread_pool *threadpool = malloc(sizeof *threadpool);
    if (threadpool == NULL) {
        return NULL;
    }

    threadpool->workers = malloc(sizeof(struct worker) * (size_t) nthreads);
    /* malloc(0) may legitimately return NULL, so only treat it as failure
     * when workers were actually requested. */
    if (threadpool->workers == NULL && nthreads > 0) {
        free(threadpool);
        return NULL;
    }

    threadpool->threads = nthreads;
    threadpool->shutdown = false;
    list_init(&threadpool->global_queue);
    pthread_cond_init(&threadpool->cond, NULL);
    pthread_mutex_init(&threadpool->lock, NULL);
    /* nthreads workers plus the calling thread rendezvous on this barrier. */
    pthread_barrier_init(&threadpool->sync, NULL, nthreads + 1);

    pthread_mutex_lock(&threadpool->lock);
    for (int i = 0; i < nthreads; i++) {
        threadpool->workers[i].worker_threadpool = threadpool;
        /* NOTE(review): the return value is still ignored; a failed create
         * would leave the barrier one participant short. */
        pthread_create(&threadpool->workers[i].internal, NULL, start_routine, &threadpool->workers[i]);
    }
    pthread_mutex_unlock(&threadpool->lock);

    pthread_barrier_wait(&threadpool->sync);
    return threadpool;
}

/*
 * Stop all workers and release the pool.
 *
 * Sets the shutdown flag under the pool lock and wakes every worker waiting
 * on the pool condition variable, joins each worker thread, then destroys the
 * synchronisation objects and frees the worker array and the pool itself.
 * `threadpool` must not be used after this call.
 *
 * Futures still linked in the queue are not freed here.
 */
void thread_pool_shutdown_and_destroy(struct thread_pool *threadpool) {
    pthread_mutex_lock(&threadpool->lock);
    threadpool->shutdown = true;
    pthread_cond_broadcast(&threadpool->cond);
    pthread_mutex_unlock(&threadpool->lock);

    for (int i = 0; i < threadpool->threads; i++) {
        pthread_join(threadpool->workers[i].internal, NULL);
    }

    /* All workers have exited, so nothing else can touch these objects. */
    pthread_barrier_destroy(&threadpool->sync);
    pthread_cond_destroy(&threadpool->cond);
    pthread_mutex_destroy(&threadpool->lock);
    free(threadpool->workers);
    free(threadpool);
}

void * future_get(struct future *future) {
Max Barrett's avatar
Max Barrett committed
    pthread_mutex_lock(&future->threadpool->lock);
    
Max Barrett's avatar
Max Barrett committed
    return future;
}

/*
 * Release a future allocated by thread_pool_submit.
 * Passing NULL is a no-op. The caller must not use `future` afterwards.
 */
void future_free(struct future *future) {
    free(future);
}

/*
 * Queue `task` for execution by the pool.
 *
 * pool: pool whose workers should run the task.
 * task: function to run.
 * data: opaque argument forwarded to `task`.
 *
 * Returns a newly allocated future for the task, or NULL if allocation
 * fails. The future is released with future_free.
 */
struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t task, void * data) {
    struct future *future = malloc(sizeof *future);
    if (future == NULL) {
        return NULL;
    }

    /* Fill in every field before the future becomes visible to workers. */
    future->task = task;
    future->args = data;
    future->result = NULL;
    future->threadpool = pool;
    future->status = NOT_STARTED;
    /* NOTE(review): future->cond is left uninitialised, as before; nothing
     * in this function waits on or signals it. */

    /* The queue is shared with the workers, so mutate it under the lock. */
    pthread_mutex_lock(&pool->lock);
    list_push_back(&pool->global_queue, &future->elem);
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->lock);

    return future;
}