1 package builder
2 3 // This file implements a job runner for the compiler, which runs jobs in
4 // parallel while taking care of dependencies.
5 6 import (
7 "container/heap"
8 "errors"
9 "fmt"
10 "runtime"
11 "sort"
12 "strings"
13 "time"
14 )
// jobRunnerDebug switches on logging in the job runner when set to true. The
// extra output (job start/finish lines and timing totals) may help to debug
// concurrency or performance issues.
const (
	jobRunnerDebug = false
)
19 20 // compileJob is a single compiler job, comparable to a single Makefile target.
21 // It is used to orchestrate various compiler tasks that can be run in parallel
22 // but that have dependencies and thus have limitations in how they can be run.
23 type compileJob struct {
24 description string // description, only used for logging
25 dependencies []*compileJob
26 result string // result (path)
27 run func(*compileJob) (err error)
28 err error // error if finished
29 duration time.Duration // how long it took to run this job (only set after finishing)
30 }
31 32 // dummyCompileJob returns a new *compileJob that produces an output without
33 // doing anything. This can be useful where a *compileJob producing an output is
34 // expected but nothing needs to be done, for example for a load from a cache.
35 func dummyCompileJob(result string) *compileJob {
36 return &compileJob{
37 description: "<dummy>",
38 result: result,
39 }
40 }
// runJobs runs the indicated job and all its dependencies. For every job, all
// the dependencies are run first. It returns the error of the first job that
// fails. Jobs that are already running at that point are waited for, but no
// new jobs are started.
//
// Jobs are numbered in depth-first order of the dependencies slices. Among the
// jobs that are ready to run, the lowest-numbered one is started first.
// Therefore, if some jobs are preferred to run before others, they should be
// ordered as such in the job dependencies.
//
// At most cap(sema) jobs run at the same time; each running job holds one
// token in sema. If sema is nil, a semaphore of size runtime.NumCPU() is used.
// If some jobs can never start because of a dependency cycle, an
// errDependencyCycle is returned.
func runJobs(job *compileJob, sema chan struct{}) error {
	if sema == nil {
		// Have a default, if the semaphore isn't set. This is useful for tests.
		sema = make(chan struct{}, runtime.NumCPU())
	}
	if cap(sema) == 0 {
		return errors.New("cannot run 0 jobs at a time")
	}

	// Create a slice of jobs to run, where all dependencies come before the
	// jobs that depend on them (depth-first post-order).
	var jobs []*compileJob
	addedJobs := map[*compileJob]struct{}{}
	var addJobs func(*compileJob)
	addJobs = func(job *compileJob) {
		if _, ok := addedJobs[job]; ok {
			return
		}
		// Mark the job before visiting its dependencies, so a dependency cycle
		// ends the walk instead of recursing forever. The cycle is reported
		// below once no more progress can be made. For an acyclic graph this
		// gives the same order as marking after the recursion.
		addedJobs[job] = struct{}{}
		for _, dep := range job.dependencies {
			addJobs(dep)
		}
		jobs = append(jobs, job)
	}
	addJobs(job)

	// waiting holds, for each job that cannot start yet, the set of
	// dependencies it is still waiting on. dependents maps each job to the
	// jobs that depend on it.
	waiting := make(map[*compileJob]map[*compileJob]struct{}, len(jobs))
	dependents := make(map[*compileJob][]*compileJob, len(jobs))
	jobIndex := make(map[*compileJob]int, len(jobs))
	// ready is a min-heap of indices into jobs. It must only be modified
	// through heap.Push/heap.Pop so the lowest index is always popped first.
	var ready intHeap
	for i, job := range jobs {
		jobIndex[job] = i
		if len(job.dependencies) == 0 {
			// This job is ready to run.
			heap.Push(&ready, i)
			continue
		}

		// Construct a map for dependencies which the job is currently waiting on.
		waitDeps := make(map[*compileJob]struct{}, len(job.dependencies))
		waiting[job] = waitDeps

		// Add the job to the dependents list of each dependency. Record each
		// dependency only once; a duplicate entry would otherwise mark the job
		// ready (and run it) once per duplicate.
		for _, dep := range job.dependencies {
			if _, ok := waitDeps[dep]; ok {
				continue
			}
			dependents[dep] = append(dependents[dep], job)
			waitDeps[dep] = struct{}{}
		}
	}

	// Create a channel to accept notifications of completion.
	doneChan := make(chan *compileJob)

	// Start ready jobs whenever a semaphore token is available, and handle
	// completions as they arrive.
	numRunningJobs := 0
	var totalTime time.Duration
	start := time.Now()
	for ready.Len() > 0 || numRunningJobs != 0 {
		var completed *compileJob
		if ready.Len() > 0 {
			select {
			case sema <- struct{}{}:
				// Got a token: start the lowest-numbered ready job.
				job := jobs[heap.Pop(&ready).(int)]
				if jobRunnerDebug {
					fmt.Println("## start: ", job.description)
				}
				go runJob(job, doneChan)
				numRunningJobs++
				continue

			case completed = <-doneChan:
				// A job completed.
			}
		} else {
			// Nothing can be started: wait for a job to complete.
			completed = <-doneChan
		}
		numRunningJobs--
		<-sema // release the token held by the completed job
		if jobRunnerDebug {
			fmt.Println("## finished:", completed.description, "(time "+completed.duration.String()+")")
		}
		if completed.err != nil {
			// Wait for any current jobs to finish. Release the token each of
			// them holds: sema is supplied by the caller and may be reused, so
			// leaked tokens would reduce (or exhaust) its capacity.
			for numRunningJobs != 0 {
				<-doneChan
				numRunningJobs--
				<-sema
			}

			// The build failed.
			return completed.err
		}

		// Update total run time.
		totalTime += completed.duration

		// Jobs whose last outstanding dependency just finished become ready.
		for _, j := range dependents[completed] {
			wait := waiting[j]
			delete(wait, completed)
			if len(wait) == 0 {
				heap.Push(&ready, jobIndex[j])
				delete(waiting, j)
			}
		}
	}
	if len(waiting) != 0 {
		// There is a dependency cycle preventing some jobs from running.
		return errDependencyCycle{waiting}
	}

	// Some statistics, if debugging.
	if jobRunnerDebug {
		// Total duration of running all jobs.
		duration := time.Since(start)
		fmt.Println("## total: ", duration)

		// The individual time of each job combined. On a multicore system, this
		// should be lower than the total above.
		fmt.Println("## job sum: ", totalTime)
	}

	return nil
}
172 173 type errDependencyCycle struct {
174 waiting map[*compileJob]map[*compileJob]struct{}
175 }
176 177 func (err errDependencyCycle) Error() string {
178 waits := make([]string, 0, len(err.waiting))
179 for j, wait := range err.waiting {
180 deps := make([]string, 0, len(wait))
181 for dep := range wait {
182 deps = append(deps, dep.description)
183 }
184 sort.Strings(deps)
185 186 waits = append(waits, fmt.Sprintf("\t%s is waiting for [%s]",
187 j.description, strings.Join(deps, ", "),
188 ))
189 }
190 sort.Strings(waits)
191 return "deadlock:\n" + strings.Join(waits, "\n")
192 }
193 194 type intHeap struct {
195 sort.IntSlice
196 }
197 198 func (h *intHeap) Push(x interface{}) {
199 h.IntSlice = append(h.IntSlice, x.(int))
200 }
201 202 func (h *intHeap) Pop() interface{} {
203 x := h.IntSlice[len(h.IntSlice)-1]
204 h.IntSlice = h.IntSlice[:len(h.IntSlice)-1]
205 return x
206 }
// runJob calls job.run (if set) synchronously, stores any error it returns in
// job.err, records the elapsed wall-clock time in job.duration, and then sends
// the job on doneChan. The send happens after both fields are written, so the
// receiver can read them.
func runJob(job *compileJob, doneChan chan *compileJob) {
	began := time.Now()
	if run := job.run; run != nil {
		if err := run(job); err != nil {
			job.err = err
		}
	}
	job.duration = time.Since(began)
	doneChan <- job
}
220