writesched_random.go raw

   1  // Copyright 2014 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package http2
   6  
   7  import "math"
   8  
   9  // NewRandomWriteScheduler constructs a WriteScheduler that ignores HTTP/2
  10  // priorities. Control frames like SETTINGS and PING are written before DATA
  11  // frames, but if no control frames are queued and multiple streams have queued
  12  // HEADERS or DATA frames, Pop selects a ready stream arbitrarily.
  13  func NewRandomWriteScheduler() WriteScheduler {
  14  	return &randomWriteScheduler{sq: make(map[uint32]*writeQueue)}
  15  }
  16  
  17  type randomWriteScheduler struct {
  18  	// zero are frames not associated with a specific stream.
  19  	zero writeQueue
  20  
  21  	// sq contains the stream-specific queues, keyed by stream ID.
  22  	// When a stream is idle, closed, or emptied, it's deleted
  23  	// from the map.
  24  	sq map[uint32]*writeQueue
  25  
  26  	// pool of empty queues for reuse.
  27  	queuePool writeQueuePool
  28  }
  29  
  30  func (ws *randomWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
  31  	// no-op: idle streams are not tracked
  32  }
  33  
  34  func (ws *randomWriteScheduler) CloseStream(streamID uint32) {
  35  	q, ok := ws.sq[streamID]
  36  	if !ok {
  37  		return
  38  	}
  39  	delete(ws.sq, streamID)
  40  	ws.queuePool.put(q)
  41  }
  42  
  43  func (ws *randomWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
  44  	// no-op: priorities are ignored
  45  }
  46  
  47  func (ws *randomWriteScheduler) Push(wr FrameWriteRequest) {
  48  	if wr.isControl() {
  49  		ws.zero.push(wr)
  50  		return
  51  	}
  52  	id := wr.StreamID()
  53  	q, ok := ws.sq[id]
  54  	if !ok {
  55  		q = ws.queuePool.get()
  56  		ws.sq[id] = q
  57  	}
  58  	q.push(wr)
  59  }
  60  
  61  func (ws *randomWriteScheduler) Pop() (FrameWriteRequest, bool) {
  62  	// Control and RST_STREAM frames first.
  63  	if !ws.zero.empty() {
  64  		return ws.zero.shift(), true
  65  	}
  66  	// Iterate over all non-idle streams until finding one that can be consumed.
  67  	for streamID, q := range ws.sq {
  68  		if wr, ok := q.consume(math.MaxInt32); ok {
  69  			if q.empty() {
  70  				delete(ws.sq, streamID)
  71  				ws.queuePool.put(q)
  72  			}
  73  			return wr, true
  74  		}
  75  	}
  76  	return FrameWriteRequest{}, false
  77  }
  78