channel.go raw

   1  /*
   2   *
   3   * Copyright 2024 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package channelz
  20  
  21  import (
  22  	"fmt"
  23  	"sync/atomic"
  24  
  25  	"google.golang.org/grpc/connectivity"
  26  )
  27  
// Channel represents a channel within channelz, which includes metrics and
// internal channelz data, such as channelz id, child list, etc.
type Channel struct {
	Entity
	// ID is the channelz id of this channel.
	ID int64
	// RefName is the human readable reference string of this channel.
	RefName string

	// closeCalled is set by triggerDelete; deleteSelfFromTree refuses to
	// detach the channel until it is true.  nestedChans and subChans map a
	// child's channelz id to its RefName and are populated by addChild.
	// Parent is the enclosing channel, or nil for a top-level channel.  trace
	// is the channel's ChannelTrace, exposed through Trace and
	// getChannelTrace.
	closeCalled bool
	nestedChans map[int64]string
	subChans    map[int64]string
	Parent      *Channel
	trace       *ChannelTrace
	// traceRefCount is the number of trace events that reference this channel.
	// Non-zero traceRefCount means the trace of this channel cannot be deleted.
	// It is read and updated only through sync/atomic operations.
	traceRefCount int32

	// ChannelMetrics holds connectivity state, target and call metrics for the
	// channel within channelz.
	ChannelMetrics ChannelMetrics
}
  50  
// channelzIdentifier is an empty marker method.  It is implemented to make
// Channel implement the Identifier interface used for nesting.
func (c *Channel) channelzIdentifier() {}
  54  
  55  // String returns a string representation of the Channel, including its parent
  56  // entity and ID.
  57  func (c *Channel) String() string {
  58  	if c.Parent == nil {
  59  		return fmt.Sprintf("Channel #%d", c.ID)
  60  	}
  61  	return fmt.Sprintf("%s Channel #%d", c.Parent, c.ID)
  62  }
  63  
// id returns the channelz id of this channel, i.e. the value of the ID field.
func (c *Channel) id() int64 {
	return c.ID
}
  67  
  68  // SubChans returns a copy of the map of sub-channels associated with the
  69  // Channel.
  70  func (c *Channel) SubChans() map[int64]string {
  71  	db.mu.RLock()
  72  	defer db.mu.RUnlock()
  73  	return copyMap(c.subChans)
  74  }
  75  
  76  // NestedChans returns a copy of the map of nested channels associated with the
  77  // Channel.
  78  func (c *Channel) NestedChans() map[int64]string {
  79  	db.mu.RLock()
  80  	defer db.mu.RUnlock()
  81  	return copyMap(c.nestedChans)
  82  }
  83  
  84  // Trace returns a copy of the Channel's trace data.
  85  func (c *Channel) Trace() *ChannelTrace {
  86  	db.mu.RLock()
  87  	defer db.mu.RUnlock()
  88  	return c.trace.copy()
  89  }
  90  
// ChannelMetrics holds connectivity state, target and call metrics for the
// channel within channelz.  Every field is an atomic value, so individual
// fields may be read and written without additional locking.
type ChannelMetrics struct {
	// The current connectivity state of the channel.  Load returns nil until
	// a state has been stored.
	State atomic.Pointer[connectivity.State]
	// The target this channel originally tried to connect to.  May be absent,
	// in which case Load returns nil.
	Target atomic.Pointer[string]
	// The number of calls started on the channel.
	CallsStarted atomic.Int64
	// The number of calls that have completed with an OK status.
	CallsSucceeded atomic.Int64
	// The number of calls that have completed with a non-OK status.
	CallsFailed atomic.Int64
	// The last time a call was started on the channel.
	// NOTE(review): stored as a raw int64; the unit/epoch is not visible in
	// this file -- confirm against the writer.
	LastCallStartedTimestamp atomic.Int64
}
 107  
 108  // CopyFrom copies the metrics in o to c.  For testing only.
 109  func (c *ChannelMetrics) CopyFrom(o *ChannelMetrics) {
 110  	c.State.Store(o.State.Load())
 111  	c.Target.Store(o.Target.Load())
 112  	c.CallsStarted.Store(o.CallsStarted.Load())
 113  	c.CallsSucceeded.Store(o.CallsSucceeded.Load())
 114  	c.CallsFailed.Store(o.CallsFailed.Load())
 115  	c.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load())
 116  }
 117  
 118  // Equal returns true iff the metrics of c are the same as the metrics of o.
 119  // For testing only.
 120  func (c *ChannelMetrics) Equal(o any) bool {
 121  	oc, ok := o.(*ChannelMetrics)
 122  	if !ok {
 123  		return false
 124  	}
 125  	if (c.State.Load() == nil) != (oc.State.Load() == nil) {
 126  		return false
 127  	}
 128  	if c.State.Load() != nil && *c.State.Load() != *oc.State.Load() {
 129  		return false
 130  	}
 131  	if (c.Target.Load() == nil) != (oc.Target.Load() == nil) {
 132  		return false
 133  	}
 134  	if c.Target.Load() != nil && *c.Target.Load() != *oc.Target.Load() {
 135  		return false
 136  	}
 137  	return c.CallsStarted.Load() == oc.CallsStarted.Load() &&
 138  		c.CallsFailed.Load() == oc.CallsFailed.Load() &&
 139  		c.CallsSucceeded.Load() == oc.CallsSucceeded.Load() &&
 140  		c.LastCallStartedTimestamp.Load() == oc.LastCallStartedTimestamp.Load()
 141  }
 142  
 143  func strFromPointer(s *string) string {
 144  	if s == nil {
 145  		return ""
 146  	}
 147  	return *s
 148  }
 149  
 150  // String returns a string representation of the ChannelMetrics, including its
 151  // state, target, and call metrics.
 152  func (c *ChannelMetrics) String() string {
 153  	return fmt.Sprintf("State: %v, Target: %s, CallsStarted: %v, CallsSucceeded: %v, CallsFailed: %v, LastCallStartedTimestamp: %v",
 154  		c.State.Load(), strFromPointer(c.Target.Load()), c.CallsStarted.Load(), c.CallsSucceeded.Load(), c.CallsFailed.Load(), c.LastCallStartedTimestamp.Load(),
 155  	)
 156  }
 157  
 158  // NewChannelMetricForTesting creates a new instance of ChannelMetrics with
 159  // specified initial values for testing purposes.
 160  func NewChannelMetricForTesting(state connectivity.State, target string, started, succeeded, failed, timestamp int64) *ChannelMetrics {
 161  	c := &ChannelMetrics{}
 162  	c.State.Store(&state)
 163  	c.Target.Store(&target)
 164  	c.CallsStarted.Store(started)
 165  	c.CallsSucceeded.Store(succeeded)
 166  	c.CallsFailed.Store(failed)
 167  	c.LastCallStartedTimestamp.Store(timestamp)
 168  	return c
 169  }
 170  
 171  func (c *Channel) addChild(id int64, e entry) {
 172  	switch v := e.(type) {
 173  	case *SubChannel:
 174  		c.subChans[id] = v.RefName
 175  	case *Channel:
 176  		c.nestedChans[id] = v.RefName
 177  	default:
 178  		logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
 179  	}
 180  }
 181  
 182  func (c *Channel) deleteChild(id int64) {
 183  	delete(c.subChans, id)
 184  	delete(c.nestedChans, id)
 185  	c.deleteSelfIfReady()
 186  }
 187  
// triggerDelete marks the channel as closed and then attempts to delete it
// from channelz.
func (c *Channel) triggerDelete() {
	// The flag must be set before the attempt: deleteSelfFromTree refuses to
	// proceed while closeCalled is false.
	c.closeCalled = true
	c.deleteSelfIfReady()
}
 192  
 193  func (c *Channel) getParentID() int64 {
 194  	if c.Parent == nil {
 195  		return -1
 196  	}
 197  	return c.Parent.ID
 198  }
 199  
 200  // deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means
 201  // deleting the channel reference from its parent's child list.
 202  //
 203  // In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the
 204  // corresponding grpc object has been invoked, and the channel does not have any children left.
 205  //
 206  // The returned boolean value indicates whether the channel has been successfully deleted from tree.
 207  func (c *Channel) deleteSelfFromTree() (deleted bool) {
 208  	if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
 209  		return false
 210  	}
 211  	// not top channel
 212  	if c.Parent != nil {
 213  		c.Parent.deleteChild(c.ID)
 214  	}
 215  	return true
 216  }
 217  
 218  // deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means
 219  // deleting the channel from channelz's tracking entirely. Users can no longer use id to query the
 220  // channel, and its memory will be garbage collected.
 221  //
 222  // The trace reference count of the channel must be 0 in order to be deleted from the map. This is
 223  // specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
 224  // the trace of the referenced entity must not be deleted. In order to release the resource allocated
 225  // by grpc, the reference to the grpc object is reset to a dummy object.
 226  //
 227  // deleteSelfFromMap must be called after deleteSelfFromTree returns true.
 228  //
 229  // It returns a bool to indicate whether the channel can be safely deleted from map.
 230  func (c *Channel) deleteSelfFromMap() (delete bool) {
 231  	return c.getTraceRefCount() == 0
 232  }
 233  
 234  // deleteSelfIfReady tries to delete the channel itself from the channelz database.
 235  // The delete process includes two steps:
 236  //  1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
 237  //     parent's child list.
 238  //  2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
 239  //     will return entry not found error.
 240  func (c *Channel) deleteSelfIfReady() {
 241  	if !c.deleteSelfFromTree() {
 242  		return
 243  	}
 244  	if !c.deleteSelfFromMap() {
 245  		return
 246  	}
 247  	db.deleteEntry(c.ID)
 248  	c.trace.clear()
 249  }
 250  
// getChannelTrace returns the channel's trace pointer directly.  Unlike Trace,
// it neither takes the database lock nor copies the trace.
func (c *Channel) getChannelTrace() *ChannelTrace {
	return c.trace
}
 254  
// incrTraceRefCount atomically increments the channel's trace reference count
// by one.
func (c *Channel) incrTraceRefCount() {
	atomic.AddInt32(&c.traceRefCount, 1)
}
 258  
// decrTraceRefCount atomically decrements the channel's trace reference count
// by one.  It does not itself check whether the count has reached zero.
func (c *Channel) decrTraceRefCount() {
	atomic.AddInt32(&c.traceRefCount, -1)
}
 262  
 263  func (c *Channel) getTraceRefCount() int {
 264  	i := atomic.LoadInt32(&c.traceRefCount)
 265  	return int(i)
 266  }
 267  
// getRefName returns the human readable reference string of this channel,
// i.e. the value of the RefName field.
func (c *Channel) getRefName() string {
	return c.RefName
}
 271