writesched_priority_rfc9218.go raw

   1  // Copyright 2025 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package http2
   6  
   7  import (
   8  	"fmt"
   9  	"math"
  10  )
  11  
// streamMetadata is the per-stream record kept in
// priorityWriteSchedulerRFC9218.streams.
type streamMetadata struct {
	// location is the stream's write queue. The same value is linked into
	// one of the scheduler's rings through its next/prev pointers. The
	// scheduler treats a nil location as "stream not open".
	location *writeQueue
	// priority is the priority the stream is currently filed under. Its
	// urgency and incremental fields select the ring in heads that holds
	// location.
	priority PriorityParam
}
  16  
// priorityWriteSchedulerRFC9218 schedules frame writes using the urgency and
// incremental priority parameters described in RFC 9218. Frames on the control
// queue are always written first; stream frames are then picked from per
// (urgency, incremental) rings of stream queues.
type priorityWriteSchedulerRFC9218 struct {
	// control contains control frames (SETTINGS, PING, etc.). Push also
	// places frames for streams that have no open queue here.
	control writeQueue

	// heads contains, for every priority bucket, the head of a circular
	// doubly linked list of stream queues, or nil when the bucket is empty.
	// The buckets are laid out as a nested array indexed by urgency and
	// incremental, as defined in
	// https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters.
	// 8 represents u=0 up to u=7, and 2 represents i=false and i=true.
	heads [8][2]*writeQueue

	// streams maps each open stream ID to its metadata, so that a stream's
	// queue and current bucket can be located quickly, for example when
	// its priority is adjusted or when it is closed.
	streams map[uint32]streamMetadata

	// queuePool holds empty queues for reuse.
	queuePool writeQueuePool

	// prioritizeIncremental determines whether, within one urgency level,
	// a given Pop() call looks at incremental streams before
	// non-incremental ones. Pop() flips it each time it gets past the
	// control queue.
	prioritizeIncremental bool

	// priorityUpdateBuf buffers the most recent PRIORITY_UPDATE received
	// for a stream that is not open yet, per
	// https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
	// Only one update is kept; OpenStream applies and clears it when the
	// matching stream ID is opened.
	priorityUpdateBuf struct {
		// streamID being 0 means that the buffer is empty. This is a safe
		// assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
		streamID uint32
		priority PriorityParam
	}
}
  50  
  51  func newPriorityWriteSchedulerRFC9218() WriteScheduler {
  52  	ws := &priorityWriteSchedulerRFC9218{
  53  		streams: make(map[uint32]streamMetadata),
  54  	}
  55  	return ws
  56  }
  57  
  58  func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
  59  	if ws.streams[streamID].location != nil {
  60  		panic(fmt.Errorf("stream %d already opened", streamID))
  61  	}
  62  	if streamID == ws.priorityUpdateBuf.streamID {
  63  		ws.priorityUpdateBuf.streamID = 0
  64  		opt.priority = ws.priorityUpdateBuf.priority
  65  	}
  66  	q := ws.queuePool.get()
  67  	ws.streams[streamID] = streamMetadata{
  68  		location: q,
  69  		priority: opt.priority,
  70  	}
  71  
  72  	u, i := opt.priority.urgency, opt.priority.incremental
  73  	if ws.heads[u][i] == nil {
  74  		ws.heads[u][i] = q
  75  		q.next = q
  76  		q.prev = q
  77  	} else {
  78  		// Queues are stored in a ring.
  79  		// Insert the new stream before ws.head, putting it at the end of the list.
  80  		q.prev = ws.heads[u][i].prev
  81  		q.next = ws.heads[u][i]
  82  		q.prev.next = q
  83  		q.next.prev = q
  84  	}
  85  }
  86  
  87  func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
  88  	metadata := ws.streams[streamID]
  89  	q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
  90  	if q == nil {
  91  		return
  92  	}
  93  	if q.next == q {
  94  		// This was the only open stream.
  95  		ws.heads[u][i] = nil
  96  	} else {
  97  		q.prev.next = q.next
  98  		q.next.prev = q.prev
  99  		if ws.heads[u][i] == q {
 100  			ws.heads[u][i] = q.next
 101  		}
 102  	}
 103  	delete(ws.streams, streamID)
 104  	ws.queuePool.put(q)
 105  }
 106  
 107  func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
 108  	metadata := ws.streams[streamID]
 109  	q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
 110  	if q == nil {
 111  		ws.priorityUpdateBuf.streamID = streamID
 112  		ws.priorityUpdateBuf.priority = priority
 113  		return
 114  	}
 115  
 116  	// Remove stream from current location.
 117  	if q.next == q {
 118  		// This was the only open stream.
 119  		ws.heads[u][i] = nil
 120  	} else {
 121  		q.prev.next = q.next
 122  		q.next.prev = q.prev
 123  		if ws.heads[u][i] == q {
 124  			ws.heads[u][i] = q.next
 125  		}
 126  	}
 127  
 128  	// Insert stream to the new queue.
 129  	u, i = priority.urgency, priority.incremental
 130  	if ws.heads[u][i] == nil {
 131  		ws.heads[u][i] = q
 132  		q.next = q
 133  		q.prev = q
 134  	} else {
 135  		// Queues are stored in a ring.
 136  		// Insert the new stream before ws.head, putting it at the end of the list.
 137  		q.prev = ws.heads[u][i].prev
 138  		q.next = ws.heads[u][i]
 139  		q.prev.next = q
 140  		q.next.prev = q
 141  	}
 142  
 143  	// Update the metadata.
 144  	ws.streams[streamID] = streamMetadata{
 145  		location: q,
 146  		priority: priority,
 147  	}
 148  }
 149  
 150  func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
 151  	if wr.isControl() {
 152  		ws.control.push(wr)
 153  		return
 154  	}
 155  	q := ws.streams[wr.StreamID()].location
 156  	if q == nil {
 157  		// This is a closed stream.
 158  		// wr should not be a HEADERS or DATA frame.
 159  		// We push the request onto the control queue.
 160  		if wr.DataSize() > 0 {
 161  			panic("add DATA on non-open stream")
 162  		}
 163  		ws.control.push(wr)
 164  		return
 165  	}
 166  	q.push(wr)
 167  }
 168  
 169  func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
 170  	// Control and RST_STREAM frames first.
 171  	if !ws.control.empty() {
 172  		return ws.control.shift(), true
 173  	}
 174  
 175  	// On the next Pop(), we want to prioritize incremental if we prioritized
 176  	// non-incremental request of the same urgency this time. Vice-versa.
 177  	// i.e. when there are incremental and non-incremental requests at the same
 178  	// priority, we give 50% of our bandwidth to the incremental ones in
 179  	// aggregate and 50% to the first non-incremental one (since
 180  	// non-incremental streams do not use round-robin writes).
 181  	ws.prioritizeIncremental = !ws.prioritizeIncremental
 182  
 183  	// Always prioritize lowest u (i.e. highest urgency level).
 184  	for u := range ws.heads {
 185  		for i := range ws.heads[u] {
 186  			// When we want to prioritize incremental, we try to pop i=true
 187  			// first before i=false when u is the same.
 188  			if ws.prioritizeIncremental {
 189  				i = (i + 1) % 2
 190  			}
 191  			q := ws.heads[u][i]
 192  			if q == nil {
 193  				continue
 194  			}
 195  			for {
 196  				if wr, ok := q.consume(math.MaxInt32); ok {
 197  					if i == 1 {
 198  						// For incremental streams, we update head to q.next so
 199  						// we can round-robin between multiple streams that can
 200  						// immediately benefit from partial writes.
 201  						ws.heads[u][i] = q.next
 202  					} else {
 203  						// For non-incremental streams, we try to finish one to
 204  						// completion rather than doing round-robin. However,
 205  						// we update head here so that if q.consume() is !ok
 206  						// (e.g. the stream has no more frame to consume), head
 207  						// is updated to the next q that has frames to consume
 208  						// on future iterations. This way, we do not prioritize
 209  						// writing to unavailable stream on next Pop() calls,
 210  						// preventing head-of-line blocking.
 211  						ws.heads[u][i] = q
 212  					}
 213  					return wr, true
 214  				}
 215  				q = q.next
 216  				if q == ws.heads[u][i] {
 217  					break
 218  				}
 219  			}
 220  
 221  		}
 222  	}
 223  	return FrameWriteRequest{}, false
 224  }
 225