Skip to content
Snippets Groups Projects
mergesort.c 7.34 KiB
Newer Older
  • Learn to ignore specific revisions
  • spruett3's avatar
    spruett3 committed
    /*
     * Parallel Mergesort.
     *
     * Demo application that shows how one might use threadpools/futures
     * in an application.
     *
     * Requires threadpool.c/threadpool.h
     *
     * Written by Godmar Back gback@cs.vt.edu for CS3214 Fall 2014.
    
    gback's avatar
    gback committed
     * Updated Summer 2020
    
    spruett3's avatar
    spruett3 committed
     */
    
    #include <stdlib.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/time.h>
    #include <sys/resource.h>
    #include <pthread.h>
    #include <getopt.h>
    
    /* When to switch from parallel to serial */
    #define SERIAL_MERGE_SORT_THRESHOLD    1000
    static int min_task_size = SERIAL_MERGE_SORT_THRESHOLD;
    
    #define INSERTION_SORT_THRESHOLD    16
    static int insertion_sort_threshold = INSERTION_SORT_THRESHOLD;
    
    #include "threadpool.h"
    #include "threadpool_lib.h"
    #define DEFAULT_THREADS 4
    static int nthreads = DEFAULT_THREADS;
    
    typedef void (*sort_func)(int *, int);
    
    /* Return true if array 'a' is sorted. */
    static bool
    check_sorted(int a[], int n) 
    {
        int i;
        for (i = 0; i < n-1; i++)
            if (a[i] > a[i+1])
                return false;
        return true;
    }
    
    /* ------------------------------------------------------------- 
     * Built-in qsort.
     */
    static int cmp_int(const void *a, const void *b)
    {
        return *(int *)a - *(int *)b;
    }
    
    static void builtin_qsort(int *a, int N)
    {
        qsort(a, N, sizeof(int), cmp_int);
    }
    
    /* ------------------------------------------------------------- 
     * Utilities: insertion sort.
     */
    static void insertionsort(int *a, int lo, int hi) 
    {
        int i;
        for (i = lo+1; i <= hi; i++) {
            int j = i;
            int t = a[j];
            while (j > lo && t < a[j - 1]) {
                a[j] = a[j - 1];
                --j;
            }
            a[j] = t;
        }
    }
    
    static void
    merge(int * a, int * b, int bstart, int left, int m, int right)
    {
        if (a[m] <= a[m+1])
            return;
    
        memcpy(b + bstart, a + left, (m - left + 1) * sizeof (a[0]));
        int i = bstart;
        int j = m + 1;
        int k = left;
    
        while (k < j && j <= right) {
            if (b[i] < a[j])
                a[k++] = b[i++];
            else
                a[k++] = a[j++];
        }
        memcpy(a + k, b + i, (j - k) * sizeof (a[0]));
    }
    
    /* ------------------------------------------------------------- 
     * Serial implementation.
     */
    static void
    mergesort_internal(int * array, int * tmp, int left, int right)
    {
        if (right - left < insertion_sort_threshold) {
            insertionsort(array, left, right);
        } else {
            int m = (left + right) / 2;
            mergesort_internal(array, tmp, left, m);
            mergesort_internal(array, tmp, m + 1, right);
            merge(array, tmp, 0, left, m, right);
        }
    }
    
    static void
    mergesort_serial(int * array, int n)
    {
        if (n < insertion_sort_threshold) {
            insertionsort(array, 0, n);
        } else {
            int * tmp = malloc(sizeof(int) * (n / 2 + 1));
            mergesort_internal(array, tmp, 0, n-1);
            free (tmp);
        }
    }
    
    /* ------------------------------------------------------------- 
     * Parallel implementation.
     */
    
    /* msort_task describes a unit of parallel work */
    struct msort_task {
        int *array;
        int *tmp;
        int left, right;
    }; 
    
    /* Parallel mergesort */
    static void  
    mergesort_internal_parallel(struct thread_pool * threadpool, struct msort_task * s)
    {
        int * array = s->array;
        int * tmp = s->tmp;
        int left = s->left;
        int right = s->right;
    
        if (right - left <= min_task_size) {
            mergesort_internal(array, tmp + left, left, right);
        } else {
            int m = (left + right) / 2;
    
            struct msort_task mleft = {
                .left = left,
                .right = m,
                .array = array,
                .tmp = tmp
            };
            struct future * lhalf = thread_pool_submit(threadpool, 
                                       (fork_join_task_t) mergesort_internal_parallel,  
                                       &mleft);
    
            struct msort_task mright = {
                .left = m + 1,
                .right = right,
                .array = array,
                .tmp = tmp
            };
            mergesort_internal_parallel(threadpool, &mright);
            future_get(lhalf);
            future_free(lhalf);
            merge(array, tmp, left, left, m, right);
        }
    }
    
    static void 
    mergesort_parallel(int *array, int N) 
    {
        int * tmp = malloc(sizeof(int) * (N));
        struct msort_task root = {
            .left = 0, .right = N-1, .array = array, .tmp = tmp
        };
    
        struct thread_pool * threadpool = thread_pool_new(nthreads);
    
    gback's avatar
    gback committed
        struct future * top = thread_pool_submit(threadpool,
                                                 (fork_join_task_t) mergesort_internal_parallel,
                                                 &root);
        future_get(top);
        future_free(top);
    
    spruett3's avatar
    spruett3 committed
        thread_pool_shutdown_and_destroy(threadpool);
        free (tmp);
    }
    
    /*
     * Benchmark one run of sort_func sorter
     */
    static void 
    benchmark(const char *benchmark_name, sort_func sorter, int *a0, int N, bool report)
    {
        int *a = malloc(N * sizeof(int));
        memcpy(a, a0, N * sizeof(int));
    
        struct benchmark_data * bdata = start_benchmark();
    
        // parallel section here, including thread pool startup and shutdown
        sorter(a, N);
    
        stop_benchmark(bdata);
    
        // consistency check
        if (!check_sorted(a, N)) {
            fprintf(stderr, "Sort failed\n");
            abort();
        }
    
        // report only if successful
        if (report) {
            report_benchmark_results(bdata);
        }
    
        printf("%s result ok. Timings follow\n", benchmark_name);
        report_benchmark_results_to_human(stdout, bdata);
    
        free(bdata);
        free(a);
    }
    
    static void
    usage(char *av0, int exvalue)
    {
        fprintf(stderr, "Usage: %s [-i <n>] [-n <n>] [-b] [-q] [-s <n>] <N>\n"
                        " -i        insertion sort threshold, default %d\n"
                        " -m        minimum task size before using serial mergesort, default %d\n"
                        " -n        number of threads in pool, default %d\n"
                        " -b        run built-in qsort\n"
                        " -s        specify srand() seed\n"
                        " -q        also run serial mergesort\n"
                        , av0, INSERTION_SORT_THRESHOLD, SERIAL_MERGE_SORT_THRESHOLD, DEFAULT_THREADS);
        exit(exvalue);
    }
    
    int 
    main(int ac, char *av[]) 
    {
        int c;
        bool run_builtin_qsort = false;
        bool run_serial_msort = false;
    
        while ((c = getopt(ac, av, "i:n:bhs:qm:")) != EOF) {
            switch (c) {
            case 'i':
                insertion_sort_threshold = atoi(optarg);
                break;
            case 'm':
                min_task_size = atoi(optarg);
                break;
            case 'n':
                nthreads = atoi(optarg);
                break;
            case 's':
                srand(atoi(optarg));
                break;
            case 'b':
                run_builtin_qsort = true;
                break;
            case 'q':
                run_serial_msort = true;
                break;
            case 'h':
                usage(av[0], EXIT_SUCCESS);
            }
        }
        if (optind == ac)
            usage(av[0], EXIT_FAILURE);
    
        int N = atoi(av[optind]);
    
        int i, * a0 = malloc(N * sizeof(int));
        for (i = 0; i < N; i++)
            a0[i] = random();
    
        if (run_builtin_qsort)
            benchmark("Built-in qsort", builtin_qsort, a0, N, false);
    
        if (run_serial_msort)
            benchmark("mergesort serial", mergesort_serial, a0, N, false);
    
        printf("Using %d threads, parallel/serials threshold=%d insertion sort threshold=%d\n", 
            nthreads, min_task_size, insertion_sort_threshold);
        benchmark("mergesort parallel", mergesort_parallel, a0, N, true);
    
        return EXIT_SUCCESS;
    }