Skip to content
Snippets Groups Projects
Commit d398e1f3 authored by Max Barrett's avatar Max Barrett
Browse files

work stealing done with cpu pinning

parent a751c037
No related branches found
No related tags found
No related merge requests found
#define _GNU_SOURCE
#include "threadpool.h" #include "threadpool.h"
#include "list.h" #include "list.h"
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#include <pthread.h> #include <pthread.h>
#include <sched.h>
#include <unistd.h>
typedef enum { typedef enum {
NOT_STARTED, NOT_STARTED,
...@@ -19,6 +23,7 @@ struct thread_pool { ...@@ -19,6 +23,7 @@ struct thread_pool {
int threads; int threads;
struct worker *workers; struct worker *workers;
pthread_barrier_t sync; pthread_barrier_t sync;
char padding[50];
}; };
struct future { struct future {
...@@ -29,52 +34,106 @@ struct future { ...@@ -29,52 +34,106 @@ struct future {
struct thread_pool * threadpool; struct thread_pool * threadpool;
state status; state status;
pthread_cond_t cond; pthread_cond_t cond;
char padding[50];
}; };
struct worker { struct worker {
struct thread_pool* worker_threadpool; struct thread_pool* worker_threadpool;
pthread_t internal; pthread_t internal;
struct list worker_queue;
char padding[50];
}; };
static __thread struct worker *future_worker;
static void *start_routine(void *arg); static void *start_routine(void *arg);
static void *run_future(struct thread_pool *threadpool);
static bool check_for_futures(struct thread_pool *threadpool);
static void *start_routine(void *arg) {
    // Worker thread entry point: pin this thread to a core, register the
    // thread-local worker pointer, then loop pulling and running futures
    // until the pool shuts down.
    struct worker *worker = (struct worker *) arg;
    struct thread_pool *threadpool = worker->worker_threadpool;
    // CPU pinning: bind this worker to exactly ONE core, chosen by its
    // index in the workers array (wrapping when threads > cores).
    // The previous code set every core 0..threads-1 in a single mask,
    // which made the affinity call a no-op rather than a pin.
    long num_cores = sysconf(_SC_NPROCESSORS_ONLN);
    if (num_cores > 0) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        int my_index = (int) (worker - threadpool->workers);
        CPU_SET(my_index % (int) num_cores, &cpuset);
        pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    }
    // Barrier so thread_pool_new() finishes initializing shared state
    // before any worker starts touching it.
    pthread_barrier_wait(&threadpool->sync);
    // Record this thread's worker in thread-local storage so that
    // thread_pool_submit() can detect internal (worker-originated)
    // submissions. The argument already identifies us, so no need to scan
    // the workers array comparing pthread_t values (which would also need
    // pthread_equal(), not ==).
    future_worker = worker;
    // Main worker loop.
    while (1) {
        pthread_mutex_lock(&threadpool->lock);
        // Sleep until work exists in some queue or shutdown is requested.
        while (!check_for_futures(threadpool) && !threadpool->shutdown) {
            pthread_cond_wait(&threadpool->cond, &threadpool->lock);
        }
        // On shutdown, release the lock and terminate this thread.
        if (threadpool->shutdown) {
            pthread_mutex_unlock(&threadpool->lock);
            pthread_exit(NULL);
        }
        // Dequeue one future and run it; run_future() drops the lock while
        // the task executes and re-acquires it before returning.
        run_future(threadpool);
        pthread_mutex_unlock(&threadpool->lock);
    }
    return NULL;
}
// Pop the next runnable future and execute it.
// Must be called with threadpool->lock held; the lock is released while
// the task body runs and re-acquired before the future is marked DONE,
// so the caller still owns the lock on return.
// Selection order: this worker's own queue (front), then the global
// queue (front), then steal from the BACK of another worker's queue
// (stealing oldest work reduces contention with the queue's owner).
static void *run_future(struct thread_pool *threadpool) {
    struct future *curr_future = NULL;
    if (!list_empty(&future_worker->worker_queue)) {
        // First choice: our own local queue.
        curr_future = list_entry(list_pop_front(&future_worker->worker_queue), struct future, elem);
    }
    else if (!list_empty(&threadpool->global_queue)) {
        // Second choice: externally submitted work on the global queue.
        curr_future = list_entry(list_pop_front(&threadpool->global_queue), struct future, elem);
    }
    else {
        // Last resort: steal from some other worker's queue.
        for (int i = 0; i < threadpool->threads; i++) {
            if (!list_empty(&threadpool->workers[i].worker_queue)) {
                curr_future = list_entry(list_pop_back(&threadpool->workers[i].worker_queue), struct future, elem);
                break;
            }
        }
    }
    // Defensive guard: if every queue turned out to be empty (e.g. this is
    // ever called without the caller's "work exists" check holding), the
    // original code dereferenced a NULL pointer below. Bail out instead.
    if (curr_future == NULL) {
        return NULL;
    }
    curr_future->status = WORKING;
    // Drop the pool lock while the (possibly long-running) task executes.
    pthread_mutex_unlock(&threadpool->lock);
    curr_future->result = curr_future->task(threadpool, curr_future->args);
    pthread_mutex_lock(&threadpool->lock);
    curr_future->status = DONE;
    // Wake any thread blocked in future_get() waiting on this future.
    pthread_cond_signal(&curr_future->cond);
    return NULL;
}
static bool check_for_futures(struct thread_pool *threadpool) {
for (int i = 0; i < threadpool->threads; i++) {
if (!list_empty(&threadpool->workers[i].worker_queue)) {
return true;
}
}
return !list_empty(&threadpool->global_queue);
}
struct thread_pool * thread_pool_new(int nthreads) { struct thread_pool * thread_pool_new(int nthreads) {
//create new threadpool and init varibles //create new threadpool and init varibles
struct thread_pool *threadpool = (struct thread_pool *) malloc(sizeof(struct thread_pool)); struct thread_pool *threadpool = (struct thread_pool *) malloc(sizeof(struct thread_pool));
pthread_mutex_init(&threadpool->lock, NULL); pthread_mutex_init(&threadpool->lock, NULL);
pthread_cond_init(&threadpool->cond, NULL); pthread_cond_init(&threadpool->cond, NULL);
pthread_mutex_lock(&threadpool->lock);
list_init(&threadpool->global_queue); list_init(&threadpool->global_queue);
threadpool->shutdown = false; threadpool->shutdown = false;
threadpool->threads = nthreads; threadpool->threads = nthreads;
...@@ -82,9 +141,11 @@ struct thread_pool * thread_pool_new(int nthreads) { ...@@ -82,9 +141,11 @@ struct thread_pool * thread_pool_new(int nthreads) {
//create threads and wait until they are all made //create threads and wait until they are all made
pthread_barrier_init(&threadpool->sync, NULL, nthreads + 1); pthread_barrier_init(&threadpool->sync, NULL, nthreads + 1);
for (int i = 0; i < nthreads; i++) { for (int i = 0; i < nthreads; i++) {
list_init(&threadpool->workers[i].worker_queue);
threadpool->workers[i].worker_threadpool = threadpool; threadpool->workers[i].worker_threadpool = threadpool;
pthread_create(&threadpool->workers[i].internal, NULL, start_routine, &threadpool->workers[i]); pthread_create(&threadpool->workers[i].internal, NULL, start_routine, &threadpool->workers[i]);
} }
pthread_mutex_unlock(&threadpool->lock);
pthread_barrier_wait(&threadpool->sync); pthread_barrier_wait(&threadpool->sync);
return threadpool; return threadpool;
} }
...@@ -100,8 +161,10 @@ void thread_pool_shutdown_and_destroy(struct thread_pool *threadpool) { ...@@ -100,8 +161,10 @@ void thread_pool_shutdown_and_destroy(struct thread_pool *threadpool) {
pthread_join(threadpool->workers[i].internal, NULL); pthread_join(threadpool->workers[i].internal, NULL);
} }
//clean up memory //clean up memory
free(future_worker);
pthread_mutex_destroy(&threadpool->lock); pthread_mutex_destroy(&threadpool->lock);
pthread_cond_destroy(&threadpool->cond); pthread_cond_destroy(&threadpool->cond);
pthread_barrier_destroy(&threadpool->sync);
free(threadpool->workers); free(threadpool->workers);
free(threadpool); free(threadpool);
} }
...@@ -116,7 +179,7 @@ void * future_get(struct future *future) { ...@@ -116,7 +179,7 @@ void * future_get(struct future *future) {
future->result = future->task(future->threadpool, future->args); future->result = future->task(future->threadpool, future->args);
pthread_mutex_lock(&future->threadpool->lock); pthread_mutex_lock(&future->threadpool->lock);
future->status = DONE; future->status = DONE;
} else { // if it has been started wait for it to finish and get result } else { //if the task is still being completed wait for it
while (future->status != DONE) { while (future->status != DONE) {
pthread_cond_wait(&future->cond, &future->threadpool->lock); pthread_cond_wait(&future->cond, &future->threadpool->lock);
} }
...@@ -126,10 +189,10 @@ void * future_get(struct future *future) { ...@@ -126,10 +189,10 @@ void * future_get(struct future *future) {
return future->result; return future->result;
} }
void future_free(struct future *future) { void future_free(struct future *future) {
free(future); free(future);
} }
struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t task, void *data) { struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t task, void *data) {
//create future and set values //create future and set values
struct future *future = malloc(sizeof(struct future)); struct future *future = malloc(sizeof(struct future));
...@@ -140,10 +203,15 @@ struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t ta ...@@ -140,10 +203,15 @@ struct future * thread_pool_submit(struct thread_pool *pool, fork_join_task_t ta
future->status = NOT_STARTED; future->status = NOT_STARTED;
pthread_cond_init(&future->cond, NULL); pthread_cond_init(&future->cond, NULL);
pthread_mutex_lock(&pool->lock); pthread_mutex_lock(&pool->lock);
//add to global queue //if there is an associated worker then it is an interal submission and it goes to the worker queue
list_push_back(&pool->global_queue, &future->elem); if (future_worker != NULL) {
list_push_front(&future_worker->worker_queue, &future->elem);
} else {
//if there is no worker then it gets added to its global queue
list_push_back(&pool->global_queue, &future->elem);
}
//signal to the threads that a future has been added to the queue //signal to the threads that a future has been added to the queue
pthread_cond_signal(&pool->cond); pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->lock); pthread_mutex_unlock(&pool->lock);
return future; return future;
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment