writesched_roundrobin.go raw

   1  // Copyright 2023 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package http2
   6  
   7  import (
   8  	"fmt"
   9  	"math"
  10  )
  11  
  12  type roundRobinWriteScheduler struct {
  13  	// control contains control frames (SETTINGS, PING, etc.).
  14  	control writeQueue
  15  
  16  	// streams maps stream ID to a queue.
  17  	streams map[uint32]*writeQueue
  18  
  19  	// stream queues are stored in a circular linked list.
  20  	// head is the next stream to write, or nil if there are no streams open.
  21  	head *writeQueue
  22  
  23  	// pool of empty queues for reuse.
  24  	queuePool writeQueuePool
  25  }
  26  
  27  // newRoundRobinWriteScheduler constructs a new write scheduler.
  28  // The round robin scheduler prioritizes control frames
  29  // like SETTINGS and PING over DATA frames.
  30  // When there are no control frames to send, it performs a round-robin
  31  // selection from the ready streams.
  32  func newRoundRobinWriteScheduler() WriteScheduler {
  33  	ws := &roundRobinWriteScheduler{
  34  		streams: make(map[uint32]*writeQueue),
  35  	}
  36  	return ws
  37  }
  38  
  39  func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
  40  	if ws.streams[streamID] != nil {
  41  		panic(fmt.Errorf("stream %d already opened", streamID))
  42  	}
  43  	q := ws.queuePool.get()
  44  	ws.streams[streamID] = q
  45  	if ws.head == nil {
  46  		ws.head = q
  47  		q.next = q
  48  		q.prev = q
  49  	} else {
  50  		// Queues are stored in a ring.
  51  		// Insert the new stream before ws.head, putting it at the end of the list.
  52  		q.prev = ws.head.prev
  53  		q.next = ws.head
  54  		q.prev.next = q
  55  		q.next.prev = q
  56  	}
  57  }
  58  
  59  func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
  60  	q := ws.streams[streamID]
  61  	if q == nil {
  62  		return
  63  	}
  64  	if q.next == q {
  65  		// This was the only open stream.
  66  		ws.head = nil
  67  	} else {
  68  		q.prev.next = q.next
  69  		q.next.prev = q.prev
  70  		if ws.head == q {
  71  			ws.head = q.next
  72  		}
  73  	}
  74  	delete(ws.streams, streamID)
  75  	ws.queuePool.put(q)
  76  }
  77  
  78  func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
  79  
  80  func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
  81  	if wr.isControl() {
  82  		ws.control.push(wr)
  83  		return
  84  	}
  85  	q := ws.streams[wr.StreamID()]
  86  	if q == nil {
  87  		// This is a closed stream.
  88  		// wr should not be a HEADERS or DATA frame.
  89  		// We push the request onto the control queue.
  90  		if wr.DataSize() > 0 {
  91  			panic("add DATA on non-open stream")
  92  		}
  93  		ws.control.push(wr)
  94  		return
  95  	}
  96  	q.push(wr)
  97  }
  98  
  99  func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
 100  	// Control and RST_STREAM frames first.
 101  	if !ws.control.empty() {
 102  		return ws.control.shift(), true
 103  	}
 104  	if ws.head == nil {
 105  		return FrameWriteRequest{}, false
 106  	}
 107  	q := ws.head
 108  	for {
 109  		if wr, ok := q.consume(math.MaxInt32); ok {
 110  			ws.head = q.next
 111  			return wr, true
 112  		}
 113  		q = q.next
 114  		if q == ws.head {
 115  			break
 116  		}
 117  	}
 118  	return FrameWriteRequest{}, false
 119  }
 120