Skip to content
Snippets Groups Projects
threadpool.c 7.28 KiB
Newer Older
#define _GNU_SOURCE
Max Barrett's avatar
Max Barrett committed
#include "threadpool.h"
#include "list.h"
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <sched.h>
#include <unistd.h>

Max Barrett's avatar
Max Barrett committed
/* Lifecycle of a future, stored in the status member of struct future. */
typedef enum {
    NOT_STARTED,  // queued; no thread has begun running the task yet
    WORKING,      // a thread is currently running the task
    DONE,         // the task has returned and its result has been stored
} state;

struct thread_pool {
    pthread_mutex_t lock;
    pthread_cond_t cond;
Max Barrett's avatar
Max Barrett committed
    struct list global_queue;
    bool shutdown;
    struct worker *workers;
Max Barrett's avatar
Max Barrett committed
    pthread_barrier_t sync;
    char padding[50];
Max Barrett's avatar
Max Barrett committed
};

struct future {
    fork_join_task_t task;
    void* args;
    void* result;
Max Barrett's avatar
Max Barrett committed
    struct thread_pool * threadpool;
    state status;
    pthread_cond_t cond;
    char padding[50];
    struct thread_pool* worker_threadpool;
    struct list worker_queue;
    char padding[50];
Max Barrett's avatar
Max Barrett committed
};

// Worker record of the calling thread. Assigned in start_routine for pool
// threads; it stays NULL in every thread that never runs start_routine, which
// is how thread_pool_submit tells internal submissions from external ones.
static __thread struct worker *future_worker;

// Pops one pending future and executes it; start_routine calls it with the
// pool lock held.
static void *run_future(struct thread_pool *threadpool);
// Returns true if any worker queue or the global queue holds a future.
static bool check_for_futures(struct thread_pool *threadpool);
Max Barrett's avatar
Max Barrett committed

/*
 * Entry point of every worker thread.
 *
 * arg is the struct worker that thread_pool_new handed to pthread_create.
 * The thread sets its CPU affinity, waits at the pool's startup barrier, and
 * then loops: sleep on the pool condition variable until a future is queued
 * or shutdown is requested, and execute one future per iteration.
 *
 * Returns NULL once the pool's shutdown flag is observed.
 */
static void *start_routine(void *arg) {
    //get the worker and associated threadpool
    struct worker *worker = (struct worker*) arg;
    struct thread_pool *threadpool = worker->worker_threadpool;
    //CPU pinning
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    long num_cores = sysconf(_SC_NPROCESSORS_ONLN);
    if (num_cores < 1) {
        //sysconf reports failure as -1; never take a modulo by a non-positive count
        num_cores = 1;
    }
    //NOTE(review): every worker ends up with the same mask (cores 0..threads-1,
    //wrapped at num_cores) rather than one core each - confirm this is intended
    for (int i = 0; i < threadpool->threads; i++) {
        CPU_SET(i % num_cores, &cpuset);
    }
    //best effort: the return value is not checked
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    pthread_barrier_wait(&threadpool->sync);
    //set up the thread local worker; arg already is this thread's own entry
    //in threadpool->workers, so no search by thread id is required
    future_worker = worker;
    for (;;) {
        pthread_mutex_lock(&threadpool->lock);
        //sleep until there is work to do or the pool is shutting down
        while (!check_for_futures(threadpool) && !threadpool->shutdown) {
            pthread_cond_wait(&threadpool->cond, &threadpool->lock);
        }
        if (threadpool->shutdown) {
            pthread_mutex_unlock(&threadpool->lock);
            return NULL;
        }
        //remove a future from a queue and run it; run_future drops the lock
        //while the task executes and holds it again when it returns
        run_future(threadpool);
        pthread_mutex_unlock(&threadpool->lock);
    }
}
/*
 * Takes one pending future and executes it on the calling worker thread.
 *
 * Must be called with threadpool->lock held. The lock is released while the
 * task runs and is held again when the function returns.
 *
 * Lookup order: front of the calling worker's own queue, then front of the
 * global queue, then the back of the first non-empty worker queue (steal).
 * If no queue holds a future the function returns without doing anything.
 *
 * Always returns NULL.
 */
static void *run_future(struct thread_pool *threadpool) {
    struct future *curr_future = NULL;
    if (!list_empty(&future_worker->worker_queue)) {
        //first check worker queue
        curr_future = list_entry(list_pop_front(&future_worker->worker_queue), struct future, elem);
    }
    else if (!list_empty(&threadpool->global_queue)) {
        //if worker queue is empty check global queue
        curr_future = list_entry(list_pop_front(&threadpool->global_queue), struct future, elem);
    }
    else {
        //if global and worker queue are empty start looking to steal a future
        for (int i = 0; i < threadpool->threads; i++) {
            if (!list_empty(&threadpool->workers[i].worker_queue)) {
                curr_future = list_entry(list_pop_back(&threadpool->workers[i].worker_queue), struct future, elem);
                break;
            }
        }
    }
    if (curr_future == NULL) {
        //nothing was queued anywhere; return with the lock still held
        //instead of dereferencing a null future
        return NULL;
    }
    curr_future->status = WORKING;
    //run the task without the lock so other threads can use the queues
    pthread_mutex_unlock(&threadpool->lock);
    curr_future->result = curr_future->task(threadpool, curr_future->args);
    pthread_mutex_lock(&threadpool->lock);
    curr_future->status = DONE;
    //wake a thread blocked in future_get on this future
    pthread_cond_signal(&curr_future->cond);
    return NULL;
}

/*
 * Reports whether the pool has any queued future.
 *
 * Looks at the global queue and at the queue of every worker; the result is
 * true as soon as one of them is non-empty, false when all are empty.
 */
static bool check_for_futures(struct thread_pool *threadpool) {
    bool pending = !list_empty(&threadpool->global_queue);
    for (int idx = 0; !pending && idx < threadpool->threads; idx++) {
        pending = !list_empty(&threadpool->workers[idx].worker_queue);
    }
    return pending;
}

Max Barrett's avatar
Max Barrett committed
/*
 * Creates a thread pool with nthreads worker threads.
 *
 * Every worker gets its own empty queue and is started in start_routine.
 * The call returns only after all workers have reached the startup barrier,
 * which is initialised for nthreads + 1 participants (workers plus caller).
 *
 * Returns the new pool, or NULL if memory for it could not be allocated.
 */
struct thread_pool * thread_pool_new(int nthreads) {
    //create new threadpool and init variables
    struct thread_pool *threadpool = malloc(sizeof *threadpool);
    if (threadpool == NULL) {
        return NULL;
    }
    threadpool->workers = malloc(sizeof *threadpool->workers * nthreads);
    if (threadpool->workers == NULL && nthreads > 0) {
        free(threadpool);
        return NULL;
    }
    pthread_mutex_init(&threadpool->lock, NULL);
    pthread_cond_init(&threadpool->cond, NULL);
    pthread_mutex_lock(&threadpool->lock);
    list_init(&threadpool->global_queue);
    threadpool->shutdown = false;
    threadpool->threads = nthreads;
    //create threads and wait until they are all made
    pthread_barrier_init(&threadpool->sync, NULL, nthreads + 1);
    for (int i = 0; i < nthreads; i++) {
        list_init(&threadpool->workers[i].worker_queue);
        threadpool->workers[i].worker_threadpool = threadpool;
        //NOTE(review): a failed pthread_create is not handled; the barrier
        //below would then never collect nthreads + 1 participants
        pthread_create(&threadpool->workers[i].internal, NULL, start_routine, &threadpool->workers[i]);
    }
    pthread_mutex_unlock(&threadpool->lock);
    pthread_barrier_wait(&threadpool->sync);
    return threadpool;
}

/*
 * Stops all workers of threadpool and releases the pool.
 *
 * Sets the shutdown flag under the pool lock, wakes every worker waiting on
 * the pool condition variable, joins all worker threads, destroys the mutex,
 * condition variable and barrier, and frees the worker array and the pool.
 * threadpool must not be used after this call.
 *
 * NOTE(review): workers return as soon as they see the shutdown flag, so
 * futures still sitting in a queue are neither executed nor freed here.
 */
void thread_pool_shutdown_and_destroy(struct thread_pool *threadpool) {
    pthread_mutex_lock(&threadpool->lock);
    threadpool->shutdown = true;
    //tell all threads to check their shutdown flags
    pthread_cond_broadcast(&threadpool->cond);
    pthread_mutex_unlock(&threadpool->lock);
    for (int i = 0; i < threadpool->threads; i++) {
        pthread_join(threadpool->workers[i].internal, NULL);
    }
    //worker records live inside the workers array and are released with it;
    //the thread-local future_worker pointer must not be passed to free()
    pthread_mutex_destroy(&threadpool->lock);
    pthread_cond_destroy(&threadpool->cond);
    pthread_barrier_destroy(&threadpool->sync);
    free(threadpool->workers);
    free(threadpool);
}

/*
 * Returns the result of future, blocking until it is available.
 *
 * If no thread has started the task yet, the caller unlinks the future from
 * its queue and runs the task itself (with the pool lock released while the
 * task executes). Otherwise the caller waits on the future's condition
 * variable until its status becomes DONE.
 */
void * future_get(struct future *future) {
    pthread_mutex_lock(&future->threadpool->lock);
    //if the future has not started make sure to start it and get result
    if (future->status == NOT_STARTED) {
        list_remove(&future->elem);
        future->status = WORKING;
        pthread_mutex_unlock(&future->threadpool->lock);
        future->result = future->task(future->threadpool, future->args);
        pthread_mutex_lock(&future->threadpool->lock);
        future->status = DONE;
    } else { //if the task is still being completed wait for it
        while (future->status != DONE) {
            pthread_cond_wait(&future->cond, &future->threadpool->lock);
        }
    }
    pthread_mutex_unlock(&future->threadpool->lock);
    //hand the task's value back to the caller
    return future->result;
}
Max Barrett's avatar
Max Barrett committed
void future_free(struct future *future) {
Max Barrett's avatar
Max Barrett committed
}
struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t task, void *data) {
    //create future and set values
Max Barrett's avatar
Max Barrett committed
    struct future *future = malloc(sizeof(struct future));
    future->args = data;
    future->result = NULL;
    future->threadpool = pool;
    future->status = NOT_STARTED;
    pthread_cond_init(&future->cond, NULL);
    pthread_mutex_lock(&pool->lock);
    //if there is an associated worker then it is an interal submission and it goes to the worker queue
    if (future_worker != NULL) {
        list_push_front(&future_worker->worker_queue, &future->elem);
    } else { 
        //if there is no worker then it gets added to its global queue
        list_push_back(&pool->global_queue, &future->elem);
    }
    //signal to the threads that a future has been added to the queue
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->lock);
Max Barrett's avatar
Max Barrett committed
    return future;