channelmap.go raw

   1  /*
   2   *
   3   * Copyright 2018 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package channelz
  20  
  21  import (
  22  	"fmt"
  23  	"sort"
  24  	"sync"
  25  	"time"
  26  )
  27  
  28  // entry represents a node in the channelz database.
  29  type entry interface {
  30  	// addChild adds a child e, whose channelz id is id to child list
  31  	addChild(id int64, e entry)
  32  	// deleteChild deletes a child with channelz id to be id from child list
  33  	deleteChild(id int64)
  34  	// triggerDelete tries to delete self from channelz database. However, if
  35  	// child list is not empty, then deletion from the database is on hold until
  36  	// the last child is deleted from database.
  37  	triggerDelete()
  38  	// deleteSelfIfReady check whether triggerDelete() has been called before,
  39  	// and whether child list is now empty. If both conditions are met, then
  40  	// delete self from database.
  41  	deleteSelfIfReady()
  42  	// getParentID returns parent ID of the entry. 0 value parent ID means no parent.
  43  	getParentID() int64
  44  	Entity
  45  }
  46  
  47  // channelMap is the storage data structure for channelz.
  48  //
  49  // Methods of channelMap can be divided into two categories with respect to
  50  // locking.
  51  //
  52  // 1. Methods acquire the global lock.
  53  // 2. Methods that can only be called when global lock is held.
  54  //
  55  // A second type of method need always to be called inside a first type of method.
  56  type channelMap struct {
  57  	mu               sync.RWMutex
  58  	topLevelChannels map[int64]struct{}
  59  	channels         map[int64]*Channel
  60  	subChannels      map[int64]*SubChannel
  61  	sockets          map[int64]*Socket
  62  	servers          map[int64]*Server
  63  }
  64  
  65  func newChannelMap() *channelMap {
  66  	return &channelMap{
  67  		topLevelChannels: make(map[int64]struct{}),
  68  		channels:         make(map[int64]*Channel),
  69  		subChannels:      make(map[int64]*SubChannel),
  70  		sockets:          make(map[int64]*Socket),
  71  		servers:          make(map[int64]*Server),
  72  	}
  73  }
  74  
  75  func (c *channelMap) addServer(id int64, s *Server) {
  76  	c.mu.Lock()
  77  	defer c.mu.Unlock()
  78  	s.cm = c
  79  	c.servers[id] = s
  80  }
  81  
  82  func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) {
  83  	c.mu.Lock()
  84  	defer c.mu.Unlock()
  85  	cn.trace.cm = c
  86  	c.channels[id] = cn
  87  	if isTopChannel {
  88  		c.topLevelChannels[id] = struct{}{}
  89  	} else if p := c.channels[pid]; p != nil {
  90  		p.addChild(id, cn)
  91  	} else {
  92  		logger.Infof("channel %d references invalid parent ID %d", id, pid)
  93  	}
  94  }
  95  
  96  func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) {
  97  	c.mu.Lock()
  98  	defer c.mu.Unlock()
  99  	sc.trace.cm = c
 100  	c.subChannels[id] = sc
 101  	if p := c.channels[pid]; p != nil {
 102  		p.addChild(id, sc)
 103  	} else {
 104  		logger.Infof("subchannel %d references invalid parent ID %d", id, pid)
 105  	}
 106  }
 107  
 108  func (c *channelMap) addSocket(s *Socket) {
 109  	c.mu.Lock()
 110  	defer c.mu.Unlock()
 111  	s.cm = c
 112  	c.sockets[s.ID] = s
 113  	if s.Parent == nil {
 114  		logger.Infof("normal socket %d has no parent", s.ID)
 115  	}
 116  	s.Parent.(entry).addChild(s.ID, s)
 117  }
 118  
 119  // removeEntry triggers the removal of an entry, which may not indeed delete the
 120  // entry, if it has to wait on the deletion of its children and until no other
 121  // entity's channel trace references it.  It may lead to a chain of entry
 122  // deletion. For example, deleting the last socket of a gracefully shutting down
 123  // server will lead to the server being also deleted.
 124  func (c *channelMap) removeEntry(id int64) {
 125  	c.mu.Lock()
 126  	defer c.mu.Unlock()
 127  	c.findEntry(id).triggerDelete()
 128  }
 129  
 130  // tracedChannel represents tracing operations which are present on both
 131  // channels and subChannels.
 132  type tracedChannel interface {
 133  	getChannelTrace() *ChannelTrace
 134  	incrTraceRefCount()
 135  	decrTraceRefCount()
 136  	getRefName() string
 137  }
 138  
 139  // c.mu must be held by the caller
 140  func (c *channelMap) decrTraceRefCount(id int64) {
 141  	e := c.findEntry(id)
 142  	if v, ok := e.(tracedChannel); ok {
 143  		v.decrTraceRefCount()
 144  		e.deleteSelfIfReady()
 145  	}
 146  }
 147  
 148  // c.mu must be held by the caller.
 149  func (c *channelMap) findEntry(id int64) entry {
 150  	if v, ok := c.channels[id]; ok {
 151  		return v
 152  	}
 153  	if v, ok := c.subChannels[id]; ok {
 154  		return v
 155  	}
 156  	if v, ok := c.servers[id]; ok {
 157  		return v
 158  	}
 159  	if v, ok := c.sockets[id]; ok {
 160  		return v
 161  	}
 162  	return &dummyEntry{idNotFound: id}
 163  }
 164  
 165  // c.mu must be held by the caller
 166  //
 167  // deleteEntry deletes an entry from the channelMap. Before calling this method,
 168  // caller must check this entry is ready to be deleted, i.e removeEntry() has
 169  // been called on it, and no children still exist.
 170  func (c *channelMap) deleteEntry(id int64) entry {
 171  	if v, ok := c.sockets[id]; ok {
 172  		delete(c.sockets, id)
 173  		return v
 174  	}
 175  	if v, ok := c.subChannels[id]; ok {
 176  		delete(c.subChannels, id)
 177  		return v
 178  	}
 179  	if v, ok := c.channels[id]; ok {
 180  		delete(c.channels, id)
 181  		delete(c.topLevelChannels, id)
 182  		return v
 183  	}
 184  	if v, ok := c.servers[id]; ok {
 185  		delete(c.servers, id)
 186  		return v
 187  	}
 188  	return &dummyEntry{idNotFound: id}
 189  }
 190  
 191  func (c *channelMap) traceEvent(id int64, desc *TraceEvent) {
 192  	c.mu.Lock()
 193  	defer c.mu.Unlock()
 194  	child := c.findEntry(id)
 195  	childTC, ok := child.(tracedChannel)
 196  	if !ok {
 197  		return
 198  	}
 199  	childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
 200  	if desc.Parent != nil {
 201  		parent := c.findEntry(child.getParentID())
 202  		var chanType RefChannelType
 203  		switch child.(type) {
 204  		case *Channel:
 205  			chanType = RefChannel
 206  		case *SubChannel:
 207  			chanType = RefSubChannel
 208  		}
 209  		if parentTC, ok := parent.(tracedChannel); ok {
 210  			parentTC.getChannelTrace().append(&traceEvent{
 211  				Desc:      desc.Parent.Desc,
 212  				Severity:  desc.Parent.Severity,
 213  				Timestamp: time.Now(),
 214  				RefID:     id,
 215  				RefName:   childTC.getRefName(),
 216  				RefType:   chanType,
 217  			})
 218  			childTC.incrTraceRefCount()
 219  		}
 220  	}
 221  }
 222  
 223  type int64Slice []int64
 224  
 225  func (s int64Slice) Len() int           { return len(s) }
 226  func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 227  func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
 228  
 229  func copyMap(m map[int64]string) map[int64]string {
 230  	n := make(map[int64]string)
 231  	for k, v := range m {
 232  		n[k] = v
 233  	}
 234  	return n
 235  }
 236  
 237  func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {
 238  	if maxResults <= 0 {
 239  		maxResults = EntriesPerPage
 240  	}
 241  	c.mu.RLock()
 242  	defer c.mu.RUnlock()
 243  	l := int64(len(c.topLevelChannels))
 244  	ids := make([]int64, 0, l)
 245  
 246  	for k := range c.topLevelChannels {
 247  		ids = append(ids, k)
 248  	}
 249  	sort.Sort(int64Slice(ids))
 250  	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
 251  	end := true
 252  	var t []*Channel
 253  	for _, v := range ids[idx:] {
 254  		if len(t) == maxResults {
 255  			end = false
 256  			break
 257  		}
 258  		if cn, ok := c.channels[v]; ok {
 259  			t = append(t, cn)
 260  		}
 261  	}
 262  	return t, end
 263  }
 264  
 265  func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) {
 266  	if maxResults <= 0 {
 267  		maxResults = EntriesPerPage
 268  	}
 269  	c.mu.RLock()
 270  	defer c.mu.RUnlock()
 271  	ids := make([]int64, 0, len(c.servers))
 272  	for k := range c.servers {
 273  		ids = append(ids, k)
 274  	}
 275  	sort.Sort(int64Slice(ids))
 276  	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
 277  	end := true
 278  	var s []*Server
 279  	for _, v := range ids[idx:] {
 280  		if len(s) == maxResults {
 281  			end = false
 282  			break
 283  		}
 284  		if svr, ok := c.servers[v]; ok {
 285  			s = append(s, svr)
 286  		}
 287  	}
 288  	return s, end
 289  }
 290  
 291  func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
 292  	if maxResults <= 0 {
 293  		maxResults = EntriesPerPage
 294  	}
 295  	c.mu.RLock()
 296  	defer c.mu.RUnlock()
 297  	svr, ok := c.servers[id]
 298  	if !ok {
 299  		// server with id doesn't exist.
 300  		return nil, true
 301  	}
 302  	svrskts := svr.sockets
 303  	ids := make([]int64, 0, len(svrskts))
 304  	sks := make([]*Socket, 0, min(len(svrskts), maxResults))
 305  	for k := range svrskts {
 306  		ids = append(ids, k)
 307  	}
 308  	sort.Sort(int64Slice(ids))
 309  	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
 310  	end := true
 311  	for _, v := range ids[idx:] {
 312  		if len(sks) == maxResults {
 313  			end = false
 314  			break
 315  		}
 316  		if ns, ok := c.sockets[v]; ok {
 317  			sks = append(sks, ns)
 318  		}
 319  	}
 320  	return sks, end
 321  }
 322  
 323  func (c *channelMap) getChannel(id int64) *Channel {
 324  	c.mu.RLock()
 325  	defer c.mu.RUnlock()
 326  	return c.channels[id]
 327  }
 328  
 329  func (c *channelMap) getSubChannel(id int64) *SubChannel {
 330  	c.mu.RLock()
 331  	defer c.mu.RUnlock()
 332  	return c.subChannels[id]
 333  }
 334  
 335  func (c *channelMap) getSocket(id int64) *Socket {
 336  	c.mu.RLock()
 337  	defer c.mu.RUnlock()
 338  	return c.sockets[id]
 339  }
 340  
 341  func (c *channelMap) getServer(id int64) *Server {
 342  	c.mu.RLock()
 343  	defer c.mu.RUnlock()
 344  	return c.servers[id]
 345  }
 346  
 347  type dummyEntry struct {
 348  	// dummyEntry is a fake entry to handle entry not found case.
 349  	idNotFound int64
 350  	Entity
 351  }
 352  
 353  func (d *dummyEntry) String() string {
 354  	return fmt.Sprintf("non-existent entity #%d", d.idNotFound)
 355  }
 356  
 357  func (d *dummyEntry) ID() int64 { return d.idNotFound }
 358  
 359  func (d *dummyEntry) addChild(id int64, e entry) {
 360  	// Note: It is possible for a normal program to reach here under race
 361  	// condition.  For example, there could be a race between ClientConn.Close()
 362  	// info being propagated to addrConn and http2Client. ClientConn.Close()
 363  	// cancel the context and result in http2Client to error. The error info is
 364  	// then caught by transport monitor and before addrConn.tearDown() is called
 365  	// in side ClientConn.Close(). Therefore, the addrConn will create a new
 366  	// transport. And when registering the new transport in channelz, its parent
 367  	// addrConn could have already been torn down and deleted from channelz
 368  	// tracking, and thus reach the code here.
 369  	logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
 370  }
 371  
 372  func (d *dummyEntry) deleteChild(id int64) {
 373  	// It is possible for a normal program to reach here under race condition.
 374  	// Refer to the example described in addChild().
 375  	logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
 376  }
 377  
 378  func (d *dummyEntry) triggerDelete() {
 379  	logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
 380  }
 381  
 382  func (*dummyEntry) deleteSelfIfReady() {
 383  	// code should not reach here. deleteSelfIfReady is always called on an existing entry.
 384  }
 385  
 386  func (*dummyEntry) getParentID() int64 {
 387  	return 0
 388  }
 389  
 390  // Entity is implemented by all channelz types.
 391  type Entity interface {
 392  	isEntity()
 393  	fmt.Stringer
 394  	id() int64
 395  }
 396