eventual_consistency.go raw

   1  // Copyright (c) 2016, 2018, 2025, Oracle and/or its affiliates.  All rights reserved.
   2  // This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.
   3  
   4  package common
   5  
   6  import (
   7  	"bytes"
   8  	"errors"
   9  	"fmt"
  10  	"os"
  11  	"runtime"
  12  	"strconv"
  13  	"strings"
  14  	"sync"
  15  	"sync/atomic"
  16  	"time"
  17  
  18  	"github.com/gofrs/flock"
  19  )
  20  
  21  const (
  22  	// OciGoSdkEcConfigEnvVarName contains the name of the environment variable that can be used to configure the eventual consistency (EC) communication mode.
  23  	// Allowed values for environment variable:
  24  	// 1. OCI_GO_SDK_EC_CONFIG = "file,/path/to/shared/timestamp/file"
  25  	// 2. OCI_GO_SDK_EC_CONFIG = "inprocess"
  26  	// 3. absent -- same as OCI_GO_SDK_EC_CONFIG = "inprocess"
  27  	OciGoSdkEcConfigEnvVarName string = "OCI_GO_SDK_EC_CONFIG"
  28  )
  29  
  30  //
  31  // Eventual consistency communication mode
  32  //
  33  
  34  // EcMode is the eventual consistency (EC) communication mode used.
  35  type EcMode int64
  36  
  37  const (
  38  	// Uninitialized means the EC communication mode has not been set yet.
  39  	Uninitialized EcMode = iota // 0
  40  
  41  	// InProcess is the default EC communication mode which only communicates the end-of-window timestamp inside the same process.
  42  	InProcess
  43  
  44  	// File is the EC communication mode that uses a file to communicate the end-of-window timestamp using a file visible across processes.
  45  	// Locking is performed using a lock file.
  46  	File
  47  )
  48  
  49  var (
  50  	affectedByEventualConsistencyRetryStatusCodeMap = map[StatErrCode]bool{
  51  		{400, "RelatedResourceNotAuthorizedOrNotFound"}: true,
  52  		{404, "NotAuthorizedOrNotFound"}:                true,
  53  		{409, "NotAuthorizedOrResourceAlreadyExists"}:   true,
  54  		{409, "ResourceAlreadyExists"}:                  true,
  55  		{400, "InsufficientServicePermissions"}:         true,
  56  		{400, "ResourceDisabled"}:                       true,
  57  	}
  58  )
  59  
  60  // IsErrorAffectedByEventualConsistency returns true if the error is affected by eventual consistency.
  61  func IsErrorAffectedByEventualConsistency(Error error) bool {
  62  	if err, ok := IsServiceError(Error); ok {
  63  		return affectedByEventualConsistencyRetryStatusCodeMap[StatErrCode{err.GetHTTPStatusCode(), err.GetCode()}]
  64  	}
  65  	return false
  66  }
  67  
  68  func getEcMode(mode string) EcMode {
  69  	var lmode = strings.ToLower(mode)
  70  	switch lmode {
  71  	case "file":
  72  		return File
  73  	case "inprocess":
  74  		return InProcess
  75  	}
  76  	ecLogf("%s: Unknown ec mode '%s', assuming 'inprocess'", OciGoSdkEcConfigEnvVarName, mode)
  77  	return InProcess
  78  }
  79  
  80  // EventuallyConsistentContext contains the information about the end of the eventually consistent window.
  81  type EventuallyConsistentContext struct {
  82  	// memory-based
  83  	endOfWindow     atomic.Value
  84  	lock            sync.RWMutex
  85  	timeNowProvider func() time.Time
  86  
  87  	// mode selector
  88  	ecMode EcMode
  89  
  90  	// file-based
  91  
  92  	// timestampFileName and timestampLockFile should be set to files that
  93  	// are accessible by all processes that need to share information about
  94  	// eventual consistency.
  95  	// A sensible choice are files inside the temp directory, as returned by os.TempDir()
  96  	timestampFileName *string
  97  	timestampFileLock *flock.Flock
  98  
  99  	// lock and unlock functions
 100  	readLock    func(e *EventuallyConsistentContext) error
 101  	readUnlock  func(e *EventuallyConsistentContext) error
 102  	writeLock   func(e *EventuallyConsistentContext) error
 103  	writeUnlock func(e *EventuallyConsistentContext) error
 104  
 105  	// get/set functions
 106  	getEndOfWindowUnsynchronized func(e *EventuallyConsistentContext) (*time.Time, error)
 107  	setEndOfWindowUnsynchronized func(e *EventuallyConsistentContext, newEndOfWindowTime *time.Time) error
 108  }
 109  
 110  // newEcContext creates a new EC context based on the OCI_GO_SDK_EC_CONFIG environment variable.
 111  func newEcContext() *EventuallyConsistentContext {
 112  	ecConfig, ecConfigProvided := os.LookupEnv(OciGoSdkEcConfigEnvVarName)
 113  	if !ecConfigProvided {
 114  		ecConfig = ""
 115  	}
 116  
 117  	commaIndex := strings.Index(ecConfig, ",")
 118  	var ecConfigMode = ecConfig
 119  	var ecConfigRest = ""
 120  	if commaIndex >= 0 {
 121  		ecConfigMode = ecConfig[:commaIndex]
 122  		ecConfigRest = ecConfig[commaIndex+1:]
 123  	}
 124  	ecMode := getEcMode(ecConfigMode)
 125  
 126  	switch ecMode {
 127  	case File:
 128  		if len(ecConfigRest) < 1 {
 129  			ecLogf("%s: Expected file name after comma for 'File' mode ('file,/path/to/file'), was: '%s'", OciGoSdkEcConfigEnvVarName, ecConfig)
 130  			return nil
 131  		}
 132  		return newEcContextFile(ecConfigRest)
 133  	}
 134  
 135  	return newEcContextInProcess()
 136  }
 137  
 138  // newEcContextInProcess creates a new in-process EC context.
 139  func newEcContextInProcess() *EventuallyConsistentContext {
 140  	ecContext := EventuallyConsistentContext{
 141  		ecMode:                       InProcess,
 142  		readLock:                     ecInProcessReadLock,
 143  		readUnlock:                   ecInProcessReadUnlock,
 144  		writeLock:                    ecInProcessWriteLock,
 145  		writeUnlock:                  ecInProcessWriteUnlock,
 146  		getEndOfWindowUnsynchronized: ecInProcessGetEndOfWindowUnsynchronized,
 147  		setEndOfWindowUnsynchronized: ecInProcessSetEndOfWindowUnsynchronized,
 148  		timeNowProvider:              func() time.Time { return time.Now() },
 149  	}
 150  	return &ecContext
 151  }
 152  
 153  // newEcContextFile creates a new EC context kept in a file.
 154  // timestampFileName should be set to a file accessible by all processes that
 155  // need to share information about eventual consistency.
 156  // A sensible choice are files inside the temp directory, as returned by os.TempDir()
 157  // The lock file will use the same name, with the suffix ".lock" added.
 158  func newEcContextFile(timestampFileName string) *EventuallyConsistentContext {
 159  	timestampLockFileName := timestampFileName + ".lock"
 160  	ecContext := EventuallyConsistentContext{
 161  		ecMode:                       File,
 162  		readLock:                     ecFileReadLock,
 163  		readUnlock:                   ecFileReadUnlock,
 164  		writeLock:                    ecFileWriteLock,
 165  		writeUnlock:                  ecFileWriteUnlock,
 166  		getEndOfWindowUnsynchronized: ecFileGetEndOfWindowUnsynchronized,
 167  		setEndOfWindowUnsynchronized: ecFileSetEndOfWindowUnsynchronized,
 168  		timeNowProvider:              func() time.Time { return time.Now() },
 169  		timestampFileName:            &timestampFileName,
 170  		timestampFileLock:            flock.New(timestampLockFileName),
 171  	}
 172  	ecDebugf("%s: Using file modification time of file '%s' and lock file '%s'", OciGoSdkEcConfigEnvVarName, *ecContext.timestampFileName, timestampLockFileName)
 173  	return &ecContext
 174  }
 175  
 176  // InitializeEcContextFromEnvVar initializes the EcContext variable as configured
 177  // in the OCI_GO_SDK_EC_CONFIG environment variable.
 178  func InitializeEcContextFromEnvVar() {
 179  	EcContext = newEcContext()
 180  }
 181  
 182  // InitializeEcContextInProcess initializes the EcContext variable to be in-process only.
 183  func InitializeEcContextInProcess() {
 184  	EcContext = newEcContextInProcess()
 185  }
 186  
 187  // InitializeEcContextFile initializes the EcContext variable to be kept in a timestamp file,
 188  // protected by a lock file.
 189  // timestampFileName should be set to a file accessible by all processes that
 190  // need to share information about eventual consistency.
 191  // A sensible choice are files inside the temp directory, as returned by os.TempDir()
 192  // The lock file will use the same name, with the suffix ".lock" added.
 193  func InitializeEcContextFile(timestampFileName string) {
 194  	EcContext = newEcContextFile(timestampFileName)
 195  }
 196  
 197  //
 198  // InProcess functions
 199  //
 200  
 201  func ecInProcessReadLock(e *EventuallyConsistentContext) error {
 202  	e.lock.RLock()
 203  	return nil
 204  }
 205  
 206  func ecInProcessReadUnlock(e *EventuallyConsistentContext) error {
 207  	e.lock.RUnlock()
 208  	return nil
 209  }
 210  
 211  func ecInProcessWriteLock(e *EventuallyConsistentContext) error {
 212  	e.lock.Lock()
 213  	return nil
 214  }
 215  
 216  func ecInProcessWriteUnlock(e *EventuallyConsistentContext) error {
 217  	e.lock.Unlock()
 218  	return nil
 219  }
 220  
 221  // ecInProcessGetEndOfWindowUnsynchronized returns the end time of an eventually consistent window,
 222  // or nil if no eventually consistent requests were made.
 223  // There is no mutex synchronization.
 224  func ecInProcessGetEndOfWindowUnsynchronized(e *EventuallyConsistentContext) (*time.Time, error) {
 225  	untyped := e.endOfWindow.Load() // returns nil if there has been no call to Store for this Value
 226  	if untyped == nil {
 227  		return (*time.Time)(nil), nil
 228  	}
 229  	t := untyped.(*time.Time)
 230  
 231  	return t, nil
 232  }
 233  
 234  // ecInProcessSetEndOfWindowUnsynchronized sets the end time of the eventually consistent window.
 235  // There is no mutex synchronization.
 236  func ecInProcessSetEndOfWindowUnsynchronized(e *EventuallyConsistentContext, newEndOfWindowTime *time.Time) error {
 237  	e.endOfWindow.Store(newEndOfWindowTime) // atomically replace the current object with the new one
 238  	return nil
 239  }
 240  
 241  //
 242  // File functions
 243  //
 244  
 245  func ecFileReadLock(e *EventuallyConsistentContext) error {
 246  	return e.timestampFileLock.RLock()
 247  }
 248  
 249  func ecFileReadUnlock(e *EventuallyConsistentContext) error {
 250  	return e.timestampFileLock.Unlock()
 251  }
 252  
 253  func ecFileWriteLock(e *EventuallyConsistentContext) error {
 254  	return e.timestampFileLock.Lock()
 255  }
 256  
 257  func ecFileWriteUnlock(e *EventuallyConsistentContext) error {
 258  	return e.timestampFileLock.Unlock()
 259  }
 260  
 261  // ecFileGetEndOfWindowUnsynchronized returns the end time of an eventually consistent window,
 262  // or nil if no eventually consistent requests were made.
 263  // There is no mutex synchronization.
 264  func ecFileGetEndOfWindowUnsynchronized(e *EventuallyConsistentContext) (*time.Time, error) {
 265  	file, err := os.Stat(*e.timestampFileName)
 266  
 267  	if errors.Is(err, os.ErrNotExist) {
 268  		ecDebugf("%s: File '%s' does not exist, meaning no EC in effect", OciGoSdkEcConfigEnvVarName, *e.timestampFileName)
 269  		return (*time.Time)(nil), nil
 270  	}
 271  	if err != nil {
 272  		ecLogf("%s: Error getting modified time from file '%s', assuming no EC in effect: %s", OciGoSdkEcConfigEnvVarName, *e.timestampFileName, err)
 273  		return (*time.Time)(nil), err
 274  	}
 275  
 276  	t := file.ModTime()
 277  	ecDebugf("%s: Read modified time of file '%s' as '%s'", OciGoSdkEcConfigEnvVarName, *e.timestampFileName, t)
 278  
 279  	return &t, nil
 280  }
 281  
 282  // ecFileSetEndOfWindowUnsynchronized sets the end time of the eventually consistent window.
 283  // There is no mutex synchronization.
 284  func ecFileSetEndOfWindowUnsynchronized(e *EventuallyConsistentContext, newEndOfWindowTime *time.Time) error {
 285  	if newEndOfWindowTime != nil {
 286  		ecDebugf("%s: Updating modified time of file '%s' to '%s'", OciGoSdkEcConfigEnvVarName, *e.timestampFileName, *newEndOfWindowTime)
 287  	} else {
 288  		ecDebugf("%s: Updating modified time of file '%s' to <nil>", OciGoSdkEcConfigEnvVarName, *e.timestampFileName)
 289  	}
 290  
 291  	if newEndOfWindowTime == nil {
 292  		err := os.Remove(*e.timestampFileName)
 293  		if err != nil && !errors.Is(err, os.ErrNotExist) {
 294  			ecLogf("%s: Error removing file '%s', may draw wrong EC conflusions: %s", OciGoSdkEcConfigEnvVarName, *e.timestampFileName, err)
 295  		}
 296  		return err
 297  	}
 298  
 299  	atime := time.Now()
 300  	var err = os.Chtimes(*e.timestampFileName, atime, *newEndOfWindowTime)
 301  	if errors.Is(err, os.ErrNotExist) {
 302  		_, createErr := os.Create(*e.timestampFileName)
 303  		if createErr != nil {
 304  			ecLogf("%s: Error creating file '%s', will have to assume no EC in effect: %s", OciGoSdkEcConfigEnvVarName, *e.timestampFileName, createErr)
 305  			return createErr
 306  		}
 307  		err = os.Chtimes(*e.timestampFileName, atime, *newEndOfWindowTime)
 308  	}
 309  	if err != nil {
 310  		ecLogf("%s: Error changing modified time for file '%s', will have to assume no EC in effect: %s", OciGoSdkEcConfigEnvVarName, *e.timestampFileName, err)
 311  		return err
 312  	}
 313  	return nil
 314  }
 315  
 316  //
 317  // General functions for EC window handling, for all EC communication modes
 318  //
 319  
 320  // GetEndOfWindow returns the end time an eventually consistent window,
 321  // or nil if no eventually consistent requests were made
 322  func (e *EventuallyConsistentContext) GetEndOfWindow() *time.Time {
 323  	e.readLock(e) // synchronize with potential writers
 324  	defer e.readUnlock(e)
 325  
 326  	endOfWindowTime, _ := e.getEndOfWindowUnsynchronized(e)
 327  
 328  	// TODO: this is noisy logging, consider removing
 329  	if endOfWindowTime != nil {
 330  		ecDebugln(fmt.Sprintf("EcContext.GetEndOfWindow returns %s", endOfWindowTime))
 331  	} else {
 332  		ecDebugln("EcContext.GetEndOfWindow returns <nil>")
 333  	}
 334  
 335  	return endOfWindowTime
 336  }
 337  
 338  // UpdateEndOfWindow sets the end time of the eventually consistent window the specified
 339  // duration into the future
 340  func (e *EventuallyConsistentContext) UpdateEndOfWindow(windowSize time.Duration) *time.Time {
 341  	e.writeLock(e) // synchronize with other potential writers
 342  	defer e.writeUnlock(e)
 343  
 344  	currentEndOfWindowTime, _ := e.getEndOfWindowUnsynchronized(e)
 345  	var newEndOfWindowTime = e.timeNowProvider().Add(windowSize)
 346  	if currentEndOfWindowTime == nil || newEndOfWindowTime.After(*currentEndOfWindowTime) {
 347  		e.setEndOfWindowUnsynchronized(e, &newEndOfWindowTime)
 348  
 349  		// TODO: this is noisy logging, consider removing
 350  		ecDebugln(fmt.Sprintf("EcContext.UpdateEndOfWindow to %s", newEndOfWindowTime))
 351  
 352  		return &newEndOfWindowTime
 353  	}
 354  	return currentEndOfWindowTime
 355  }
 356  
 357  // setEndTimeOfEventuallyConsistentWindow sets the last time an eventually consistent request was made
 358  // to the specified time
 359  func (e *EventuallyConsistentContext) setEndOfWindow(newTime *time.Time) *time.Time {
 360  	e.writeLock(e) // synchronize with other potential writers
 361  	defer e.writeUnlock(e)
 362  
 363  	e.setEndOfWindowUnsynchronized(e, newTime)
 364  
 365  	// TODO: this is noisy logging, consider removing
 366  	if newTime != nil {
 367  		ecDebugln(fmt.Sprintf("EcContext.setEndOfWindow to %s", *newTime))
 368  	} else {
 369  		ecDebugln("EcContext.setEndOfWindow to <nil>")
 370  	}
 371  
 372  	return newTime
 373  }
 374  
 375  // EcContext contains the information about the end of the eventually consistent window for this process.
 376  var EcContext = newEcContext()
 377  
 378  //
 379  // Logging helpers
 380  //
 381  
 382  // getGID returns the Goroutine id. This is purely for logging and debugging.
 383  // See https://blog.sgmansfield.com/2015/12/goroutine-ids/
 384  func getGID() uint64 {
 385  	b := make([]byte, 64)
 386  	b = b[:runtime.Stack(b, false)]
 387  	b = bytes.TrimPrefix(b, []byte("goroutine "))
 388  	b = b[:bytes.IndexByte(b, ' ')]
 389  	n, _ := strconv.ParseUint(string(b), 10, 64)
 390  	return n
 391  }
 392  
 393  // some of these errors happen so early, defaultLogger may not have been
 394  // initialized yet.
 395  func initLogIfNecessary() {
 396  	if defaultLogger == nil {
 397  		l, _ := NewSDKLogger()
 398  		SetSDKLogger(l)
 399  	}
 400  }
 401  
 402  // Debugf logs v with the provided format if debug mode is set.
 403  // There is no mutex synchronization. You should have acquired e.lock first.
 404  func ecDebugf(format string, v ...interface{}) {
 405  	defer func() {
 406  		// recover from panic if one occured.
 407  		if recover() != nil {
 408  			Debugln("ecDebugf failed")
 409  		}
 410  	}()
 411  
 412  	str := fmt.Sprintf(format, v...)
 413  
 414  	initLogIfNecessary()
 415  
 416  	// prefix message with "(pid=25140, gid=5)"
 417  	Debugf("(pid=%d, gid=%d) %s", os.Getpid(), getGID(), str)
 418  }
 419  
 420  // Debug logs v if debug mode is set.
 421  // There is no mutex synchronization. You should have acquired e.lock first.
 422  func ecDebug(v ...interface{}) {
 423  	defer func() {
 424  		// recover from panic if one occured.
 425  		if recover() != nil {
 426  			Debugln("ecDebug failed")
 427  		}
 428  	}()
 429  
 430  	initLogIfNecessary()
 431  
 432  	// prefix message with "(pid=25140, gid=5)"
 433  	Debug(append([]interface{}{"(pid=", os.Getpid(), ", gid=", getGID(), ") "}, v...)...)
 434  }
 435  
 436  // Debugln logs v appending a new line if debug mode is set
 437  // There is no mutex synchronization. You should have acquired e.lock first.
 438  func ecDebugln(v ...interface{}) {
 439  	defer func() {
 440  		// recover from panic if one occured.
 441  		if recover() != nil {
 442  			Debugln("ecDebugln failed")
 443  		}
 444  	}()
 445  
 446  	initLogIfNecessary()
 447  
 448  	// prefix message with "(pid=25140, gid=5)"
 449  	Debugln(append([]interface{}{"(pid=", os.Getpid(), ", gid=", getGID(), ") "}, v...)...)
 450  }
 451  
 452  // Logf logs v with the provided format if info mode is set.
 453  // There is no mutex synchronization. You should have acquired e.lock first.
 454  func ecLogf(format string, v ...interface{}) {
 455  	defer func() {
 456  		// recover from panic if one occured.
 457  		if recover() != nil {
 458  			Debugln("ecLogf failed")
 459  		}
 460  	}()
 461  
 462  	initLogIfNecessary()
 463  
 464  	str := fmt.Sprintf(format, v...)
 465  	// prefix message with "(pid=25140, gid=5)"
 466  	Logf("(pid=%d, gid=%d) %s", os.Getpid(), getGID(), str)
 467  }
 468