    #!/usr/bin/python3
    
    
    #
    # fjdriver.py for Fork/Join pool projects
    #
    
    # Written by Godmar Back and Scott Pruett for CS 3214
    
    #
    # https://git-scm.com/docs/pretty-formats
    version = "$Format:%H committed by %cn$"
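    # (git archive expands the $Format:...$ placeholder above for repositories
    # that mark this file with the export-subst attribute)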
    
    #
    import getopt, sys, os, subprocess, signal, re, json, resource, time, socket
    from datetime import datetime
    from collections import namedtuple, defaultdict
    
    # add the directory in which this script is located to the python path
    script_dir = "/".join(__file__.split("/")[:-1])
    if script_dir == "":
        script_dir = "."
    
    script_dir = os.path.realpath(script_dir)
    if script_dir not in sys.path:
        sys.path.append(script_dir)
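    # this makes the fjtests module that accompanies this script importable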
    
    from fjtests import load_test_module
    
    
    src_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + "/tests"
    
    results_file = "full-results.json"
    filelist = src_dir + "/FILELIST"
    timestamp = str(datetime.now()).replace(" ", "_")
    workdir = "fj_testdir_" + timestamp
    poolfile = "./threadpool.c"
    verbose = False
    silent = False
    list_tests = False
    grade_mode = False
    benchmark_runs = 1
    
    # Benchmark info
    
    # Times of serial runs for benchmarked tests
    
    # XXX these are out of date currently and ultimately not used.
    benchmark_times = {
        'nqueens 14':      17.158,
        'quicksort large': 19.81442,
        'mergesort large': 22.33417
    }
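    # for benchmarked runs, benchmark_speedup() looks up the serial time here
    # and divides it by the measured parallel runtime to compute a speedup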
    
    tests = load_test_module('standard')
    
    #
    # getopt
    #
    # look for threadpool.c in the current directory, or point at it with the -p flag
    #
    def usage():
        print ("""
    Usage: %s [options]
        -v              Verbose
        -V              Print version and exit
        -a              Run benchmarks even if the machine is not idle
        -r              Only run required tests.
        -h              Show help
        -p              <file> - Location of threadpool implementation, default ./threadpool.c
        -l              List available tests
        -o              <file> - Write results to this file, default full-results.json
        -B              <n> - Number of times benchmarked tests are run, default 1
        -g              Grade mode
        -t              Filter tests by name, given as a comma separated list.
                        e.g.: -t basic1,psum
        """ % (sys.argv[0]))
    
    
    try:
        opts, args = getopt.getopt(sys.argv[1:], "Varvhlp:t:o:B:gL", ["verbose", "help", "list-tests"])
    
    except getopt.GetoptError as err:
        print (str(err)) # will print something like "option -a not recognized"
    
        usage()
        sys.exit(2)
    
    runfilter = lambda test : True
    ignore_if_not_idle = False
    
    for opt, arg in opts: 
        if opt == "-r":
            # bind the current filter via a default argument; referencing a
            # global oldrunfilter would recurse once a second filter is chained
            runfilter = lambda test, old=runfilter: test.is_required and old(test)
    
        elif opt == "-V":
    
            print ("Version", version)
    
            sys.exit(0)
        elif opt == "-a":
            ignore_if_not_idle = True
        elif opt == "-v":
            verbose = True
        elif opt in ("-h", "--help"):
            usage()
            sys.exit()
        elif opt == '-p':
            poolfile = arg
        elif opt == '-l':
            list_tests = True
        elif opt == '-o':
            results_file = arg
        elif opt == '-B':
            benchmark_runs = int(arg)
        elif opt == '-g':
            grade_mode = True
        elif opt == '-t':
            filtered = arg.split(',')
    
            for _filter in filtered:
                for test in tests:
                    if _filter == test.name:
                        break
                else:
                    print ('Unknown test: %s. Use -l to list test names.' % _filter)
                    usage()
                    sys.exit()
            runfilter = lambda test, old=runfilter, names=filtered: test.name in names and old(test)
        else:
            assert False, "unhandled option"
    
    if list_tests:
    
        print ('Available tests (with applied filters):')
        print (80 * '=')
    
        for test in tests:
            if runfilter(test):
    
                print ('%s: %s' % (test.name, test.description))
    
        sys.exit()
    
    def copyfile(src, dst):
        cmd = "cp %s %s" % (src, dst)
        if verbose:
    
            print (cmd)
    
        ex = os.system(cmd)
        if ex:
            sys.exit(ex)
    
    def setup_working_directory():
        if verbose:
            print ("Creating working directory", workdir)
        # the timestamped working directory must exist before files are copied into it
        os.mkdir(workdir)
        if verbose:
            print ("Copying files")
    
        if not os.access(poolfile, os.R_OK):
    
            print ()
            print ("I cannot find %s" % poolfile)
    
            usage()
            sys.exit(2)
    
        copyfile(poolfile, workdir + "/threadpool.c")
    
        flist = open(filelist, 'r')
        for file in flist:
            if file.startswith("#"):
                continue
            file = file.strip()
            copyfile(src_dir + "/" + file, workdir)
    
        flist.close()
        if verbose:
            print ("Copying %s" % poolfile)

        # build (and later run) the tests from inside the working directory
        os.chdir(workdir)
        if os.system("make -j 20"):
    
            if grade_mode:
                op = open(results_file, 'w')
                op.write(json.dumps({'error': 'did not compile'}))
                op.close()
            raise Exception("make failed, run 'make' in %s to see why" % workdir)
    
    def check_software_engineering(objfile, allowedsymbols):
        hex = "[0-9A-Fa-f]{8,16}"
        if verbose:
    
            print ("Performing some checks that %s conforms to accepted software engineering practice..." % objfile)
    
    
        symbols = subprocess.Popen(["nm", objfile], stdout=subprocess.PIPE)\
            .communicate()[0].decode().split("\n")

        for sym in symbols:
            # skip blank lines and undefined ("U") symbols
            if sym == "" or re.match(r"\s+U (\S+)", sym):
                continue

            # nm output lines have the form "<address> <type letter> <name>"
            m = re.match(hex + r" (\S) (\S+)", sym)
            if not m:
                raise Exception("unexpected line in nm:\n" + sym)
    
            if m.group(1).islower():    # local symbols are fine
                continue
    
            if m.group(1) == "T":
                if m.group(2) in allowedsymbols:
                    continue
                
                if grade_mode:
                    op = open(results_file, 'w')
                    op.write(json.dumps({'error': 'defines global function %s' % m.group(2)}))
                    op.close()
                raise Exception(("%s defines global function '%s'\n"
                    +"allowed functions are: %s") % (objfile, m.group(2), str(allowedsymbols)))
    
            if grade_mode:
                op = open(results_file, 'w')
                op.write(json.dumps({'error': 'defines global symbol %s' % m.group(2)}))
                op.close()
            raise Exception(("%s must not define any global or static variables"
                    +", but you define: %s") % (objfile, m.group(2)))
    
    allowedsymbols = [ "future_free", "future_get",
                       "thread_pool_new", "thread_pool_shutdown_and_destroy", 
                       "thread_pool_submit" ] 
    
    #
    # build it (like check.py)
    #
    
    def count_number_of_processes():
        # RLIMIT_NPROC limits the total number of processes/threads a user may
        # have, so run_single_test must add the pre-existing ones to its allowance
        proc = subprocess.Popen(["ps", "ux", "-L"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = proc.communicate()
        # -2 to account for the ps process itself and the header line
        return len(stdout.decode().strip().split('\n')) - 2
    
    
    def get_load_average():
        """
        Returns tuple nproc, loadavg where nproc is the current number of
        running threads (minus 1) and loadavg is the load average
        """
        # 0.57 0.65 0.54 1/426 28121
    
        with open("/proc/loadavg") as f:
            c = f.read().strip()
            m = re.match(r'(\S+) \S+ \S+ (\d+)/\d+ \d+', c)
            load = float(m.groups()[0])
            nprocs = int(m.groups()[1])
    
    
        return nprocs - 1, load
    
    def wait_for_load_to_go_down():
        while True:
            time.sleep(0.2)
            nprocs, load = get_load_average()
            if nprocs == 0 and load < 1.0:
                break

            print ("Warning. There are %d other processes running on this machine, loadavg %f." % (nprocs, load))
            print ("Sleeping for 1 second.  Use the -a switch to run the benchmarks regardless.")
            time.sleep(0.8)
    
    
    # run tests
    #
    # each test is a program plus a set of inputs/command-line parameters
    # (e.g., the echo test); every test program must support -n <nthreads>
    # to set the number of threads
    #
    # result: expected output + exit code(?)
    #
    
    def set_threadlimit(nthreads):
        # returns a closure for use as Popen's preexec_fn: it runs in the child
        # after fork, so the RLIMIT_NPROC limit applies only to the test process
        def limit_threads():
            resource.setrlimit(resource.RLIMIT_NPROC, (nthreads, nthreads))

        return limit_threads
    
    
    def run_single_test(test, run, threads):
        cmdline = ['timeout', str(run.timeout), test.command, '-n', str(threads)] + run.args
        rundata = {
            'command' : ' '.join(cmdline),
            'nthreads' : threads
        }
        def addrundata(d):
            for k, v in d.items():
                rundata[k] = v
    
        if not silent:
    
            print ('Running:', ' '.join(cmdline), end=' ')
    
            sys.stdout.flush()
        infile = None
        if run.input_file:
            infile = open(run.input_file, 'r')
        # preexec_fn sets the per-user NPROC resource limit in the child.
        # we allow #threads + 2 (the timeout wrapper and the main thread)
        # on top of the user's already existing processes
        starttime = time.time()
        preexec_fn = set_threadlimit(threads + 2 + number_of_existing_processes) if test.limit_threads \
                    else (lambda : None)
    
        proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, stdin=infile,
    
                                preexec_fn=preexec_fn)
    
    
        stdout, stderr = proc.communicate()
        if grade_mode:
            stdout = stdout[:100]
            stderr = stderr[:100]
        runningtime = time.time() - starttime
    
        if infile:
            infile.close()
    
    
        # map signal numbers to names (e.g. 11 -> SIGSEGV) for error messages
        signames = dict((k, v) for v, k in signal.__dict__.items() if v.startswith('SIG'))
    
        signum = proc.returncode - 128
        if proc.returncode < 0:
            signum = -proc.returncode
        # Popen reports death-by-signal as a negative return code; the
        # timeout(1) wrapper reports it as 128 + the signal number
        if proc.returncode >= 128 or proc.returncode < 0 and signum in signames:
            timeoutmsg = ''
            if runningtime >= run.timeout:
                timeoutmsg = '\nProgram ran and most likely timed out at %.3fs' % (runningtime)
            error = """\
            Program terminated with signal %d (%s) %s
            --------------------------------------------
            Program output:
            %s
            StdErr output:
            %s
    
            """ % (signum, signames[signum], timeoutmsg, stdout.decode(), stderr.decode())

            addrundata({
                'error': error
            })
            if not silent:
                print ('[ ]')
                if verbose:
                    print (error)
    
    
        elif proc.returncode > 0:
            # non-zero exit code
            timeoutmsg = ''
            if proc.returncode == 124:
                timeoutmsg = '\nProgram ran and most likely timed out at %.3fs' % (runningtime)
    
            error = """\
            Program exited with non-zero exit status %d. %s
            --------------------------------------------
            Program output:
            %s
            StdErr output:
            %s
    
            """ % (proc.returncode, timeoutmsg, stdout.decode(), stderr.decode())

            addrundata({
                'error': error
            })
            if not silent:
                print ('[ ]')
                if verbose:
                    print (error)
        else:
            if not silent:
                print ('[+]')

            outfile = 'runresult.%d.json' % (proc.pid)
            addrundata({'stdout': stdout.decode()})
            if len(stderr) > 0:
                addrundata({'stderr': stderr.decode()})

            if not os.access(outfile, os.R_OK):
                addrundata({
                    'error': 'The benchmark did not create the expected result file %s' % outfile
                })
            else:
                with open(outfile, 'r') as f:
                    data = f.read()

                addrundata(json.loads(data))
                os.unlink(outfile)
        return rundata
        
    def average_run(runs):
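        """Collapse repeated runs of one command into a single averaged record;
        any failing run marks the whole set as not fully passed."""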
        data = {
            'nthreads': runs[0]['nthreads'],
            'command': runs[0]['command']
        }
        totalrtime = 0.0
        totalstime = 0.0
        totalutime = 0.0
        error = None
        passed = 0
        for run in runs:
            if 'error' in run:
                error = run['error']
            else:
                passed += 1
                totalrtime += run['realtime']
                totalstime += run['ru_stime']
                totalutime += run['ru_utime']
    
        if error or passed != len(runs):
            data['error'] = error
            data['run_count'] = len(runs)
            data['passed'] = passed
        else:
            data['realtime'] = totalrtime / len(runs)
            data['cputime'] = (totalstime + totalutime) / len(runs)
            data['ru_stime'] = totalstime / len(runs)
            data['ru_utime'] = totalutime / len(runs)
            data['run_count'] = len(runs)
            data['passed'] = passed
    
        return data
    
    def benchmark_speedup(data, testname):
        if 'realtime' not in data or 'cputime' not in data:
            return
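        # speedup = serial time / parallel wall-clock time; cpu_overuse > 1
        # means more CPU time was burned than wall-clock time elapsed,
        # i.e. multiple cores were in use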
        serial_time = benchmark_times[testname]
        data['speedup'] = serial_time / data['realtime']
        data['cpu_overuse'] = data['cputime'] / data['realtime']
    
    def run_tests(tests):
        results = defaultdict(dict)
    
        summary = {}
        for test in tests:
            if not runfilter(test):
                if verbose:
                    print ('Skipping test: ' + test.description)
                continue

            if not silent:
                print ('')
                print ('Starting test: ' + test.description)
                print ('=' * 80)
    
    
            results[test.name] = {}
            for run in test.runs:
                perthreadresults = []
                results[test.name][run.name] = perthreadresults
    
                for threads in run.thread_count:
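                    # grade mode repeats required/benchmarked runs benchmark_runs
                    # times and averages them; otherwise each configuration runs once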
                    if grade_mode:
                        repeats = benchmark_runs if test.is_required or run.is_benchmarked else 1
                        runs = []
                        for repeat in range(repeats):
                            runs.append(run_single_test(test, run, threads))
                        rundata = average_run(runs)
                        rundata['runs'] = runs
                        if run.is_benchmarked:
                            benchmark_speedup(rundata, run.name)
                        perthreadresults.append(rundata)
                    else:
                        runs = [run_single_test(test, run, threads)]
                        rundata = average_run(runs)
                        rundata['runs'] = runs
                        perthreadresults.append(rundata)
        return results
    
    def print_results(results):
        print (json.dumps(results, indent = 4, sort_keys = True, separators = (',', ': ')))
        # try pretty-printing errors for better readability
        try:
            def print_info(o):
                for k in ['error', 'stderr', 'stdout']:
                    if k in o:
                        print (o[k])
    
            for test, r in results.items():
                for rr in r.values():
                    for o in rr:
                        print_info(o)
                        if 'runs' in o:
                            # map() is lazy in Python 3, so iterate explicitly
                            for rundata in o['runs']:
                                print_info(rundata)
    
        except Exception as e:
            print(e)
    
    
    
    def write_results_to_json(filename):
        jfile = open(filename, "w")
        print (json.dumps(results, indent = 4, sort_keys = True, separators = (',', ': ')), file=jfile)
        jfile.close()
    
    def find_thread_run(perthreadresults, threadcount):
    
        return next(filter(lambda result: result['nthreads'] == threadcount, perthreadresults), None)
    
    
    def print_grade_table(results, tests):
        # report each test's results for runs with 1, 2, 4, 8, 16, and 32 threads
        thread_headers = [1, 2, 4, 8, 16, 32]
        print ('')
        print ('Test name:' + (16 * ' ') + ''.join(map(lambda x: '%-10s' % str(x), thread_headers)))
        print ('='*80)
    
        minimum_requirements = True
        for test in tests:
            if not runfilter(test) and test.is_required:
                if not silent:
    
                    print ('WARNING: Skipping minimum requirement test (%s), will not say you passed!' % test.name)
    
                minimum_requirements = False
    
            if not runfilter(test):
                continue
            if not test.name in results:
                print ('%s: could not find test data!' % test.name)
                continue

            res = results[test.name]
            print ('%s:' % test.name.upper() + '  ' + test.description)

            passed = True
            for run in test.runs:
                statuses = []
                for threads in thread_headers:
    
                    thread_run = find_thread_run(res[run.name], threads)
                    if thread_run is None:
                        statuses.append('')  # MISSING
                    elif 'error' in thread_run:
                        passed = False
                        statuses.append('[ ]')
                    elif run.is_benchmarked:
                        statuses.append('[%.3fs]' % thread_run['realtime'])
                    else:
                        statuses.append('[X]')
    
    
                print ('  %-23s' % (run.name) + ''.join(map(lambda x: '%-10s' % x, statuses)))
    
            
            if not passed and test.is_required:
                minimum_requirements = False
    
    
        print ('='*80)
        if minimum_requirements:
            print ('You have met minimum requirements, your performance score will count.')
        else:
            print ('You did not meet minimum requirements; your performance score will be zero.')
    
    
    
    setup_working_directory()
    check_software_engineering("threadpool.o", allowedsymbols)
    number_of_existing_processes = count_number_of_processes()
    if verbose:
    
        print ("There are %d processes currently running for user %s" % (number_of_existing_processes, os.getlogin()))
    
    
    if not ignore_if_not_idle:
        wait_for_load_to_go_down()
        
    results = run_tests(tests)
    if verbose:
        print_results(results)
    if not silent:
        print_grade_table(results, tests)
    
    
    write_results_to_json(results_file)
    print ("Wrote full results to %s/%s" % (workdir, results_file))