1 // Copyright 2016 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package http2
6 7 import (
8 "fmt"
9 "math"
10 "sort"
11 )
// priorityDefaultWeightRFC7540 is the stored form of the RFC 7540 default
// stream weight. Section 5.3.5 gives every new stream a weight of 16, and
// priorityNodeRFC7540.weight holds the real weight minus one, so the value
// kept in a node is 16 - 1.
const priorityDefaultWeightRFC7540 = 16 - 1
// PriorityWriteSchedulerConfig configures the WriteScheduler returned by
// NewPriorityWriteScheduler.
//
// Passing a nil config to NewPriorityWriteScheduler selects built-in defaults.
// A non-nil config is used as given, so its zero value retains no closed or
// idle nodes and disables write throttling.
type PriorityWriteSchedulerConfig struct {
	// MaxClosedNodesInTree controls the maximum number of closed streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance.
	//
	// See RFC 7540, Section 5.3.4:
	//   "It is possible for a stream to become closed while prioritization
	//   information ... is in transit. ... This potentially creates suboptimal
	//   prioritization, since the stream could be given a priority that is
	//   different from what is intended. To avoid these problems, an endpoint
	//   SHOULD retain stream prioritization state for a period after streams
	//   become closed. The longer state is retained, the lower the chance that
	//   streams are assigned incorrect or default priority values."
	MaxClosedNodesInTree int

	// MaxIdleNodesInTree controls the maximum number of idle streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance. Idle nodes are created when a
	// priority update names a stream ID that has not been used yet.
	//
	// See RFC 7540, Section 5.3.4:
	//   "Similarly, streams that are in the "idle" state can be assigned
	//   priority or become a parent of other streams. This allows for the
	//   creation of a grouping node in the dependency tree, which enables
	//   more flexible expressions of priority. Idle streams begin with a
	//   default priority (Section 5.3.5)."
	MaxIdleNodesInTree int

	// ThrottleOutOfOrderWrites enables write throttling to help ensure that
	// data is delivered in priority order. This works around a race where
	// stream B depends on stream A and both streams are about to call Write
	// to queue DATA frames. If B wins the race, a naive scheduler would eagerly
	// write as much data from B as possible, but this is suboptimal because A
	// is a higher-priority stream. With throttling enabled, we write a small
	// amount of data from B to minimize the amount of bandwidth that B can
	// steal from A.
	ThrottleOutOfOrderWrites bool
}
54 55 // NewPriorityWriteScheduler constructs a WriteScheduler that schedules
56 // frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
57 // If cfg is nil, default options are used.
58 func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
59 if cfg == nil {
60 // For justification of these defaults, see:
61 // https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
62 cfg = &PriorityWriteSchedulerConfig{
63 MaxClosedNodesInTree: 10,
64 MaxIdleNodesInTree: 10,
65 ThrottleOutOfOrderWrites: false,
66 }
67 }
68 69 ws := &priorityWriteSchedulerRFC7540{
70 nodes: make(map[uint32]*priorityNodeRFC7540),
71 maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
72 maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
73 enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
74 }
75 ws.nodes[0] = &ws.root
76 if cfg.ThrottleOutOfOrderWrites {
77 ws.writeThrottleLimit = 1024
78 } else {
79 ws.writeThrottleLimit = math.MaxInt32
80 }
81 return ws
82 }
// priorityNodeStateRFC7540 is the lifecycle state of a priorityNodeRFC7540.
type priorityNodeStateRFC7540 int

const (
	// priorityNodeOpenRFC7540 marks a stream that has been opened with
	// OpenStream and not yet closed. It is the zero value.
	priorityNodeOpenRFC7540 priorityNodeStateRFC7540 = iota
	// priorityNodeClosedRFC7540 marks a stream passed to CloseStream whose
	// node is still kept in the tree (bounded by maxClosedNodesInTree).
	priorityNodeClosedRFC7540
	// priorityNodeIdleRFC7540 marks a node created by AdjustStream for a
	// stream ID that has not been opened yet (an RFC 7540 "grouping" node).
	priorityNodeIdleRFC7540
)
// priorityNodeRFC7540 is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID.
// See RFC 7540, Section 5.3.
type priorityNodeRFC7540 struct {
	q            writeQueue               // queue of pending frames to write
	id           uint32                   // id of the stream, or 0 for the root of the tree
	weight       uint8                    // the actual weight is weight+1, so the value is in [1,256]
	state        priorityNodeStateRFC7540 // open | closed | idle
	bytes        int64                    // DATA bytes popped from this node's queue; reset to 0 when the stream is closed
	subtreeBytes int64                    // sum(node.bytes) of all nodes in this subtree; used to order siblings

	// These links form the priority tree. parent is nil for the root and for
	// nodes that have been detached from the tree.
	parent     *priorityNodeRFC7540
	kids       *priorityNodeRFC7540 // start of the kids list; setParent inserts new kids at the head
	prev, next *priorityNodeRFC7540 // doubly-linked list of siblings
}
108 109 func (n *priorityNodeRFC7540) setParent(parent *priorityNodeRFC7540) {
110 if n == parent {
111 panic("setParent to self")
112 }
113 if n.parent == parent {
114 return
115 }
116 // Unlink from current parent.
117 if parent := n.parent; parent != nil {
118 if n.prev == nil {
119 parent.kids = n.next
120 } else {
121 n.prev.next = n.next
122 }
123 if n.next != nil {
124 n.next.prev = n.prev
125 }
126 }
127 // Link to new parent.
128 // If parent=nil, remove n from the tree.
129 // Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
130 n.parent = parent
131 if parent == nil {
132 n.next = nil
133 n.prev = nil
134 } else {
135 n.next = parent.kids
136 n.prev = nil
137 if n.next != nil {
138 n.next.prev = n
139 }
140 parent.kids = n
141 }
142 }
143 144 func (n *priorityNodeRFC7540) addBytes(b int64) {
145 n.bytes += b
146 for ; n != nil; n = n.parent {
147 n.subtreeBytes += b
148 }
149 }
150 151 // walkReadyInOrder iterates over the tree in priority order, calling f for each node
152 // with a non-empty write queue. When f returns true, this function returns true and the
153 // walk halts. tmp is used as scratch space for sorting.
154 //
155 // f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
156 // if any ancestor p of n is still open (ignoring the root node).
157 func (n *priorityNodeRFC7540) walkReadyInOrder(openParent bool, tmp *[]*priorityNodeRFC7540, f func(*priorityNodeRFC7540, bool) bool) bool {
158 if !n.q.empty() && f(n, openParent) {
159 return true
160 }
161 if n.kids == nil {
162 return false
163 }
164 165 // Don't consider the root "open" when updating openParent since
166 // we can't send data frames on the root stream (only control frames).
167 if n.id != 0 {
168 openParent = openParent || (n.state == priorityNodeOpenRFC7540)
169 }
170 171 // Common case: only one kid or all kids have the same weight.
172 // Some clients don't use weights; other clients (like web browsers)
173 // use mostly-linear priority trees.
174 w := n.kids.weight
175 needSort := false
176 for k := n.kids.next; k != nil; k = k.next {
177 if k.weight != w {
178 needSort = true
179 break
180 }
181 }
182 if !needSort {
183 for k := n.kids; k != nil; k = k.next {
184 if k.walkReadyInOrder(openParent, tmp, f) {
185 return true
186 }
187 }
188 return false
189 }
190 191 // Uncommon case: sort the child nodes. We remove the kids from the parent,
192 // then re-insert after sorting so we can reuse tmp for future sort calls.
193 *tmp = (*tmp)[:0]
194 for n.kids != nil {
195 *tmp = append(*tmp, n.kids)
196 n.kids.setParent(nil)
197 }
198 sort.Sort(sortPriorityNodeSiblingsRFC7540(*tmp))
199 for i := len(*tmp) - 1; i >= 0; i-- {
200 (*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
201 }
202 for k := n.kids; k != nil; k = k.next {
203 if k.walkReadyInOrder(openParent, tmp, f) {
204 return true
205 }
206 }
207 return false
208 }
209 210 type sortPriorityNodeSiblingsRFC7540 []*priorityNodeRFC7540
211 212 func (z sortPriorityNodeSiblingsRFC7540) Len() int { return len(z) }
213 func (z sortPriorityNodeSiblingsRFC7540) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
214 func (z sortPriorityNodeSiblingsRFC7540) Less(i, k int) bool {
215 // Prefer the subtree that has sent fewer bytes relative to its weight.
216 // See sections 5.3.2 and 5.3.4.
217 wi, bi := float64(z[i].weight)+1, float64(z[i].subtreeBytes)
218 wk, bk := float64(z[k].weight)+1, float64(z[k].subtreeBytes)
219 if bi == 0 && bk == 0 {
220 return wi >= wk
221 }
222 if bk == 0 {
223 return false
224 }
225 return bi/bk <= wi/wk
226 }
// priorityWriteSchedulerRFC7540 is the WriteScheduler built by
// NewPriorityWriteScheduler. It keeps an RFC 7540 priority tree of stream
// nodes and pops frames by walking that tree in priority order.
type priorityWriteSchedulerRFC7540 struct {
	// root is the root of the priority tree, where root.id = 0.
	// The root queues control frames that are not associated with any stream.
	// It also queues non-DATA frames for streams that have no node.
	root priorityNodeRFC7540

	// nodes maps stream ids to priority tree nodes. It covers open streams,
	// retained closed and idle streams, and stream 0 (the root).
	nodes map[uint32]*priorityNodeRFC7540

	// maxID is the largest stream id ever added to nodes, whether opened or
	// created idle by AdjustStream. Nothing in this file decreases it, so it
	// still covers streams that have since been removed from nodes.
	maxID uint32

	// lists of nodes that have been closed or are idle, but are kept in
	// the tree for improved prioritization. When the lengths exceed either
	// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
	// Each list is ordered oldest first.
	closedNodes, idleNodes []*priorityNodeRFC7540

	// From the config.
	maxClosedNodesInTree int
	maxIdleNodesInTree   int
	// writeThrottleLimit starts from the config (1024 or math.MaxInt32). Pop
	// then adjusts it: it grows while nodes with an open ancestor are being
	// served, and resets to 1024 otherwise (when throttling is enabled).
	writeThrottleLimit  int32
	enableWriteThrottle bool

	// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
	tmp []*priorityNodeRFC7540

	// pool of empty queues for reuse.
	queuePool writeQueuePool
}
256 257 func (ws *priorityWriteSchedulerRFC7540) OpenStream(streamID uint32, options OpenStreamOptions) {
258 // The stream may be currently idle but cannot be opened or closed.
259 if curr := ws.nodes[streamID]; curr != nil {
260 if curr.state != priorityNodeIdleRFC7540 {
261 panic(fmt.Sprintf("stream %d already opened", streamID))
262 }
263 curr.state = priorityNodeOpenRFC7540
264 return
265 }
266 267 // RFC 7540, Section 5.3.5:
268 // "All streams are initially assigned a non-exclusive dependency on stream 0x0.
269 // Pushed streams initially depend on their associated stream. In both cases,
270 // streams are assigned a default weight of 16."
271 parent := ws.nodes[options.PusherID]
272 if parent == nil {
273 parent = &ws.root
274 }
275 n := &priorityNodeRFC7540{
276 q: *ws.queuePool.get(),
277 id: streamID,
278 weight: priorityDefaultWeightRFC7540,
279 state: priorityNodeOpenRFC7540,
280 }
281 n.setParent(parent)
282 ws.nodes[streamID] = n
283 if streamID > ws.maxID {
284 ws.maxID = streamID
285 }
286 }
287 288 func (ws *priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
289 if streamID == 0 {
290 panic("violation of WriteScheduler interface: cannot close stream 0")
291 }
292 if ws.nodes[streamID] == nil {
293 panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
294 }
295 if ws.nodes[streamID].state != priorityNodeOpenRFC7540 {
296 panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
297 }
298 299 n := ws.nodes[streamID]
300 n.state = priorityNodeClosedRFC7540
301 n.addBytes(-n.bytes)
302 303 q := n.q
304 ws.queuePool.put(&q)
305 if ws.maxClosedNodesInTree > 0 {
306 ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
307 } else {
308 ws.removeNode(n)
309 }
310 }
311 312 func (ws *priorityWriteSchedulerRFC7540) AdjustStream(streamID uint32, priority PriorityParam) {
313 if streamID == 0 {
314 panic("adjustPriority on root")
315 }
316 317 // If streamID does not exist, there are two cases:
318 // - A closed stream that has been removed (this will have ID <= maxID)
319 // - An idle stream that is being used for "grouping" (this will have ID > maxID)
320 n := ws.nodes[streamID]
321 if n == nil {
322 if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
323 return
324 }
325 ws.maxID = streamID
326 n = &priorityNodeRFC7540{
327 q: *ws.queuePool.get(),
328 id: streamID,
329 weight: priorityDefaultWeightRFC7540,
330 state: priorityNodeIdleRFC7540,
331 }
332 n.setParent(&ws.root)
333 ws.nodes[streamID] = n
334 ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
335 }
336 337 // Section 5.3.1: A dependency on a stream that is not currently in the tree
338 // results in that stream being given a default priority (Section 5.3.5).
339 parent := ws.nodes[priority.StreamDep]
340 if parent == nil {
341 n.setParent(&ws.root)
342 n.weight = priorityDefaultWeightRFC7540
343 return
344 }
345 346 // Ignore if the client tries to make a node its own parent.
347 if n == parent {
348 return
349 }
350 351 // Section 5.3.3:
352 // "If a stream is made dependent on one of its own dependencies, the
353 // formerly dependent stream is first moved to be dependent on the
354 // reprioritized stream's previous parent. The moved dependency retains
355 // its weight."
356 //
357 // That is: if parent depends on n, move parent to depend on n.parent.
358 for x := parent.parent; x != nil; x = x.parent {
359 if x == n {
360 parent.setParent(n.parent)
361 break
362 }
363 }
364 365 // Section 5.3.3: The exclusive flag causes the stream to become the sole
366 // dependency of its parent stream, causing other dependencies to become
367 // dependent on the exclusive stream.
368 if priority.Exclusive {
369 k := parent.kids
370 for k != nil {
371 next := k.next
372 if k != n {
373 k.setParent(n)
374 }
375 k = next
376 }
377 }
378 379 n.setParent(parent)
380 n.weight = priority.Weight
381 }
382 383 func (ws *priorityWriteSchedulerRFC7540) Push(wr FrameWriteRequest) {
384 var n *priorityNodeRFC7540
385 if wr.isControl() {
386 n = &ws.root
387 } else {
388 id := wr.StreamID()
389 n = ws.nodes[id]
390 if n == nil {
391 // id is an idle or closed stream. wr should not be a HEADERS or
392 // DATA frame. In other case, we push wr onto the root, rather
393 // than creating a new priorityNode.
394 if wr.DataSize() > 0 {
395 panic("add DATA on non-open stream")
396 }
397 n = &ws.root
398 }
399 }
400 n.q.push(wr)
401 }
402 403 func (ws *priorityWriteSchedulerRFC7540) Pop() (wr FrameWriteRequest, ok bool) {
404 ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNodeRFC7540, openParent bool) bool {
405 limit := int32(math.MaxInt32)
406 if openParent {
407 limit = ws.writeThrottleLimit
408 }
409 wr, ok = n.q.consume(limit)
410 if !ok {
411 return false
412 }
413 n.addBytes(int64(wr.DataSize()))
414 // If B depends on A and B continuously has data available but A
415 // does not, gradually increase the throttling limit to allow B to
416 // steal more and more bandwidth from A.
417 if openParent {
418 ws.writeThrottleLimit += 1024
419 if ws.writeThrottleLimit < 0 {
420 ws.writeThrottleLimit = math.MaxInt32
421 }
422 } else if ws.enableWriteThrottle {
423 ws.writeThrottleLimit = 1024
424 }
425 return true
426 })
427 return wr, ok
428 }
429 430 func (ws *priorityWriteSchedulerRFC7540) addClosedOrIdleNode(list *[]*priorityNodeRFC7540, maxSize int, n *priorityNodeRFC7540) {
431 if maxSize == 0 {
432 return
433 }
434 if len(*list) == maxSize {
435 // Remove the oldest node, then shift left.
436 ws.removeNode((*list)[0])
437 x := (*list)[1:]
438 copy(*list, x)
439 *list = (*list)[:len(x)]
440 }
441 *list = append(*list, n)
442 }
443 444 func (ws *priorityWriteSchedulerRFC7540) removeNode(n *priorityNodeRFC7540) {
445 for n.kids != nil {
446 n.kids.setParent(n.parent)
447 }
448 n.setParent(nil)
449 delete(ws.nodes, n.id)
450 }
451