1 // Copyright 2025 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package http2
6 7 import (
8 "fmt"
9 "math"
10 )
// streamMetadata records where an open stream's queue lives inside the
// scheduler and the priority parameters it is currently filed under.
// A zero streamMetadata (nil location), as returned for a missing map entry,
// denotes a stream that is not open.
type streamMetadata struct {
	// location is the stream's queue, linked into the ring at
	// heads[priority.urgency][priority.incremental].
	location *writeQueue
	// priority holds the stream's current priority parameters.
	priority PriorityParam
}
// priorityWriteSchedulerRFC9218 is a WriteScheduler that orders stream frames
// using the RFC 9218 urgency (u) and incremental (i) priority parameters.
// Queued control frames are always written first; after that, streams are
// served from the most urgent (lowest u) bucket that has a frame ready.
type priorityWriteSchedulerRFC9218 struct {
	// control contains control frames (SETTINGS, PING, etc.). Push also
	// routes non-DATA frames for streams that are not open to this queue.
	control writeQueue

	// heads contains the head of a circular list of streams for each
	// (urgency, incremental) pair, as defined in
	// https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters.
	// The first index is the urgency, u=0 through u=7; the second index is 0
	// for i=false and 1 for i=true. A nil entry means that list is empty.
	heads [8][2]*writeQueue

	// streams maps each open stream ID to its metadata, so we can quickly
	// locate a stream's queue when, for example, adjusting its priority.
	// Entries are added by OpenStream and removed by CloseStream.
	streams map[uint32]streamMetadata

	// queuePool holds empty queues for reuse.
	queuePool writeQueuePool

	// prioritizeIncremental is flipped on every Pop() call that finds the
	// control queue empty. While it is true, incremental streams are tried
	// before non-incremental streams of the same urgency.
	prioritizeIncremental bool

	// priorityUpdateBuf buffers the most recent PRIORITY_UPDATE received for
	// a stream that is not open, per
	// https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
	// Only one update is kept: a newer one overwrites it. OpenStream consumes
	// it when a stream with the matching ID is opened.
	priorityUpdateBuf struct {
		// streamID being 0 means that the buffer is empty. This is a safe
		// assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
		streamID uint32
		priority PriorityParam
	}
}
50 51 func newPriorityWriteSchedulerRFC9218() WriteScheduler {
52 ws := &priorityWriteSchedulerRFC9218{
53 streams: make(map[uint32]streamMetadata),
54 }
55 return ws
56 }
57 58 func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
59 if ws.streams[streamID].location != nil {
60 panic(fmt.Errorf("stream %d already opened", streamID))
61 }
62 if streamID == ws.priorityUpdateBuf.streamID {
63 ws.priorityUpdateBuf.streamID = 0
64 opt.priority = ws.priorityUpdateBuf.priority
65 }
66 q := ws.queuePool.get()
67 ws.streams[streamID] = streamMetadata{
68 location: q,
69 priority: opt.priority,
70 }
71 72 u, i := opt.priority.urgency, opt.priority.incremental
73 if ws.heads[u][i] == nil {
74 ws.heads[u][i] = q
75 q.next = q
76 q.prev = q
77 } else {
78 // Queues are stored in a ring.
79 // Insert the new stream before ws.head, putting it at the end of the list.
80 q.prev = ws.heads[u][i].prev
81 q.next = ws.heads[u][i]
82 q.prev.next = q
83 q.next.prev = q
84 }
85 }
86 87 func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
88 metadata := ws.streams[streamID]
89 q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
90 if q == nil {
91 return
92 }
93 if q.next == q {
94 // This was the only open stream.
95 ws.heads[u][i] = nil
96 } else {
97 q.prev.next = q.next
98 q.next.prev = q.prev
99 if ws.heads[u][i] == q {
100 ws.heads[u][i] = q.next
101 }
102 }
103 delete(ws.streams, streamID)
104 ws.queuePool.put(q)
105 }
106 107 func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
108 metadata := ws.streams[streamID]
109 q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
110 if q == nil {
111 ws.priorityUpdateBuf.streamID = streamID
112 ws.priorityUpdateBuf.priority = priority
113 return
114 }
115 116 // Remove stream from current location.
117 if q.next == q {
118 // This was the only open stream.
119 ws.heads[u][i] = nil
120 } else {
121 q.prev.next = q.next
122 q.next.prev = q.prev
123 if ws.heads[u][i] == q {
124 ws.heads[u][i] = q.next
125 }
126 }
127 128 // Insert stream to the new queue.
129 u, i = priority.urgency, priority.incremental
130 if ws.heads[u][i] == nil {
131 ws.heads[u][i] = q
132 q.next = q
133 q.prev = q
134 } else {
135 // Queues are stored in a ring.
136 // Insert the new stream before ws.head, putting it at the end of the list.
137 q.prev = ws.heads[u][i].prev
138 q.next = ws.heads[u][i]
139 q.prev.next = q
140 q.next.prev = q
141 }
142 143 // Update the metadata.
144 ws.streams[streamID] = streamMetadata{
145 location: q,
146 priority: priority,
147 }
148 }
149 150 func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
151 if wr.isControl() {
152 ws.control.push(wr)
153 return
154 }
155 q := ws.streams[wr.StreamID()].location
156 if q == nil {
157 // This is a closed stream.
158 // wr should not be a HEADERS or DATA frame.
159 // We push the request onto the control queue.
160 if wr.DataSize() > 0 {
161 panic("add DATA on non-open stream")
162 }
163 ws.control.push(wr)
164 return
165 }
166 q.push(wr)
167 }
168 169 func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
170 // Control and RST_STREAM frames first.
171 if !ws.control.empty() {
172 return ws.control.shift(), true
173 }
174 175 // On the next Pop(), we want to prioritize incremental if we prioritized
176 // non-incremental request of the same urgency this time. Vice-versa.
177 // i.e. when there are incremental and non-incremental requests at the same
178 // priority, we give 50% of our bandwidth to the incremental ones in
179 // aggregate and 50% to the first non-incremental one (since
180 // non-incremental streams do not use round-robin writes).
181 ws.prioritizeIncremental = !ws.prioritizeIncremental
182 183 // Always prioritize lowest u (i.e. highest urgency level).
184 for u := range ws.heads {
185 for i := range ws.heads[u] {
186 // When we want to prioritize incremental, we try to pop i=true
187 // first before i=false when u is the same.
188 if ws.prioritizeIncremental {
189 i = (i + 1) % 2
190 }
191 q := ws.heads[u][i]
192 if q == nil {
193 continue
194 }
195 for {
196 if wr, ok := q.consume(math.MaxInt32); ok {
197 if i == 1 {
198 // For incremental streams, we update head to q.next so
199 // we can round-robin between multiple streams that can
200 // immediately benefit from partial writes.
201 ws.heads[u][i] = q.next
202 } else {
203 // For non-incremental streams, we try to finish one to
204 // completion rather than doing round-robin. However,
205 // we update head here so that if q.consume() is !ok
206 // (e.g. the stream has no more frame to consume), head
207 // is updated to the next q that has frames to consume
208 // on future iterations. This way, we do not prioritize
209 // writing to unavailable stream on next Pop() calls,
210 // preventing head-of-line blocking.
211 ws.heads[u][i] = q
212 }
213 return wr, true
214 }
215 q = q.next
216 if q == ws.heads[u][i] {
217 break
218 }
219 }
220 221 }
222 }
223 return FrameWriteRequest{}, false
224 }
225