Newer
Older
#include "threadpool.h"
#include "list.h"
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
Max Barrett
committed
} state;
pthread_mutex_t lock;
pthread_cond_t cond;
struct worker *workers;
};
struct future {
fork_join_task_t task;
void* args;
void* result;
struct list_elem elem;
struct thread_pool * threadpool;
state status;
pthread_cond_t cond;
};
struct worker {
struct thread_pool* worker_threadpool;
pthread_t internal;
struct list worker_queue;
char padding[50];
static __thread struct worker *future_worker;
Max Barrett
committed
static void *start_routine(void *arg);
static void *run_future(struct thread_pool *threadpool);
static bool check_for_futures(struct thread_pool *threadpool);
Max Barrett
committed
static void *start_routine(void *arg) {
//get the worker and associated threadpool
struct worker *worker = (struct worker*) arg;
struct thread_pool *threadpool = worker->worker_threadpool;
//CPU pinning
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
int num_cores = sysconf(_SC_NPROCESSORS_ONLN);
for (int i = 0; i < threadpool->threads; i++) {
CPU_SET(i % num_cores, &cpuset);
}
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
Max Barrett
committed
//use barrier to make locks easier
pthread_barrier_wait(&threadpool->sync);
//set up the thread local worker
for (int i = 0; i < threadpool->threads; i++) {
if (pthread_self() == threadpool->workers[i].internal) {
future_worker = &threadpool->workers[i];
break;
}
}
Max Barrett
committed
//make worker continue to loop
Max Barrett
committed
//if global queue is empty wait
while (!check_for_futures(threadpool) && !threadpool->shutdown) {
pthread_cond_wait(&threadpool->cond, &threadpool->lock);
Max Barrett
committed
//if shutdown unlock and exit
if (threadpool->shutdown) {
pthread_mutex_unlock(&threadpool->lock);
Max Barrett
committed
pthread_exit(NULL);
Max Barrett
committed
//remove future from global list and run it
run_future(threadpool);
pthread_mutex_unlock(&threadpool->lock);
}
return NULL;
}
Max Barrett
committed
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
static void *run_future(struct thread_pool *threadpool) {
struct future *curr_future = NULL;
if (!list_empty(&future_worker->worker_queue)) {
//first check worker queue
curr_future = list_entry(list_pop_front(&future_worker->worker_queue), struct future, elem);
}
else if (!list_empty(&threadpool->global_queue)) {
//if worker queue is empty check global queue
curr_future = list_entry(list_pop_front(&threadpool->global_queue), struct future, elem);
}
else {
//if global and worker queue are empty start looking to steal a future
for (int i = 0; i < threadpool->threads; i++) {
if (!list_empty(&threadpool->workers[i].worker_queue)) {
curr_future = list_entry(list_pop_back(&threadpool->workers[i].worker_queue), struct future, elem);
break;
}
}
}
curr_future->status = WORKING;
pthread_mutex_unlock(&threadpool->lock);
curr_future->result = curr_future->task(threadpool, curr_future->args);
pthread_mutex_lock(&threadpool->lock);
curr_future->status = DONE;
pthread_cond_signal(&curr_future->cond);
return NULL;
}
static bool check_for_futures(struct thread_pool *threadpool) {
for (int i = 0; i < threadpool->threads; i++) {
if (!list_empty(&threadpool->workers[i].worker_queue)) {
return true;
}
}
return !list_empty(&threadpool->global_queue);
}
Max Barrett
committed
//create new threadpool and init varibles
struct thread_pool *threadpool = (struct thread_pool *) malloc(sizeof(struct thread_pool));
pthread_mutex_init(&threadpool->lock, NULL);
Max Barrett
committed
pthread_cond_init(&threadpool->cond, NULL);
pthread_mutex_lock(&threadpool->lock);
list_init(&threadpool->global_queue);
Max Barrett
committed
threadpool->threads = nthreads;
threadpool->workers = (struct worker *) malloc(sizeof(struct worker) * nthreads);
//create threads and wait until they are all made
pthread_barrier_init(&threadpool->sync, NULL, nthreads + 1);
for (int i = 0; i < nthreads; i++) {
list_init(&threadpool->workers[i].worker_queue);
threadpool->workers[i].worker_threadpool = threadpool;
pthread_create(&threadpool->workers[i].internal, NULL, start_routine, &threadpool->workers[i]);
pthread_mutex_unlock(&threadpool->lock);
return threadpool;
}
void thread_pool_shutdown_and_destroy(struct thread_pool *threadpool) {
pthread_mutex_lock(&threadpool->lock);
threadpool->shutdown = true;
Max Barrett
committed
//tell all threads to check their shutdown flags
pthread_cond_broadcast(&threadpool->cond);
pthread_mutex_unlock(&threadpool->lock);
Max Barrett
committed
//join all threads
for (int i = 0; i < threadpool->threads; i++) {
pthread_join(threadpool->workers[i].internal, NULL);
}
Max Barrett
committed
//clean up memory
Max Barrett
committed
pthread_mutex_destroy(&threadpool->lock);
pthread_cond_destroy(&threadpool->cond);
pthread_barrier_destroy(&threadpool->sync);
Max Barrett
committed
free(threadpool->workers);
free(threadpool);
Max Barrett
committed
//if the future has not started make sure to start it and get result
if (future->status == NOT_STARTED) {
list_remove(&future->elem);
future->status = WORKING;
pthread_mutex_unlock(&future->threadpool->lock);
future->result = future->task(future->threadpool, future->args);
pthread_mutex_lock(&future->threadpool->lock);
future->status = DONE;
} else { //if the task is still being completed wait for it
Max Barrett
committed
while (future->status != DONE) {
pthread_cond_wait(&future->cond, &future->threadpool->lock);
}
}
pthread_mutex_unlock(&future->threadpool->lock);
Max Barrett
committed
//send result
return future->result;
Max Barrett
committed
free(future);
Max Barrett
committed
struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t task, void *data) {
//create future and set values
future->task = task;
Max Barrett
committed
future->args = data;
future->result = NULL;
future->threadpool = pool;
future->status = NOT_STARTED;
pthread_cond_init(&future->cond, NULL);
pthread_mutex_lock(&pool->lock);
//if there is an associated worker then it is an interal submission and it goes to the worker queue
if (future_worker != NULL) {
list_push_front(&future_worker->worker_queue, &future->elem);
} else {
//if there is no worker then it gets added to its global queue
list_push_back(&pool->global_queue, &future->elem);
}
Max Barrett
committed
//signal to the threads that a future has been added to the queue
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->lock);