writesched_priority_rfc7540.go raw

   1  // Copyright 2016 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package http2
   6  
   7  import (
   8  	"fmt"
   9  	"math"
  10  	"sort"
  11  )
  12  
// priorityDefaultWeightRFC7540 is the weight given to nodes created without
// an explicit priority. RFC 7540, Section 5.3.5: the default weight is 16.
// Node weights are stored minus one (see priorityNodeRFC7540.weight), hence 15.
const priorityDefaultWeightRFC7540 = 15 // 16 = 15 + 1
  15  
// PriorityWriteSchedulerConfig configures the write scheduler returned by
// NewPriorityWriteScheduler.
type PriorityWriteSchedulerConfig struct {
	// MaxClosedNodesInTree controls the maximum number of closed streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance.
	//
	// See RFC 7540, Section 5.3.4:
	//   "It is possible for a stream to become closed while prioritization
	//   information ... is in transit. ... This potentially creates suboptimal
	//   prioritization, since the stream could be given a priority that is
	//   different from what is intended. To avoid these problems, an endpoint
	//   SHOULD retain stream prioritization state for a period after streams
	//   become closed. The longer state is retained, the lower the chance that
	//   streams are assigned incorrect or default priority values."
	MaxClosedNodesInTree int

	// MaxIdleNodesInTree controls the maximum number of idle streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance.
	//
	// See RFC 7540, Section 5.3.4:
	//   "Similarly, streams that are in the "idle" state can be assigned
	//   priority or become a parent of other streams. This allows for the
	//   creation of a grouping node in the dependency tree, which enables
	//   more flexible expressions of priority. Idle streams begin with a
	//   default priority (Section 5.3.5)."
	MaxIdleNodesInTree int

	// ThrottleOutOfOrderWrites enables write throttling to help ensure that
	// data is delivered in priority order. This works around a race where
	// stream B depends on stream A and both streams are about to call Write
	// to queue DATA frames. If B wins the race, a naive scheduler would eagerly
	// write as much data from B as possible, but this is suboptimal because A
	// is a higher-priority stream. With throttling enabled, we write a small
	// amount of data from B to minimize the amount of bandwidth that B can
	// steal from A.
	ThrottleOutOfOrderWrites bool
}
  54  
  55  // NewPriorityWriteScheduler constructs a WriteScheduler that schedules
  56  // frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
  57  // If cfg is nil, default options are used.
  58  func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
  59  	if cfg == nil {
  60  		// For justification of these defaults, see:
  61  		// https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
  62  		cfg = &PriorityWriteSchedulerConfig{
  63  			MaxClosedNodesInTree:     10,
  64  			MaxIdleNodesInTree:       10,
  65  			ThrottleOutOfOrderWrites: false,
  66  		}
  67  	}
  68  
  69  	ws := &priorityWriteSchedulerRFC7540{
  70  		nodes:                make(map[uint32]*priorityNodeRFC7540),
  71  		maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
  72  		maxIdleNodesInTree:   cfg.MaxIdleNodesInTree,
  73  		enableWriteThrottle:  cfg.ThrottleOutOfOrderWrites,
  74  	}
  75  	ws.nodes[0] = &ws.root
  76  	if cfg.ThrottleOutOfOrderWrites {
  77  		ws.writeThrottleLimit = 1024
  78  	} else {
  79  		ws.writeThrottleLimit = math.MaxInt32
  80  	}
  81  	return ws
  82  }
  83  
// priorityNodeStateRFC7540 is the lifecycle state of a node in the priority
// tree.
type priorityNodeStateRFC7540 int

const (
	// priorityNodeOpenRFC7540 marks a stream opened through OpenStream.
	priorityNodeOpenRFC7540 priorityNodeStateRFC7540 = iota
	// priorityNodeClosedRFC7540 marks a stream that CloseStream has closed.
	priorityNodeClosedRFC7540
	// priorityNodeIdleRFC7540 marks a node created by AdjustStream for a
	// stream that has not been opened yet.
	priorityNodeIdleRFC7540
)
  91  
// priorityNodeRFC7540 is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID.
// See RFC 7540, Section 5.3.
//
// Siblings form a doubly-linked list whose head is parent.kids; setParent
// is the only method that rewires these links.
type priorityNodeRFC7540 struct {
	q            writeQueue               // queue of pending frames to write
	id           uint32                   // id of the stream, or 0 for the root of the tree
	weight       uint8                    // the actual weight is weight+1, so the value is in [1,256]
	state        priorityNodeStateRFC7540 // open | closed | idle
	bytes        int64                    // number of bytes written by this node, or 0 if closed
	subtreeBytes int64                    // sum(node.bytes) of all nodes in this subtree

	// These links form the priority tree.
	parent     *priorityNodeRFC7540
	kids       *priorityNodeRFC7540 // start of the kids list
	prev, next *priorityNodeRFC7540 // doubly-linked list of siblings
}
 108  
 109  func (n *priorityNodeRFC7540) setParent(parent *priorityNodeRFC7540) {
 110  	if n == parent {
 111  		panic("setParent to self")
 112  	}
 113  	if n.parent == parent {
 114  		return
 115  	}
 116  	// Unlink from current parent.
 117  	if parent := n.parent; parent != nil {
 118  		if n.prev == nil {
 119  			parent.kids = n.next
 120  		} else {
 121  			n.prev.next = n.next
 122  		}
 123  		if n.next != nil {
 124  			n.next.prev = n.prev
 125  		}
 126  	}
 127  	// Link to new parent.
 128  	// If parent=nil, remove n from the tree.
 129  	// Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
 130  	n.parent = parent
 131  	if parent == nil {
 132  		n.next = nil
 133  		n.prev = nil
 134  	} else {
 135  		n.next = parent.kids
 136  		n.prev = nil
 137  		if n.next != nil {
 138  			n.next.prev = n
 139  		}
 140  		parent.kids = n
 141  	}
 142  }
 143  
 144  func (n *priorityNodeRFC7540) addBytes(b int64) {
 145  	n.bytes += b
 146  	for ; n != nil; n = n.parent {
 147  		n.subtreeBytes += b
 148  	}
 149  }
 150  
 151  // walkReadyInOrder iterates over the tree in priority order, calling f for each node
 152  // with a non-empty write queue. When f returns true, this function returns true and the
 153  // walk halts. tmp is used as scratch space for sorting.
 154  //
 155  // f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
 156  // if any ancestor p of n is still open (ignoring the root node).
 157  func (n *priorityNodeRFC7540) walkReadyInOrder(openParent bool, tmp *[]*priorityNodeRFC7540, f func(*priorityNodeRFC7540, bool) bool) bool {
 158  	if !n.q.empty() && f(n, openParent) {
 159  		return true
 160  	}
 161  	if n.kids == nil {
 162  		return false
 163  	}
 164  
 165  	// Don't consider the root "open" when updating openParent since
 166  	// we can't send data frames on the root stream (only control frames).
 167  	if n.id != 0 {
 168  		openParent = openParent || (n.state == priorityNodeOpenRFC7540)
 169  	}
 170  
 171  	// Common case: only one kid or all kids have the same weight.
 172  	// Some clients don't use weights; other clients (like web browsers)
 173  	// use mostly-linear priority trees.
 174  	w := n.kids.weight
 175  	needSort := false
 176  	for k := n.kids.next; k != nil; k = k.next {
 177  		if k.weight != w {
 178  			needSort = true
 179  			break
 180  		}
 181  	}
 182  	if !needSort {
 183  		for k := n.kids; k != nil; k = k.next {
 184  			if k.walkReadyInOrder(openParent, tmp, f) {
 185  				return true
 186  			}
 187  		}
 188  		return false
 189  	}
 190  
 191  	// Uncommon case: sort the child nodes. We remove the kids from the parent,
 192  	// then re-insert after sorting so we can reuse tmp for future sort calls.
 193  	*tmp = (*tmp)[:0]
 194  	for n.kids != nil {
 195  		*tmp = append(*tmp, n.kids)
 196  		n.kids.setParent(nil)
 197  	}
 198  	sort.Sort(sortPriorityNodeSiblingsRFC7540(*tmp))
 199  	for i := len(*tmp) - 1; i >= 0; i-- {
 200  		(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
 201  	}
 202  	for k := n.kids; k != nil; k = k.next {
 203  		if k.walkReadyInOrder(openParent, tmp, f) {
 204  			return true
 205  		}
 206  	}
 207  	return false
 208  }
 209  
 210  type sortPriorityNodeSiblingsRFC7540 []*priorityNodeRFC7540
 211  
 212  func (z sortPriorityNodeSiblingsRFC7540) Len() int      { return len(z) }
 213  func (z sortPriorityNodeSiblingsRFC7540) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
 214  func (z sortPriorityNodeSiblingsRFC7540) Less(i, k int) bool {
 215  	// Prefer the subtree that has sent fewer bytes relative to its weight.
 216  	// See sections 5.3.2 and 5.3.4.
 217  	wi, bi := float64(z[i].weight)+1, float64(z[i].subtreeBytes)
 218  	wk, bk := float64(z[k].weight)+1, float64(z[k].subtreeBytes)
 219  	if bi == 0 && bk == 0 {
 220  		return wi >= wk
 221  	}
 222  	if bk == 0 {
 223  		return false
 224  	}
 225  	return bi/bk <= wi/wk
 226  }
 227  
// priorityWriteSchedulerRFC7540 is the scheduler built by
// NewPriorityWriteScheduler. It keeps one priority-tree node per known
// stream and pops frames by walking that tree from the root.
type priorityWriteSchedulerRFC7540 struct {
	// root is the root of the priority tree, where root.id = 0.
	// The root queues control frames that are not associated with any stream.
	root priorityNodeRFC7540

	// nodes maps stream ids to priority tree nodes.
	nodes map[uint32]*priorityNodeRFC7540

	// maxID is the maximum stream id in nodes.
	maxID uint32

	// lists of nodes that have been closed or are idle, but are kept in
	// the tree for improved prioritization. When the lengths exceed either
	// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
	closedNodes, idleNodes []*priorityNodeRFC7540

	// From the config.
	maxClosedNodesInTree int
	maxIdleNodesInTree   int
	writeThrottleLimit   int32 // per-Pop byte limit applied when a node has an open ancestor
	enableWriteThrottle  bool

	// tmp is scratch space for priorityNodeRFC7540.walkReadyInOrder to reduce allocations.
	tmp []*priorityNodeRFC7540

	// pool of empty queues for reuse.
	queuePool writeQueuePool
}
 256  
 257  func (ws *priorityWriteSchedulerRFC7540) OpenStream(streamID uint32, options OpenStreamOptions) {
 258  	// The stream may be currently idle but cannot be opened or closed.
 259  	if curr := ws.nodes[streamID]; curr != nil {
 260  		if curr.state != priorityNodeIdleRFC7540 {
 261  			panic(fmt.Sprintf("stream %d already opened", streamID))
 262  		}
 263  		curr.state = priorityNodeOpenRFC7540
 264  		return
 265  	}
 266  
 267  	// RFC 7540, Section 5.3.5:
 268  	//  "All streams are initially assigned a non-exclusive dependency on stream 0x0.
 269  	//  Pushed streams initially depend on their associated stream. In both cases,
 270  	//  streams are assigned a default weight of 16."
 271  	parent := ws.nodes[options.PusherID]
 272  	if parent == nil {
 273  		parent = &ws.root
 274  	}
 275  	n := &priorityNodeRFC7540{
 276  		q:      *ws.queuePool.get(),
 277  		id:     streamID,
 278  		weight: priorityDefaultWeightRFC7540,
 279  		state:  priorityNodeOpenRFC7540,
 280  	}
 281  	n.setParent(parent)
 282  	ws.nodes[streamID] = n
 283  	if streamID > ws.maxID {
 284  		ws.maxID = streamID
 285  	}
 286  }
 287  
 288  func (ws *priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
 289  	if streamID == 0 {
 290  		panic("violation of WriteScheduler interface: cannot close stream 0")
 291  	}
 292  	if ws.nodes[streamID] == nil {
 293  		panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
 294  	}
 295  	if ws.nodes[streamID].state != priorityNodeOpenRFC7540 {
 296  		panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
 297  	}
 298  
 299  	n := ws.nodes[streamID]
 300  	n.state = priorityNodeClosedRFC7540
 301  	n.addBytes(-n.bytes)
 302  
 303  	q := n.q
 304  	ws.queuePool.put(&q)
 305  	if ws.maxClosedNodesInTree > 0 {
 306  		ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
 307  	} else {
 308  		ws.removeNode(n)
 309  	}
 310  }
 311  
 312  func (ws *priorityWriteSchedulerRFC7540) AdjustStream(streamID uint32, priority PriorityParam) {
 313  	if streamID == 0 {
 314  		panic("adjustPriority on root")
 315  	}
 316  
 317  	// If streamID does not exist, there are two cases:
 318  	// - A closed stream that has been removed (this will have ID <= maxID)
 319  	// - An idle stream that is being used for "grouping" (this will have ID > maxID)
 320  	n := ws.nodes[streamID]
 321  	if n == nil {
 322  		if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
 323  			return
 324  		}
 325  		ws.maxID = streamID
 326  		n = &priorityNodeRFC7540{
 327  			q:      *ws.queuePool.get(),
 328  			id:     streamID,
 329  			weight: priorityDefaultWeightRFC7540,
 330  			state:  priorityNodeIdleRFC7540,
 331  		}
 332  		n.setParent(&ws.root)
 333  		ws.nodes[streamID] = n
 334  		ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
 335  	}
 336  
 337  	// Section 5.3.1: A dependency on a stream that is not currently in the tree
 338  	// results in that stream being given a default priority (Section 5.3.5).
 339  	parent := ws.nodes[priority.StreamDep]
 340  	if parent == nil {
 341  		n.setParent(&ws.root)
 342  		n.weight = priorityDefaultWeightRFC7540
 343  		return
 344  	}
 345  
 346  	// Ignore if the client tries to make a node its own parent.
 347  	if n == parent {
 348  		return
 349  	}
 350  
 351  	// Section 5.3.3:
 352  	//   "If a stream is made dependent on one of its own dependencies, the
 353  	//   formerly dependent stream is first moved to be dependent on the
 354  	//   reprioritized stream's previous parent. The moved dependency retains
 355  	//   its weight."
 356  	//
 357  	// That is: if parent depends on n, move parent to depend on n.parent.
 358  	for x := parent.parent; x != nil; x = x.parent {
 359  		if x == n {
 360  			parent.setParent(n.parent)
 361  			break
 362  		}
 363  	}
 364  
 365  	// Section 5.3.3: The exclusive flag causes the stream to become the sole
 366  	// dependency of its parent stream, causing other dependencies to become
 367  	// dependent on the exclusive stream.
 368  	if priority.Exclusive {
 369  		k := parent.kids
 370  		for k != nil {
 371  			next := k.next
 372  			if k != n {
 373  				k.setParent(n)
 374  			}
 375  			k = next
 376  		}
 377  	}
 378  
 379  	n.setParent(parent)
 380  	n.weight = priority.Weight
 381  }
 382  
 383  func (ws *priorityWriteSchedulerRFC7540) Push(wr FrameWriteRequest) {
 384  	var n *priorityNodeRFC7540
 385  	if wr.isControl() {
 386  		n = &ws.root
 387  	} else {
 388  		id := wr.StreamID()
 389  		n = ws.nodes[id]
 390  		if n == nil {
 391  			// id is an idle or closed stream. wr should not be a HEADERS or
 392  			// DATA frame. In other case, we push wr onto the root, rather
 393  			// than creating a new priorityNode.
 394  			if wr.DataSize() > 0 {
 395  				panic("add DATA on non-open stream")
 396  			}
 397  			n = &ws.root
 398  		}
 399  	}
 400  	n.q.push(wr)
 401  }
 402  
 403  func (ws *priorityWriteSchedulerRFC7540) Pop() (wr FrameWriteRequest, ok bool) {
 404  	ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNodeRFC7540, openParent bool) bool {
 405  		limit := int32(math.MaxInt32)
 406  		if openParent {
 407  			limit = ws.writeThrottleLimit
 408  		}
 409  		wr, ok = n.q.consume(limit)
 410  		if !ok {
 411  			return false
 412  		}
 413  		n.addBytes(int64(wr.DataSize()))
 414  		// If B depends on A and B continuously has data available but A
 415  		// does not, gradually increase the throttling limit to allow B to
 416  		// steal more and more bandwidth from A.
 417  		if openParent {
 418  			ws.writeThrottleLimit += 1024
 419  			if ws.writeThrottleLimit < 0 {
 420  				ws.writeThrottleLimit = math.MaxInt32
 421  			}
 422  		} else if ws.enableWriteThrottle {
 423  			ws.writeThrottleLimit = 1024
 424  		}
 425  		return true
 426  	})
 427  	return wr, ok
 428  }
 429  
 430  func (ws *priorityWriteSchedulerRFC7540) addClosedOrIdleNode(list *[]*priorityNodeRFC7540, maxSize int, n *priorityNodeRFC7540) {
 431  	if maxSize == 0 {
 432  		return
 433  	}
 434  	if len(*list) == maxSize {
 435  		// Remove the oldest node, then shift left.
 436  		ws.removeNode((*list)[0])
 437  		x := (*list)[1:]
 438  		copy(*list, x)
 439  		*list = (*list)[:len(x)]
 440  	}
 441  	*list = append(*list, n)
 442  }
 443  
 444  func (ws *priorityWriteSchedulerRFC7540) removeNode(n *priorityNodeRFC7540) {
 445  	for n.kids != nil {
 446  		n.kids.setParent(n.parent)
 447  	}
 448  	n.setParent(nil)
 449  	delete(ws.nodes, n.id)
 450  }
 451