meter.go raw

   1  // Copyright The OpenTelemetry Authors
   2  // SPDX-License-Identifier: Apache-2.0
   3  
   4  package global // import "go.opentelemetry.io/otel/internal/global"
   5  
   6  import (
   7  	"container/list"
   8  	"context"
   9  	"reflect"
  10  	"sync"
  11  
  12  	"go.opentelemetry.io/otel/metric"
  13  	"go.opentelemetry.io/otel/metric/embedded"
  14  )
  15  
  16  // meterProvider is a placeholder for a configured SDK MeterProvider.
  17  //
  18  // All MeterProvider functionality is forwarded to a delegate once
  19  // configured.
  20  type meterProvider struct {
  21  	embedded.MeterProvider
  22  
  23  	mtx    sync.Mutex
  24  	meters map[il]*meter
  25  
  26  	delegate metric.MeterProvider
  27  }
  28  
  29  // setDelegate configures p to delegate all MeterProvider functionality to
  30  // provider.
  31  //
  32  // All Meters provided prior to this function call are switched out to be
  33  // Meters provided by provider. All instruments and callbacks are recreated and
  34  // delegated.
  35  //
  36  // It is guaranteed by the caller that this happens only once.
  37  func (p *meterProvider) setDelegate(provider metric.MeterProvider) {
  38  	p.mtx.Lock()
  39  	defer p.mtx.Unlock()
  40  
  41  	p.delegate = provider
  42  
  43  	if len(p.meters) == 0 {
  44  		return
  45  	}
  46  
  47  	for _, meter := range p.meters {
  48  		meter.setDelegate(provider)
  49  	}
  50  
  51  	p.meters = nil
  52  }
  53  
  54  // Meter implements MeterProvider.
  55  func (p *meterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
  56  	p.mtx.Lock()
  57  	defer p.mtx.Unlock()
  58  
  59  	if p.delegate != nil {
  60  		return p.delegate.Meter(name, opts...)
  61  	}
  62  
  63  	// At this moment it is guaranteed that no sdk is installed, save the meter in the meters map.
  64  
  65  	c := metric.NewMeterConfig(opts...)
  66  	key := il{
  67  		name:    name,
  68  		version: c.InstrumentationVersion(),
  69  		schema:  c.SchemaURL(),
  70  		attrs:   c.InstrumentationAttributes(),
  71  	}
  72  
  73  	if p.meters == nil {
  74  		p.meters = make(map[il]*meter)
  75  	}
  76  
  77  	if val, ok := p.meters[key]; ok {
  78  		return val
  79  	}
  80  
  81  	t := &meter{name: name, opts: opts, instruments: make(map[instID]delegatedInstrument)}
  82  	p.meters[key] = t
  83  	return t
  84  }
  85  
  86  // meter is a placeholder for a metric.Meter.
  87  //
  88  // All Meter functionality is forwarded to a delegate once configured.
  89  // Otherwise, all functionality is forwarded to a NoopMeter.
  90  type meter struct {
  91  	embedded.Meter
  92  
  93  	name string
  94  	opts []metric.MeterOption
  95  
  96  	mtx         sync.Mutex
  97  	instruments map[instID]delegatedInstrument
  98  
  99  	registry list.List
 100  
 101  	delegate metric.Meter
 102  }
 103  
 104  type delegatedInstrument interface {
 105  	setDelegate(metric.Meter)
 106  }
 107  
 108  // instID are the identifying properties of a instrument.
 109  type instID struct {
 110  	// name is the name of the stream.
 111  	name string
 112  	// description is the description of the stream.
 113  	description string
 114  	// kind defines the functional group of the instrument.
 115  	kind reflect.Type
 116  	// unit is the unit of the stream.
 117  	unit string
 118  }
 119  
 120  // setDelegate configures m to delegate all Meter functionality to Meters
 121  // created by provider.
 122  //
 123  // All subsequent calls to the Meter methods will be passed to the delegate.
 124  //
 125  // It is guaranteed by the caller that this happens only once.
 126  func (m *meter) setDelegate(provider metric.MeterProvider) {
 127  	m.mtx.Lock()
 128  	defer m.mtx.Unlock()
 129  
 130  	meter := provider.Meter(m.name, m.opts...)
 131  	m.delegate = meter
 132  
 133  	for _, inst := range m.instruments {
 134  		inst.setDelegate(meter)
 135  	}
 136  
 137  	var n *list.Element
 138  	for e := m.registry.Front(); e != nil; e = n {
 139  		r := e.Value.(*registration)
 140  		r.setDelegate(meter)
 141  		n = e.Next()
 142  		m.registry.Remove(e)
 143  	}
 144  
 145  	m.instruments = nil
 146  	m.registry.Init()
 147  }
 148  
 149  func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
 150  	m.mtx.Lock()
 151  	defer m.mtx.Unlock()
 152  
 153  	if m.delegate != nil {
 154  		return m.delegate.Int64Counter(name, options...)
 155  	}
 156  
 157  	cfg := metric.NewInt64CounterConfig(options...)
 158  	id := instID{
 159  		name:        name,
 160  		kind:        reflect.TypeOf((*siCounter)(nil)),
 161  		description: cfg.Description(),
 162  		unit:        cfg.Unit(),
 163  	}
 164  	if f, ok := m.instruments[id]; ok {
 165  		return f.(metric.Int64Counter), nil
 166  	}
 167  	i := &siCounter{name: name, opts: options}
 168  	m.instruments[id] = i
 169  	return i, nil
 170  }
 171  
 172  func (m *meter) Int64UpDownCounter(
 173  	name string,
 174  	options ...metric.Int64UpDownCounterOption,
 175  ) (metric.Int64UpDownCounter, error) {
 176  	m.mtx.Lock()
 177  	defer m.mtx.Unlock()
 178  
 179  	if m.delegate != nil {
 180  		return m.delegate.Int64UpDownCounter(name, options...)
 181  	}
 182  
 183  	cfg := metric.NewInt64UpDownCounterConfig(options...)
 184  	id := instID{
 185  		name:        name,
 186  		kind:        reflect.TypeOf((*siUpDownCounter)(nil)),
 187  		description: cfg.Description(),
 188  		unit:        cfg.Unit(),
 189  	}
 190  	if f, ok := m.instruments[id]; ok {
 191  		return f.(metric.Int64UpDownCounter), nil
 192  	}
 193  	i := &siUpDownCounter{name: name, opts: options}
 194  	m.instruments[id] = i
 195  	return i, nil
 196  }
 197  
 198  func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
 199  	m.mtx.Lock()
 200  	defer m.mtx.Unlock()
 201  
 202  	if m.delegate != nil {
 203  		return m.delegate.Int64Histogram(name, options...)
 204  	}
 205  
 206  	cfg := metric.NewInt64HistogramConfig(options...)
 207  	id := instID{
 208  		name:        name,
 209  		kind:        reflect.TypeOf((*siHistogram)(nil)),
 210  		description: cfg.Description(),
 211  		unit:        cfg.Unit(),
 212  	}
 213  	if f, ok := m.instruments[id]; ok {
 214  		return f.(metric.Int64Histogram), nil
 215  	}
 216  	i := &siHistogram{name: name, opts: options}
 217  	m.instruments[id] = i
 218  	return i, nil
 219  }
 220  
 221  func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
 222  	m.mtx.Lock()
 223  	defer m.mtx.Unlock()
 224  
 225  	if m.delegate != nil {
 226  		return m.delegate.Int64Gauge(name, options...)
 227  	}
 228  
 229  	cfg := metric.NewInt64GaugeConfig(options...)
 230  	id := instID{
 231  		name:        name,
 232  		kind:        reflect.TypeOf((*siGauge)(nil)),
 233  		description: cfg.Description(),
 234  		unit:        cfg.Unit(),
 235  	}
 236  	if f, ok := m.instruments[id]; ok {
 237  		return f.(metric.Int64Gauge), nil
 238  	}
 239  	i := &siGauge{name: name, opts: options}
 240  	m.instruments[id] = i
 241  	return i, nil
 242  }
 243  
 244  func (m *meter) Int64ObservableCounter(
 245  	name string,
 246  	options ...metric.Int64ObservableCounterOption,
 247  ) (metric.Int64ObservableCounter, error) {
 248  	m.mtx.Lock()
 249  	defer m.mtx.Unlock()
 250  
 251  	if m.delegate != nil {
 252  		return m.delegate.Int64ObservableCounter(name, options...)
 253  	}
 254  
 255  	cfg := metric.NewInt64ObservableCounterConfig(options...)
 256  	id := instID{
 257  		name:        name,
 258  		kind:        reflect.TypeOf((*aiCounter)(nil)),
 259  		description: cfg.Description(),
 260  		unit:        cfg.Unit(),
 261  	}
 262  	if f, ok := m.instruments[id]; ok {
 263  		return f.(metric.Int64ObservableCounter), nil
 264  	}
 265  	i := &aiCounter{name: name, opts: options}
 266  	m.instruments[id] = i
 267  	return i, nil
 268  }
 269  
 270  func (m *meter) Int64ObservableUpDownCounter(
 271  	name string,
 272  	options ...metric.Int64ObservableUpDownCounterOption,
 273  ) (metric.Int64ObservableUpDownCounter, error) {
 274  	m.mtx.Lock()
 275  	defer m.mtx.Unlock()
 276  
 277  	if m.delegate != nil {
 278  		return m.delegate.Int64ObservableUpDownCounter(name, options...)
 279  	}
 280  
 281  	cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
 282  	id := instID{
 283  		name:        name,
 284  		kind:        reflect.TypeOf((*aiUpDownCounter)(nil)),
 285  		description: cfg.Description(),
 286  		unit:        cfg.Unit(),
 287  	}
 288  	if f, ok := m.instruments[id]; ok {
 289  		return f.(metric.Int64ObservableUpDownCounter), nil
 290  	}
 291  	i := &aiUpDownCounter{name: name, opts: options}
 292  	m.instruments[id] = i
 293  	return i, nil
 294  }
 295  
 296  func (m *meter) Int64ObservableGauge(
 297  	name string,
 298  	options ...metric.Int64ObservableGaugeOption,
 299  ) (metric.Int64ObservableGauge, error) {
 300  	m.mtx.Lock()
 301  	defer m.mtx.Unlock()
 302  
 303  	if m.delegate != nil {
 304  		return m.delegate.Int64ObservableGauge(name, options...)
 305  	}
 306  
 307  	cfg := metric.NewInt64ObservableGaugeConfig(options...)
 308  	id := instID{
 309  		name:        name,
 310  		kind:        reflect.TypeOf((*aiGauge)(nil)),
 311  		description: cfg.Description(),
 312  		unit:        cfg.Unit(),
 313  	}
 314  	if f, ok := m.instruments[id]; ok {
 315  		return f.(metric.Int64ObservableGauge), nil
 316  	}
 317  	i := &aiGauge{name: name, opts: options}
 318  	m.instruments[id] = i
 319  	return i, nil
 320  }
 321  
 322  func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
 323  	m.mtx.Lock()
 324  	defer m.mtx.Unlock()
 325  
 326  	if m.delegate != nil {
 327  		return m.delegate.Float64Counter(name, options...)
 328  	}
 329  
 330  	cfg := metric.NewFloat64CounterConfig(options...)
 331  	id := instID{
 332  		name:        name,
 333  		kind:        reflect.TypeOf((*sfCounter)(nil)),
 334  		description: cfg.Description(),
 335  		unit:        cfg.Unit(),
 336  	}
 337  	if f, ok := m.instruments[id]; ok {
 338  		return f.(metric.Float64Counter), nil
 339  	}
 340  	i := &sfCounter{name: name, opts: options}
 341  	m.instruments[id] = i
 342  	return i, nil
 343  }
 344  
 345  func (m *meter) Float64UpDownCounter(
 346  	name string,
 347  	options ...metric.Float64UpDownCounterOption,
 348  ) (metric.Float64UpDownCounter, error) {
 349  	m.mtx.Lock()
 350  	defer m.mtx.Unlock()
 351  
 352  	if m.delegate != nil {
 353  		return m.delegate.Float64UpDownCounter(name, options...)
 354  	}
 355  
 356  	cfg := metric.NewFloat64UpDownCounterConfig(options...)
 357  	id := instID{
 358  		name:        name,
 359  		kind:        reflect.TypeOf((*sfUpDownCounter)(nil)),
 360  		description: cfg.Description(),
 361  		unit:        cfg.Unit(),
 362  	}
 363  	if f, ok := m.instruments[id]; ok {
 364  		return f.(metric.Float64UpDownCounter), nil
 365  	}
 366  	i := &sfUpDownCounter{name: name, opts: options}
 367  	m.instruments[id] = i
 368  	return i, nil
 369  }
 370  
 371  func (m *meter) Float64Histogram(
 372  	name string,
 373  	options ...metric.Float64HistogramOption,
 374  ) (metric.Float64Histogram, error) {
 375  	m.mtx.Lock()
 376  	defer m.mtx.Unlock()
 377  
 378  	if m.delegate != nil {
 379  		return m.delegate.Float64Histogram(name, options...)
 380  	}
 381  
 382  	cfg := metric.NewFloat64HistogramConfig(options...)
 383  	id := instID{
 384  		name:        name,
 385  		kind:        reflect.TypeOf((*sfHistogram)(nil)),
 386  		description: cfg.Description(),
 387  		unit:        cfg.Unit(),
 388  	}
 389  	if f, ok := m.instruments[id]; ok {
 390  		return f.(metric.Float64Histogram), nil
 391  	}
 392  	i := &sfHistogram{name: name, opts: options}
 393  	m.instruments[id] = i
 394  	return i, nil
 395  }
 396  
 397  func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
 398  	m.mtx.Lock()
 399  	defer m.mtx.Unlock()
 400  
 401  	if m.delegate != nil {
 402  		return m.delegate.Float64Gauge(name, options...)
 403  	}
 404  
 405  	cfg := metric.NewFloat64GaugeConfig(options...)
 406  	id := instID{
 407  		name:        name,
 408  		kind:        reflect.TypeOf((*sfGauge)(nil)),
 409  		description: cfg.Description(),
 410  		unit:        cfg.Unit(),
 411  	}
 412  	if f, ok := m.instruments[id]; ok {
 413  		return f.(metric.Float64Gauge), nil
 414  	}
 415  	i := &sfGauge{name: name, opts: options}
 416  	m.instruments[id] = i
 417  	return i, nil
 418  }
 419  
 420  func (m *meter) Float64ObservableCounter(
 421  	name string,
 422  	options ...metric.Float64ObservableCounterOption,
 423  ) (metric.Float64ObservableCounter, error) {
 424  	m.mtx.Lock()
 425  	defer m.mtx.Unlock()
 426  
 427  	if m.delegate != nil {
 428  		return m.delegate.Float64ObservableCounter(name, options...)
 429  	}
 430  
 431  	cfg := metric.NewFloat64ObservableCounterConfig(options...)
 432  	id := instID{
 433  		name:        name,
 434  		kind:        reflect.TypeOf((*afCounter)(nil)),
 435  		description: cfg.Description(),
 436  		unit:        cfg.Unit(),
 437  	}
 438  	if f, ok := m.instruments[id]; ok {
 439  		return f.(metric.Float64ObservableCounter), nil
 440  	}
 441  	i := &afCounter{name: name, opts: options}
 442  	m.instruments[id] = i
 443  	return i, nil
 444  }
 445  
 446  func (m *meter) Float64ObservableUpDownCounter(
 447  	name string,
 448  	options ...metric.Float64ObservableUpDownCounterOption,
 449  ) (metric.Float64ObservableUpDownCounter, error) {
 450  	m.mtx.Lock()
 451  	defer m.mtx.Unlock()
 452  
 453  	if m.delegate != nil {
 454  		return m.delegate.Float64ObservableUpDownCounter(name, options...)
 455  	}
 456  
 457  	cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
 458  	id := instID{
 459  		name:        name,
 460  		kind:        reflect.TypeOf((*afUpDownCounter)(nil)),
 461  		description: cfg.Description(),
 462  		unit:        cfg.Unit(),
 463  	}
 464  	if f, ok := m.instruments[id]; ok {
 465  		return f.(metric.Float64ObservableUpDownCounter), nil
 466  	}
 467  	i := &afUpDownCounter{name: name, opts: options}
 468  	m.instruments[id] = i
 469  	return i, nil
 470  }
 471  
 472  func (m *meter) Float64ObservableGauge(
 473  	name string,
 474  	options ...metric.Float64ObservableGaugeOption,
 475  ) (metric.Float64ObservableGauge, error) {
 476  	m.mtx.Lock()
 477  	defer m.mtx.Unlock()
 478  
 479  	if m.delegate != nil {
 480  		return m.delegate.Float64ObservableGauge(name, options...)
 481  	}
 482  
 483  	cfg := metric.NewFloat64ObservableGaugeConfig(options...)
 484  	id := instID{
 485  		name:        name,
 486  		kind:        reflect.TypeOf((*afGauge)(nil)),
 487  		description: cfg.Description(),
 488  		unit:        cfg.Unit(),
 489  	}
 490  	if f, ok := m.instruments[id]; ok {
 491  		return f.(metric.Float64ObservableGauge), nil
 492  	}
 493  	i := &afGauge{name: name, opts: options}
 494  	m.instruments[id] = i
 495  	return i, nil
 496  }
 497  
 498  // RegisterCallback captures the function that will be called during Collect.
 499  func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
 500  	m.mtx.Lock()
 501  	defer m.mtx.Unlock()
 502  
 503  	if m.delegate != nil {
 504  		return m.delegate.RegisterCallback(unwrapCallback(f), unwrapInstruments(insts)...)
 505  	}
 506  
 507  	reg := &registration{instruments: insts, function: f}
 508  	e := m.registry.PushBack(reg)
 509  	reg.unreg = func() error {
 510  		m.mtx.Lock()
 511  		_ = m.registry.Remove(e)
 512  		m.mtx.Unlock()
 513  		return nil
 514  	}
 515  	return reg, nil
 516  }
 517  
 518  func unwrapInstruments(instruments []metric.Observable) []metric.Observable {
 519  	out := make([]metric.Observable, 0, len(instruments))
 520  
 521  	for _, inst := range instruments {
 522  		if in, ok := inst.(unwrapper); ok {
 523  			out = append(out, in.unwrap())
 524  		} else {
 525  			out = append(out, inst)
 526  		}
 527  	}
 528  
 529  	return out
 530  }
 531  
 532  type registration struct {
 533  	embedded.Registration
 534  
 535  	instruments []metric.Observable
 536  	function    metric.Callback
 537  
 538  	unreg   func() error
 539  	unregMu sync.Mutex
 540  }
 541  
 542  type unwrapObs struct {
 543  	embedded.Observer
 544  	obs metric.Observer
 545  }
 546  
 547  // unwrapFloat64Observable returns an expected metric.Float64Observable after
 548  // unwrapping the global object.
 549  func unwrapFloat64Observable(inst metric.Float64Observable) metric.Float64Observable {
 550  	if unwrapped, ok := inst.(unwrapper); ok {
 551  		if floatObs, ok := unwrapped.unwrap().(metric.Float64Observable); ok {
 552  			// Note: if the unwrapped object does not
 553  			// unwrap as an observable for either of the
 554  			// predicates here, it means an internal bug in
 555  			// this package.  We avoid logging an error in
 556  			// this case, because the SDK has to try its
 557  			// own type conversion on the object.  The SDK
 558  			// will see this and be forced to respond with
 559  			// its own error.
 560  			//
 561  			// This code uses a double-nested if statement
 562  			// to avoid creating a branch that is
 563  			// impossible to cover.
 564  			inst = floatObs
 565  		}
 566  	}
 567  	return inst
 568  }
 569  
 570  // unwrapInt64Observable returns an expected metric.Int64Observable after
 571  // unwrapping the global object.
 572  func unwrapInt64Observable(inst metric.Int64Observable) metric.Int64Observable {
 573  	if unwrapped, ok := inst.(unwrapper); ok {
 574  		if unint, ok := unwrapped.unwrap().(metric.Int64Observable); ok {
 575  			// See the comment in unwrapFloat64Observable().
 576  			inst = unint
 577  		}
 578  	}
 579  	return inst
 580  }
 581  
 582  func (uo *unwrapObs) ObserveFloat64(inst metric.Float64Observable, value float64, opts ...metric.ObserveOption) {
 583  	uo.obs.ObserveFloat64(unwrapFloat64Observable(inst), value, opts...)
 584  }
 585  
 586  func (uo *unwrapObs) ObserveInt64(inst metric.Int64Observable, value int64, opts ...metric.ObserveOption) {
 587  	uo.obs.ObserveInt64(unwrapInt64Observable(inst), value, opts...)
 588  }
 589  
 590  func unwrapCallback(f metric.Callback) metric.Callback {
 591  	return func(ctx context.Context, obs metric.Observer) error {
 592  		return f(ctx, &unwrapObs{obs: obs})
 593  	}
 594  }
 595  
 596  func (c *registration) setDelegate(m metric.Meter) {
 597  	c.unregMu.Lock()
 598  	defer c.unregMu.Unlock()
 599  
 600  	if c.unreg == nil {
 601  		// Unregister already called.
 602  		return
 603  	}
 604  
 605  	reg, err := m.RegisterCallback(unwrapCallback(c.function), unwrapInstruments(c.instruments)...)
 606  	if err != nil {
 607  		GetErrorHandler().Handle(err)
 608  		return
 609  	}
 610  
 611  	c.unreg = reg.Unregister
 612  }
 613  
 614  func (c *registration) Unregister() error {
 615  	c.unregMu.Lock()
 616  	defer c.unregMu.Unlock()
 617  	if c.unreg == nil {
 618  		// Unregister already called.
 619  		return nil
 620  	}
 621  
 622  	var err error
 623  	err, c.unreg = c.unreg(), nil
 624  	return err
 625  }
 626