timeseries.go raw

   1  // Copyright 2015 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  // Package timeseries implements a time series structure for stats collection.
   6  package timeseries // import "golang.org/x/net/internal/timeseries"
   7  
   8  import (
   9  	"fmt"
  10  	"log"
  11  	"time"
  12  )
  13  
// Number of buckets kept in each level of the two series flavours.
const (
	timeSeriesNumBuckets       = 64 // passed to init by NewTimeSeriesWithClock
	minuteHourSeriesNumBuckets = 60 // passed to init by NewMinuteHourSeriesWithClock
)
  18  
  19  var timeSeriesResolutions = []time.Duration{
  20  	1 * time.Second,
  21  	10 * time.Second,
  22  	1 * time.Minute,
  23  	10 * time.Minute,
  24  	1 * time.Hour,
  25  	6 * time.Hour,
  26  	24 * time.Hour,          // 1 day
  27  	7 * 24 * time.Hour,      // 1 week
  28  	4 * 7 * 24 * time.Hour,  // 4 weeks
  29  	16 * 7 * 24 * time.Hour, // 16 weeks
  30  }
  31  
  32  var minuteHourSeriesResolutions = []time.Duration{
  33  	1 * time.Second,
  34  	1 * time.Minute,
  35  }
  36  
  37  // An Observable is a kind of data that can be aggregated in a time series.
  38  type Observable interface {
  39  	Multiply(ratio float64)    // Multiplies the data in self by a given ratio
  40  	Add(other Observable)      // Adds the data from a different observation to self
  41  	Clear()                    // Clears the observation so it can be reused.
  42  	CopyFrom(other Observable) // Copies the contents of a given observation to self
  43  }
  44  
  45  // Float attaches the methods of Observable to a float64.
  46  type Float float64
  47  
  48  // NewFloat returns a Float.
  49  func NewFloat() Observable {
  50  	f := Float(0)
  51  	return &f
  52  }
  53  
  54  // String returns the float as a string.
  55  func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
  56  
  57  // Value returns the float's value.
  58  func (f *Float) Value() float64 { return float64(*f) }
  59  
  60  func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
  61  
  62  func (f *Float) Add(other Observable) {
  63  	o := other.(*Float)
  64  	*f += *o
  65  }
  66  
  67  func (f *Float) Clear() { *f = 0 }
  68  
  69  func (f *Float) CopyFrom(other Observable) {
  70  	o := other.(*Float)
  71  	*f = *o
  72  }
  73  
  74  // A Clock tells the current time.
  75  type Clock interface {
  76  	Time() time.Time
  77  }
  78  
  79  type defaultClock int
  80  
  81  var defaultClockInstance defaultClock
  82  
  83  func (defaultClock) Time() time.Time { return time.Now() }
  84  
// tsLevel holds the information kept per level. Each level consists of a
// circular list of observations. The start of the level may be derived from
// end and len(buckets) * size.
type tsLevel struct {
	oldest   int               // index to oldest bucketed Observable
	newest   int               // index to newest bucketed Observable
	end      time.Time         // end timestamp for this level
	size     time.Duration     // duration of the bucketed Observable
	buckets  []Observable      // collections of observations
	provider func() Observable // used for creating new Observable
}
  96  
  97  func (l *tsLevel) Clear() {
  98  	l.oldest = 0
  99  	l.newest = len(l.buckets) - 1
 100  	l.end = time.Time{}
 101  	for i := range l.buckets {
 102  		if l.buckets[i] != nil {
 103  			l.buckets[i].Clear()
 104  			l.buckets[i] = nil
 105  		}
 106  	}
 107  }
 108  
 109  func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
 110  	l.size = size
 111  	l.provider = f
 112  	l.buckets = make([]Observable, numBuckets)
 113  }
 114  
// timeSeries keeps a sequence of levels. Each level is responsible for
// storing data at a given resolution. For example, the first level stores
// data at a one minute resolution while the second level stores data at a
// one hour resolution.
//
// Each level is represented by a sequence of buckets. Each bucket spans an
// interval equal to the resolution of the level. New observations are added
// to the last bucket.
type timeSeries struct {
	provider    func() Observable // make more Observable
	numBuckets  int               // number of buckets in each level
	levels      []*tsLevel        // levels of bucketed Observable
	lastAdd     time.Time         // time of last Observable tracked
	total       Observable        // convenient aggregation of all Observable
	clock       Clock             // Clock for getting current time
	pending     Observable        // observations not yet bucketed
	pendingTime time.Time         // what time are we keeping in pending
	dirty       bool              // if there are pending observations
}
 134  
 135  // init initializes a level according to the supplied criteria.
 136  func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
 137  	ts.provider = f
 138  	ts.numBuckets = numBuckets
 139  	ts.clock = clock
 140  	ts.levels = make([]*tsLevel, len(resolutions))
 141  
 142  	for i := range resolutions {
 143  		if i > 0 && resolutions[i-1] >= resolutions[i] {
 144  			log.Print("timeseries: resolutions must be monotonically increasing")
 145  			break
 146  		}
 147  		newLevel := new(tsLevel)
 148  		newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
 149  		ts.levels[i] = newLevel
 150  	}
 151  
 152  	ts.Clear()
 153  }
 154  
 155  // Clear removes all observations from the time series.
 156  func (ts *timeSeries) Clear() {
 157  	ts.lastAdd = time.Time{}
 158  	ts.total = ts.resetObservation(ts.total)
 159  	ts.pending = ts.resetObservation(ts.pending)
 160  	ts.pendingTime = time.Time{}
 161  	ts.dirty = false
 162  
 163  	for i := range ts.levels {
 164  		ts.levels[i].Clear()
 165  	}
 166  }
 167  
 168  // Add records an observation at the current time.
 169  func (ts *timeSeries) Add(observation Observable) {
 170  	ts.AddWithTime(observation, ts.clock.Time())
 171  }
 172  
 173  // AddWithTime records an observation at the specified time.
 174  func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
 175  
 176  	smallBucketDuration := ts.levels[0].size
 177  
 178  	if t.After(ts.lastAdd) {
 179  		ts.lastAdd = t
 180  	}
 181  
 182  	if t.After(ts.pendingTime) {
 183  		ts.advance(t)
 184  		ts.mergePendingUpdates()
 185  		ts.pendingTime = ts.levels[0].end
 186  		ts.pending.CopyFrom(observation)
 187  		ts.dirty = true
 188  	} else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
 189  		// The observation is close enough to go into the pending bucket.
 190  		// This compensates for clock skewing and small scheduling delays
 191  		// by letting the update stay in the fast path.
 192  		ts.pending.Add(observation)
 193  		ts.dirty = true
 194  	} else {
 195  		ts.mergeValue(observation, t)
 196  	}
 197  }
 198  
 199  // mergeValue inserts the observation at the specified time in the past into all levels.
 200  func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
 201  	for _, level := range ts.levels {
 202  		index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
 203  		if 0 <= index && index < ts.numBuckets {
 204  			bucketNumber := (level.oldest + index) % ts.numBuckets
 205  			if level.buckets[bucketNumber] == nil {
 206  				level.buckets[bucketNumber] = level.provider()
 207  			}
 208  			level.buckets[bucketNumber].Add(observation)
 209  		}
 210  	}
 211  	ts.total.Add(observation)
 212  }
 213  
 214  // mergePendingUpdates applies the pending updates into all levels.
 215  func (ts *timeSeries) mergePendingUpdates() {
 216  	if ts.dirty {
 217  		ts.mergeValue(ts.pending, ts.pendingTime)
 218  		ts.pending = ts.resetObservation(ts.pending)
 219  		ts.dirty = false
 220  	}
 221  }
 222  
 223  // advance cycles the buckets at each level until the latest bucket in
 224  // each level can hold the time specified.
 225  func (ts *timeSeries) advance(t time.Time) {
 226  	if !t.After(ts.levels[0].end) {
 227  		return
 228  	}
 229  	for i := 0; i < len(ts.levels); i++ {
 230  		level := ts.levels[i]
 231  		if !level.end.Before(t) {
 232  			break
 233  		}
 234  
 235  		// If the time is sufficiently far, just clear the level and advance
 236  		// directly.
 237  		if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
 238  			for _, b := range level.buckets {
 239  				ts.resetObservation(b)
 240  			}
 241  			level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
 242  		}
 243  
 244  		for t.After(level.end) {
 245  			level.end = level.end.Add(level.size)
 246  			level.newest = level.oldest
 247  			level.oldest = (level.oldest + 1) % ts.numBuckets
 248  			ts.resetObservation(level.buckets[level.newest])
 249  		}
 250  
 251  		t = level.end
 252  	}
 253  }
 254  
 255  // Latest returns the sum of the num latest buckets from the level.
 256  func (ts *timeSeries) Latest(level, num int) Observable {
 257  	now := ts.clock.Time()
 258  	if ts.levels[0].end.Before(now) {
 259  		ts.advance(now)
 260  	}
 261  
 262  	ts.mergePendingUpdates()
 263  
 264  	result := ts.provider()
 265  	l := ts.levels[level]
 266  	index := l.newest
 267  
 268  	for i := 0; i < num; i++ {
 269  		if l.buckets[index] != nil {
 270  			result.Add(l.buckets[index])
 271  		}
 272  		if index == 0 {
 273  			index = ts.numBuckets
 274  		}
 275  		index--
 276  	}
 277  
 278  	return result
 279  }
 280  
 281  // LatestBuckets returns a copy of the num latest buckets from level.
 282  func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
 283  	if level < 0 || level > len(ts.levels) {
 284  		log.Print("timeseries: bad level argument: ", level)
 285  		return nil
 286  	}
 287  	if num < 0 || num >= ts.numBuckets {
 288  		log.Print("timeseries: bad num argument: ", num)
 289  		return nil
 290  	}
 291  
 292  	results := make([]Observable, num)
 293  	now := ts.clock.Time()
 294  	if ts.levels[0].end.Before(now) {
 295  		ts.advance(now)
 296  	}
 297  
 298  	ts.mergePendingUpdates()
 299  
 300  	l := ts.levels[level]
 301  	index := l.newest
 302  
 303  	for i := 0; i < num; i++ {
 304  		result := ts.provider()
 305  		results[i] = result
 306  		if l.buckets[index] != nil {
 307  			result.CopyFrom(l.buckets[index])
 308  		}
 309  
 310  		if index == 0 {
 311  			index = ts.numBuckets
 312  		}
 313  		index -= 1
 314  	}
 315  	return results
 316  }
 317  
 318  // ScaleBy updates observations by scaling by factor.
 319  func (ts *timeSeries) ScaleBy(factor float64) {
 320  	for _, l := range ts.levels {
 321  		for i := 0; i < ts.numBuckets; i++ {
 322  			l.buckets[i].Multiply(factor)
 323  		}
 324  	}
 325  
 326  	ts.total.Multiply(factor)
 327  	ts.pending.Multiply(factor)
 328  }
 329  
 330  // Range returns the sum of observations added over the specified time range.
 331  // If start or finish times don't fall on bucket boundaries of the same
 332  // level, then return values are approximate answers.
 333  func (ts *timeSeries) Range(start, finish time.Time) Observable {
 334  	return ts.ComputeRange(start, finish, 1)[0]
 335  }
 336  
 337  // Recent returns the sum of observations from the last delta.
 338  func (ts *timeSeries) Recent(delta time.Duration) Observable {
 339  	now := ts.clock.Time()
 340  	return ts.Range(now.Add(-delta), now)
 341  }
 342  
 343  // Total returns the total of all observations.
 344  func (ts *timeSeries) Total() Observable {
 345  	ts.mergePendingUpdates()
 346  	return ts.total
 347  }
 348  
 349  // ComputeRange computes a specified number of values into a slice using
 350  // the observations recorded over the specified time period. The return
 351  // values are approximate if the start or finish times don't fall on the
 352  // bucket boundaries at the same level or if the number of buckets spanning
 353  // the range is not an integral multiple of num.
 354  func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
 355  	if start.After(finish) {
 356  		log.Printf("timeseries: start > finish, %v>%v", start, finish)
 357  		return nil
 358  	}
 359  
 360  	if num < 0 {
 361  		log.Printf("timeseries: num < 0, %v", num)
 362  		return nil
 363  	}
 364  
 365  	results := make([]Observable, num)
 366  
 367  	for _, l := range ts.levels {
 368  		if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
 369  			ts.extract(l, start, finish, num, results)
 370  			return results
 371  		}
 372  	}
 373  
 374  	// Failed to find a level that covers the desired range. So just
 375  	// extract from the last level, even if it doesn't cover the entire
 376  	// desired range.
 377  	ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
 378  
 379  	return results
 380  }
 381  
 382  // RecentList returns the specified number of values in slice over the most
 383  // recent time period of the specified range.
 384  func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
 385  	if delta < 0 {
 386  		return nil
 387  	}
 388  	now := ts.clock.Time()
 389  	return ts.ComputeRange(now.Add(-delta), now, num)
 390  }
 391  
 392  // extract returns a slice of specified number of observations from a given
 393  // level over a given range.
 394  func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
 395  	ts.mergePendingUpdates()
 396  
 397  	srcInterval := l.size
 398  	dstInterval := finish.Sub(start) / time.Duration(num)
 399  	dstStart := start
 400  	srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
 401  
 402  	srcIndex := 0
 403  
 404  	// Where should scanning start?
 405  	if dstStart.After(srcStart) {
 406  		advance := int(dstStart.Sub(srcStart) / srcInterval)
 407  		srcIndex += advance
 408  		srcStart = srcStart.Add(time.Duration(advance) * srcInterval)
 409  	}
 410  
 411  	// The i'th value is computed as show below.
 412  	// interval = (finish/start)/num
 413  	// i'th value = sum of observation in range
 414  	//   [ start + i       * interval,
 415  	//     start + (i + 1) * interval )
 416  	for i := 0; i < num; i++ {
 417  		results[i] = ts.resetObservation(results[i])
 418  		dstEnd := dstStart.Add(dstInterval)
 419  		for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
 420  			srcEnd := srcStart.Add(srcInterval)
 421  			if srcEnd.After(ts.lastAdd) {
 422  				srcEnd = ts.lastAdd
 423  			}
 424  
 425  			if !srcEnd.Before(dstStart) {
 426  				srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
 427  				if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
 428  					// dst completely contains src.
 429  					if srcValue != nil {
 430  						results[i].Add(srcValue)
 431  					}
 432  				} else {
 433  					// dst partially overlaps src.
 434  					overlapStart := maxTime(srcStart, dstStart)
 435  					overlapEnd := minTime(srcEnd, dstEnd)
 436  					base := srcEnd.Sub(srcStart)
 437  					fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
 438  
 439  					used := ts.provider()
 440  					if srcValue != nil {
 441  						used.CopyFrom(srcValue)
 442  					}
 443  					used.Multiply(fraction)
 444  					results[i].Add(used)
 445  				}
 446  
 447  				if srcEnd.After(dstEnd) {
 448  					break
 449  				}
 450  			}
 451  			srcIndex++
 452  			srcStart = srcStart.Add(srcInterval)
 453  		}
 454  		dstStart = dstStart.Add(dstInterval)
 455  	}
 456  }
 457  
 458  // resetObservation clears the content so the struct may be reused.
 459  func (ts *timeSeries) resetObservation(observation Observable) Observable {
 460  	if observation == nil {
 461  		observation = ts.provider()
 462  	} else {
 463  		observation.Clear()
 464  	}
 465  	return observation
 466  }
 467  
 468  // TimeSeries tracks data at granularities from 1 second to 16 weeks.
 469  type TimeSeries struct {
 470  	timeSeries
 471  }
 472  
 473  // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
 474  func NewTimeSeries(f func() Observable) *TimeSeries {
 475  	return NewTimeSeriesWithClock(f, defaultClockInstance)
 476  }
 477  
 478  // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
 479  // assigning timestamps.
 480  func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
 481  	ts := new(TimeSeries)
 482  	ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
 483  	return ts
 484  }
 485  
 486  // MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
 487  type MinuteHourSeries struct {
 488  	timeSeries
 489  }
 490  
 491  // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
 492  func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
 493  	return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
 494  }
 495  
 496  // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
 497  // assigning timestamps.
 498  func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
 499  	ts := new(MinuteHourSeries)
 500  	ts.timeSeries.init(minuteHourSeriesResolutions, f,
 501  		minuteHourSeriesNumBuckets, clock)
 502  	return ts
 503  }
 504  
 505  func (ts *MinuteHourSeries) Minute() Observable {
 506  	return ts.timeSeries.Latest(0, 60)
 507  }
 508  
 509  func (ts *MinuteHourSeries) Hour() Observable {
 510  	return ts.timeSeries.Latest(1, 60)
 511  }
 512  
 513  func minTime(a, b time.Time) time.Time {
 514  	if a.Before(b) {
 515  		return a
 516  	}
 517  	return b
 518  }
 519  
 520  func maxTime(a, b time.Time) time.Time {
 521  	if a.After(b) {
 522  		return a
 523  	}
 524  	return b
 525  }
 526