jobs.go raw

   1  package builder
   2  
   3  // This file implements a job runner for the compiler, which runs jobs in
   4  // parallel while taking care of dependencies.
   5  
   6  import (
   7  	"container/heap"
   8  	"errors"
   9  	"fmt"
  10  	"runtime"
  11  	"sort"
  12  	"strings"
  13  	"time"
  14  )
  15  
// jobRunnerDebug can be set to true to enable logging in the job runner. This
// may help to debug concurrency or performance issues. When enabled, runJobs
// prints a line for every job that is started and for every job that finished
// (including how long it took), followed by the total elapsed time and the sum
// of the individual job durations.
const jobRunnerDebug = false
  19  
// compileJob is a single compiler job, comparable to a single Makefile target.
// It is used to orchestrate various compiler tasks that can be run in parallel
// but that have dependencies and thus have limitations in how they can be run.
type compileJob struct {
	description  string // human-readable name, used in debug logging and in dependency cycle errors
	// dependencies lists the jobs that must have finished before this job is
	// started.
	dependencies []*compileJob
	result       string // result (path)
	// run does the actual work and receives the job it belongs to. It may be
	// nil, in which case runJob executes nothing for this job.
	run          func(*compileJob) (err error)
	err          error         // error returned by run, if any (set by runJob when the job finished)
	duration     time.Duration // how long it took to run this job (only set after finishing)
}
  31  
  32  // dummyCompileJob returns a new *compileJob that produces an output without
  33  // doing anything. This can be useful where a *compileJob producing an output is
  34  // expected but nothing needs to be done, for example for a load from a cache.
  35  func dummyCompileJob(result string) *compileJob {
  36  	return &compileJob{
  37  		description: "<dummy>",
  38  		result:      result,
  39  	}
  40  }
  41  
  42  // runJobs runs the indicated job and all its dependencies. For every job, all
  43  // the dependencies are run first. It returns the error of the first job that
  44  // fails.
  45  // It runs all jobs in the order of the dependencies slice, depth-first.
  46  // Therefore, if some jobs are preferred to run before others, they should be
  47  // ordered as such in the job dependencies.
  48  func runJobs(job *compileJob, sema chan struct{}) error {
  49  	if sema == nil {
  50  		// Have a default, if the semaphore isn't set. This is useful for tests.
  51  		sema = make(chan struct{}, runtime.NumCPU())
  52  	}
  53  	if cap(sema) == 0 {
  54  		return errors.New("cannot run 0 jobs at a time")
  55  	}
  56  
  57  	// Create a slice of jobs to run, where all dependencies are run in order.
  58  	jobs := []*compileJob{}
  59  	addedJobs := map[*compileJob]struct{}{}
  60  	var addJobs func(*compileJob)
  61  	addJobs = func(job *compileJob) {
  62  		if _, ok := addedJobs[job]; ok {
  63  			return
  64  		}
  65  		for _, dep := range job.dependencies {
  66  			addJobs(dep)
  67  		}
  68  		jobs = append(jobs, job)
  69  		addedJobs[job] = struct{}{}
  70  	}
  71  	addJobs(job)
  72  
  73  	waiting := make(map[*compileJob]map[*compileJob]struct{}, len(jobs))
  74  	dependents := make(map[*compileJob][]*compileJob, len(jobs))
  75  	compileJobs := make(map[*compileJob]int)
  76  	var ready intHeap
  77  	for i, job := range jobs {
  78  		compileJobs[job] = i
  79  		if len(job.dependencies) == 0 {
  80  			// This job is ready to run.
  81  			ready.Push(i)
  82  			continue
  83  		}
  84  
  85  		// Construct a map for dependencies which the job is currently waiting on.
  86  		waitDeps := make(map[*compileJob]struct{})
  87  		waiting[job] = waitDeps
  88  
  89  		// Add the job to the dependents list of each dependency.
  90  		for _, dep := range job.dependencies {
  91  			dependents[dep] = append(dependents[dep], job)
  92  			waitDeps[dep] = struct{}{}
  93  		}
  94  	}
  95  
  96  	// Create a channel to accept notifications of completion.
  97  	doneChan := make(chan *compileJob)
  98  
  99  	// Send each job in the jobs slice to a worker, taking care of job dependencies.
 100  	numRunningJobs := 0
 101  	var totalTime time.Duration
 102  	start := time.Now()
 103  	for len(ready.IntSlice) > 0 || numRunningJobs != 0 {
 104  		var completed *compileJob
 105  		if len(ready.IntSlice) > 0 {
 106  			select {
 107  			case sema <- struct{}{}:
 108  				// Start a job.
 109  				job := jobs[heap.Pop(&ready).(int)]
 110  				if jobRunnerDebug {
 111  					fmt.Println("## start:   ", job.description)
 112  				}
 113  				go runJob(job, doneChan)
 114  				numRunningJobs++
 115  				continue
 116  
 117  			case completed = <-doneChan:
 118  				// A job completed.
 119  			}
 120  		} else {
 121  			// Wait for a job to complete.
 122  			completed = <-doneChan
 123  		}
 124  		numRunningJobs--
 125  		<-sema
 126  		if jobRunnerDebug {
 127  			fmt.Println("## finished:", completed.description, "(time "+completed.duration.String()+")")
 128  		}
 129  		if completed.err != nil {
 130  			// Wait for any current jobs to finish.
 131  			for numRunningJobs != 0 {
 132  				<-doneChan
 133  				numRunningJobs--
 134  			}
 135  
 136  			// The build failed.
 137  			return completed.err
 138  		}
 139  
 140  		// Update total run time.
 141  		totalTime += completed.duration
 142  
 143  		// Update dependent jobs.
 144  		for _, j := range dependents[completed] {
 145  			wait := waiting[j]
 146  			delete(wait, completed)
 147  			if len(wait) == 0 {
 148  				// This job is now ready to run.
 149  				ready.Push(compileJobs[j])
 150  				delete(waiting, j)
 151  			}
 152  		}
 153  	}
 154  	if len(waiting) != 0 {
 155  		// There is a dependency cycle preventing some jobs from running.
 156  		return errDependencyCycle{waiting}
 157  	}
 158  
 159  	// Some statistics, if debugging.
 160  	if jobRunnerDebug {
 161  		// Total duration of running all jobs.
 162  		duration := time.Since(start)
 163  		fmt.Println("## total:   ", duration)
 164  
 165  		// The individual time of each job combined. On a multicore system, this
 166  		// should be lower than the total above.
 167  		fmt.Println("## job sum: ", totalTime)
 168  	}
 169  
 170  	return nil
 171  }
 172  
// errDependencyCycle is the error returned by runJobs when it has nothing left
// to run while some jobs are still waiting on dependencies. The waiting map
// holds, for every such job, the set of dependencies it is still waiting for.
type errDependencyCycle struct {
	waiting map[*compileJob]map[*compileJob]struct{}
}
 176  
 177  func (err errDependencyCycle) Error() string {
 178  	waits := make([]string, 0, len(err.waiting))
 179  	for j, wait := range err.waiting {
 180  		deps := make([]string, 0, len(wait))
 181  		for dep := range wait {
 182  			deps = append(deps, dep.description)
 183  		}
 184  		sort.Strings(deps)
 185  
 186  		waits = append(waits, fmt.Sprintf("\t%s is waiting for [%s]",
 187  			j.description, strings.Join(deps, ", "),
 188  		))
 189  	}
 190  	sort.Strings(waits)
 191  	return "deadlock:\n" + strings.Join(waits, "\n")
 192  }
 193  
// intHeap is a heap of ints. The embedded sort.IntSlice supplies Len, Less and
// Swap (ascending order), and the Push and Pop methods on *intHeap complete
// heap.Interface, so it can be used with the container/heap package. runJobs
// uses it to hold the indices of the jobs that are ready to run.
type intHeap struct {
	sort.IntSlice
}
 197  
 198  func (h *intHeap) Push(x interface{}) {
 199  	h.IntSlice = append(h.IntSlice, x.(int))
 200  }
 201  
 202  func (h *intHeap) Pop() interface{} {
 203  	x := h.IntSlice[len(h.IntSlice)-1]
 204  	h.IntSlice = h.IntSlice[:len(h.IntSlice)-1]
 205  	return x
 206  }
 207  
 208  // runJob runs a compile job and notifies doneChan of completion.
 209  func runJob(job *compileJob, doneChan chan *compileJob) {
 210  	start := time.Now()
 211  	if job.run != nil {
 212  		err := job.run(job)
 213  		if err != nil {
 214  			job.err = err
 215  		}
 216  	}
 217  	job.duration = time.Since(start)
 218  	doneChan <- job
 219  }
 220