waitfor.go raw

   1  package linodego
   2  
   3  import (
   4  	"context"
   5  	"fmt"
   6  	"log"
   7  	"net/http"
   8  	"slices"
   9  	"strconv"
  10  	"time"
  11  
  12  	"golang.org/x/text/cases"
  13  	"golang.org/x/text/language"
  14  )
  15  
// englishTitle is an English title-casing helper created once at package scope.
// It is used to render entity type names in log and error messages.
var englishTitle = cases.Title(language.English)
  17  
// EventPoller remembers which events have already been seen for an entity and
// action so that its wait methods only report events it has not seen before.
type EventPoller struct {
	// EntityID and EntityType identify the primary entity whose events are polled.
	EntityID   any
	EntityType EntityType

	// Type is excluded here because it is implicitly determined
	// by the event action.
	SecondaryEntityID any

	// Action is the event action used to filter the polled events.
	Action EventAction

	// client performs the API calls; previousEvents records the IDs of events
	// that have already been observed so they are skipped by later polls.
	client         Client
	previousEvents map[int]bool
}
  31  
  32  // WaitForInstanceStatus waits for the Linode instance to reach the desired state
  33  // before returning. It will timeout with an error after timeoutSeconds.
  34  func (client Client) WaitForInstanceStatus(ctx context.Context, instanceID int, status InstanceStatus, timeoutSeconds int) (*Instance, error) {
  35  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
  36  	defer cancel()
  37  
  38  	ticker := time.NewTicker(client.pollInterval)
  39  	defer ticker.Stop()
  40  
  41  	for {
  42  		select {
  43  		case <-ticker.C:
  44  			instance, err := client.GetInstance(ctx, instanceID)
  45  			if err != nil {
  46  				return instance, err
  47  			}
  48  
  49  			complete := (instance.Status == status)
  50  
  51  			if complete {
  52  				return instance, nil
  53  			}
  54  		case <-ctx.Done():
  55  			return nil, fmt.Errorf("Error waiting for Instance %d status %s: %w", instanceID, status, ctx.Err())
  56  		}
  57  	}
  58  }
  59  
  60  // WaitForInstanceDiskStatus waits for the Linode instance disk to reach the desired state
  61  // before returning. It will timeout with an error after timeoutSeconds.
  62  func (client Client) WaitForInstanceDiskStatus(ctx context.Context, instanceID int, diskID int, status DiskStatus, timeoutSeconds int) (*InstanceDisk, error) {
  63  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
  64  	defer cancel()
  65  
  66  	ticker := time.NewTicker(client.pollInterval)
  67  	defer ticker.Stop()
  68  
  69  	for {
  70  		select {
  71  		case <-ticker.C:
  72  			// GetInstanceDisk will 404 on newly created disks. use List instead.
  73  			// disk, err := client.GetInstanceDisk(ctx, instanceID, diskID)
  74  			disks, err := client.ListInstanceDisks(ctx, instanceID, nil)
  75  			if err != nil {
  76  				return nil, err
  77  			}
  78  
  79  			for _, disk := range disks {
  80  				if disk.ID == diskID {
  81  					complete := (disk.Status == status)
  82  					if complete {
  83  						return &disk, nil
  84  					}
  85  
  86  					break
  87  				}
  88  			}
  89  		case <-ctx.Done():
  90  			return nil, fmt.Errorf("Error waiting for Instance %d Disk %d status %s: %w", instanceID, diskID, status, ctx.Err())
  91  		}
  92  	}
  93  }
  94  
  95  // WaitForVolumeStatus waits for the Volume to reach the desired state
  96  // before returning. It will timeout with an error after timeoutSeconds.
  97  func (client Client) WaitForVolumeStatus(ctx context.Context, volumeID int, status VolumeStatus, timeoutSeconds int) (*Volume, error) {
  98  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
  99  	defer cancel()
 100  
 101  	ticker := time.NewTicker(client.pollInterval)
 102  	defer ticker.Stop()
 103  
 104  	for {
 105  		select {
 106  		case <-ticker.C:
 107  			volume, err := client.GetVolume(ctx, volumeID)
 108  			if err != nil {
 109  				return volume, err
 110  			}
 111  
 112  			complete := (volume.Status == status)
 113  
 114  			if complete {
 115  				return volume, nil
 116  			}
 117  		case <-ctx.Done():
 118  			return nil, fmt.Errorf("Error waiting for Volume %d status %s: %w", volumeID, status, ctx.Err())
 119  		}
 120  	}
 121  }
 122  
 123  // WaitForSnapshotStatus waits for the Snapshot to reach the desired state
 124  // before returning. It will timeout with an error after timeoutSeconds.
 125  func (client Client) WaitForSnapshotStatus(
 126  	ctx context.Context,
 127  	instanceID int,
 128  	snapshotID int,
 129  	status InstanceSnapshotStatus,
 130  	timeoutSeconds int,
 131  ) (*InstanceSnapshot, error) {
 132  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 133  	defer cancel()
 134  
 135  	ticker := time.NewTicker(client.pollInterval)
 136  	defer ticker.Stop()
 137  
 138  	for {
 139  		select {
 140  		case <-ticker.C:
 141  			snapshot, err := client.GetInstanceSnapshot(ctx, instanceID, snapshotID)
 142  			if err != nil {
 143  				return snapshot, err
 144  			}
 145  
 146  			complete := (snapshot.Status == status)
 147  
 148  			if complete {
 149  				return snapshot, nil
 150  			}
 151  		case <-ctx.Done():
 152  			return nil, fmt.Errorf("Error waiting for Instance %d Snapshot %d status %s: %w", instanceID, snapshotID, status, ctx.Err())
 153  		}
 154  	}
 155  }
 156  
 157  // WaitForVolumeLinodeID waits for the Volume to match the desired LinodeID
 158  // before returning. An active Instance will not immediately attach or detach a volume, so
 159  // the LinodeID must be polled to determine volume readiness from the API.
 160  // WaitForVolumeLinodeID will timeout with an error after timeoutSeconds.
 161  func (client Client) WaitForVolumeLinodeID(ctx context.Context, volumeID int, linodeID *int, timeoutSeconds int) (*Volume, error) {
 162  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 163  	defer cancel()
 164  
 165  	ticker := time.NewTicker(client.pollInterval)
 166  	defer ticker.Stop()
 167  
 168  	for {
 169  		select {
 170  		case <-ticker.C:
 171  			volume, err := client.GetVolume(ctx, volumeID)
 172  			if err != nil {
 173  				return volume, err
 174  			}
 175  
 176  			switch {
 177  			case linodeID == nil && volume.LinodeID == nil:
 178  				return volume, nil
 179  			case linodeID == nil || volume.LinodeID == nil:
 180  				// continue waiting
 181  			case *volume.LinodeID == *linodeID:
 182  				return volume, nil
 183  			}
 184  		case <-ctx.Done():
 185  			return nil, fmt.Errorf("Error waiting for Volume %d to have Instance %v: %w", volumeID, linodeID, ctx.Err())
 186  		}
 187  	}
 188  }
 189  
 190  // WaitForLKEClusterStatus waits for the LKECluster to reach the desired state
 191  // before returning. It will timeout with an error after timeoutSeconds.
 192  func (client Client) WaitForLKEClusterStatus(ctx context.Context, clusterID int, status LKEClusterStatus, timeoutSeconds int) (*LKECluster, error) {
 193  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 194  	defer cancel()
 195  
 196  	ticker := time.NewTicker(client.pollInterval)
 197  	defer ticker.Stop()
 198  
 199  	for {
 200  		select {
 201  		case <-ticker.C:
 202  			cluster, err := client.GetLKECluster(ctx, clusterID)
 203  			if err != nil {
 204  				return cluster, err
 205  			}
 206  
 207  			complete := (cluster.Status == status)
 208  
 209  			if complete {
 210  				return cluster, nil
 211  			}
 212  		case <-ctx.Done():
 213  			return nil, fmt.Errorf("Error waiting for Cluster %d status %s: %w", clusterID, status, ctx.Err())
 214  		}
 215  	}
 216  }
 217  
// LKEClusterPollOptions configures polls against LKE Clusters.
type LKEClusterPollOptions struct {
	// Retry will cause the Poll to ignore intermittent errors
	Retry bool

	// TimeoutSeconds is the number of Seconds to wait for the poll to succeed
	// before exiting. A value of 0 means no timeout is added by the poll.
	TimeoutSeconds int

	// TransportWrapper allows adding a transport middleware function that will
	// wrap the LKE Cluster client's underlying http.RoundTripper.
	TransportWrapper func(http.RoundTripper) http.RoundTripper
}
 231  
// ClusterConditionOptions carries the inputs handed to a ClusterConditionFunc.
type ClusterConditionOptions struct {
	// LKEClusterKubeconfig is the kubeconfig of the cluster being checked, and
	// TransportWrapper is an optional http.RoundTripper middleware.
	// WaitForLKEClusterConditions fills both in: the kubeconfig it fetched and
	// the wrapper taken from LKEClusterPollOptions.
	LKEClusterKubeconfig *LKEClusterKubeconfig
	TransportWrapper     func(http.RoundTripper) http.RoundTripper
}
 236  
// ClusterConditionFunc represents a function that tests a condition against an LKE cluster,
// returns true if the condition has been reached, false if it has not yet been reached.
// A non-nil error makes WaitForLKEClusterConditions stop, unless its Retry option is set.
type ClusterConditionFunc func(context.Context, ClusterConditionOptions) (bool, error)
 240  
 241  // WaitForLKEClusterConditions waits for the given LKE conditions to be true
 242  func (client Client) WaitForLKEClusterConditions(
 243  	ctx context.Context,
 244  	clusterID int,
 245  	options LKEClusterPollOptions,
 246  	conditions ...ClusterConditionFunc,
 247  ) error {
 248  	ctx, cancel := context.WithCancel(ctx)
 249  	if options.TimeoutSeconds != 0 {
 250  		ctx, cancel = context.WithTimeout(ctx, time.Duration(options.TimeoutSeconds)*time.Second)
 251  	}
 252  	defer cancel()
 253  
 254  	lkeKubeConfig, err := client.GetLKEClusterKubeconfig(ctx, clusterID)
 255  	if err != nil {
 256  		return fmt.Errorf("failed to get Kubeconfig for LKE cluster %d: %w", clusterID, err)
 257  	}
 258  
 259  	ticker := time.NewTicker(client.pollInterval)
 260  	defer ticker.Stop()
 261  
 262  	conditionOptions := ClusterConditionOptions{LKEClusterKubeconfig: lkeKubeConfig, TransportWrapper: options.TransportWrapper}
 263  
 264  	for _, condition := range conditions {
 265  	ConditionSucceeded:
 266  		for {
 267  			select {
 268  			case <-ticker.C:
 269  				result, err := condition(ctx, conditionOptions)
 270  				if err != nil {
 271  					log.Printf("[WARN] Ignoring WaitForLKEClusterConditions conditional error: %s", err)
 272  
 273  					if !options.Retry {
 274  						return err
 275  					}
 276  				}
 277  
 278  				if result {
 279  					break ConditionSucceeded
 280  				}
 281  
 282  			case <-ctx.Done():
 283  				return fmt.Errorf("Error waiting for cluster %d conditions: %w", clusterID, ctx.Err())
 284  			}
 285  		}
 286  	}
 287  
 288  	return nil
 289  }
 290  
 291  // WaitForEventFinished waits for an entity action to reach the 'finished' state
 292  // before returning. It will timeout with an error after timeoutSeconds.
 293  // If the event indicates a failure both the failed event and the error will be returned.
 294  // nolint
 295  func (client Client) WaitForEventFinished(
 296  	ctx context.Context,
 297  	id any,
 298  	entityType EntityType,
 299  	action EventAction,
 300  	minStart time.Time,
 301  	timeoutSeconds int,
 302  ) (*Event, error) {
 303  	titledEntityType := englishTitle.String(string(entityType))
 304  	filter := Filter{
 305  		Order:   Descending,
 306  		OrderBy: "created",
 307  	}
 308  	filter.AddField(Eq, "action", action)
 309  	filter.AddField(Gte, "created", minStart.UTC().Format("2006-01-02T15:04:05"))
 310  
 311  	// Optimistically restrict results to page 1.  We should remove this when more
 312  	// precise filtering options exist.
 313  	pages := 1
 314  
 315  	// The API has limitted filtering support for Event ID and Event Type
 316  	// Optimize the list, if possible
 317  	switch entityType {
 318  	case EntityDisk, EntityDatabase, EntityLinode, EntityDomain, EntityNodebalancer:
 319  		// All of the filter supported types have int ids
 320  		filterableEntityID, err := strconv.Atoi(fmt.Sprintf("%v", id))
 321  		if err != nil {
 322  			return nil, fmt.Errorf("error parsing Entity ID %q for optimized "+
 323  				"WaitForEventFinished EventType %q: %w", id, entityType, err)
 324  		}
 325  		filter.AddField(Eq, "entity.id", filterableEntityID)
 326  		filter.AddField(Eq, "entity.type", entityType)
 327  	}
 328  
 329  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 330  	defer cancel()
 331  
 332  	if deadline, ok := ctx.Deadline(); ok {
 333  		duration := time.Until(deadline)
 334  		log.Printf("[INFO] Waiting %d seconds for %s events since %v for %s %v", int(duration.Seconds()), action, minStart, titledEntityType, id)
 335  	}
 336  
 337  	ticker := time.NewTicker(client.pollInterval)
 338  
 339  	// avoid repeating log messages
 340  	nextLog := ""
 341  	lastLog := ""
 342  	lastEventID := 0
 343  
 344  	defer ticker.Stop()
 345  	for {
 346  		select {
 347  		case <-ticker.C:
 348  			if lastEventID > 0 {
 349  				filter.AddField(Gte, "id", lastEventID)
 350  			}
 351  
 352  			filterStr, err := filter.MarshalJSON()
 353  			if err != nil {
 354  				return nil, err
 355  			}
 356  
 357  			listOptions := NewListOptions(pages, string(filterStr))
 358  
 359  			events, err := client.ListEvents(ctx, listOptions)
 360  			if err != nil {
 361  				return nil, err
 362  			}
 363  
 364  			// If there are events for this instance + action, inspect them
 365  			for _, event := range events {
 366  				event := event
 367  
 368  				if event.Entity == nil || event.Entity.Type != entityType {
 369  					// log.Println("type mismatch", event.Entity.Type, entityType)
 370  					continue
 371  				}
 372  
 373  				var entID string
 374  
 375  				switch id := event.Entity.ID.(type) {
 376  				case float64, float32:
 377  					entID = fmt.Sprintf("%.f", id)
 378  				case int:
 379  					entID = strconv.Itoa(id)
 380  				default:
 381  					entID = fmt.Sprintf("%v", id)
 382  				}
 383  
 384  				var findID string
 385  				switch id := id.(type) {
 386  				case float64, float32:
 387  					findID = fmt.Sprintf("%.f", id)
 388  				case int:
 389  					findID = strconv.Itoa(id)
 390  				default:
 391  					findID = fmt.Sprintf("%v", id)
 392  				}
 393  
 394  				if entID != findID {
 395  					// log.Println("id mismatch", entID, findID)
 396  					continue
 397  				}
 398  
 399  				if event.Created == nil {
 400  					log.Printf("[WARN] event.Created is nil when API returned: %#+v", event.Created)
 401  				}
 402  
 403  				// This is the event we are looking for. Save our place.
 404  				if lastEventID == 0 {
 405  					lastEventID = event.ID
 406  				}
 407  
 408  				switch event.Status {
 409  				case EventFailed:
 410  					return &event, fmt.Errorf("%s %v action %s failed", titledEntityType, id, action)
 411  				case EventFinished:
 412  					log.Printf("[INFO] %s %v action %s is finished", titledEntityType, id, action)
 413  					return &event, nil
 414  				}
 415  
 416  				nextLog = fmt.Sprintf("[INFO] %s %v action %s is %s", titledEntityType, id, action, event.Status)
 417  			}
 418  
 419  			// de-dupe logging statements
 420  			if nextLog != lastLog {
 421  				log.Print(nextLog)
 422  				lastLog = nextLog
 423  			}
 424  		case <-ctx.Done():
 425  			return nil, fmt.Errorf("Error waiting for Event Status '%s' of %s %v action '%s': %w", EventFinished, titledEntityType, id, action, ctx.Err())
 426  		}
 427  	}
 428  }
 429  
 430  // WaitForImageStatus waits for the Image to reach the desired state
 431  // before returning. It will timeout with an error after timeoutSeconds.
 432  func (client Client) WaitForImageStatus(ctx context.Context, imageID string, status ImageStatus, timeoutSeconds int) (*Image, error) {
 433  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 434  	defer cancel()
 435  
 436  	ticker := time.NewTicker(client.pollInterval)
 437  	defer ticker.Stop()
 438  
 439  	for {
 440  		select {
 441  		case <-ticker.C:
 442  			image, err := client.GetImage(ctx, imageID)
 443  			if err != nil {
 444  				return image, err
 445  			}
 446  
 447  			complete := image.Status == status
 448  
 449  			if complete {
 450  				return image, nil
 451  			}
 452  		case <-ctx.Done():
 453  			return nil, fmt.Errorf("failed to wait for Image %s status %s: %w", imageID, status, ctx.Err())
 454  		}
 455  	}
 456  }
 457  
 458  // WaitForImageRegionStatus waits for an Image's replica to reach the desired state
 459  // before returning.
 460  func (client Client) WaitForImageRegionStatus(ctx context.Context, imageID, region string, status ImageRegionStatus) (*Image, error) {
 461  	ticker := time.NewTicker(client.pollInterval)
 462  	defer ticker.Stop()
 463  
 464  	for {
 465  		select {
 466  		case <-ticker.C:
 467  			image, err := client.GetImage(ctx, imageID)
 468  			if err != nil {
 469  				return image, err
 470  			}
 471  
 472  			replicaIdx := slices.IndexFunc(
 473  				image.Regions,
 474  				func(r ImageRegion) bool {
 475  					return r.Region == region
 476  				},
 477  			)
 478  
 479  			// If no replica was found or the status doesn't match, try again
 480  			if replicaIdx < 0 || image.Regions[replicaIdx].Status != status {
 481  				continue
 482  			}
 483  
 484  			return image, nil
 485  
 486  		case <-ctx.Done():
 487  			return nil, fmt.Errorf("failed to wait for Image %s status %s: %w", imageID, status, ctx.Err())
 488  		}
 489  	}
 490  }
 491  
 492  // WaitForMySQLDatabaseBackup waits for the backup with the given label to be available.
 493  //
 494  // Deprecated: WaitForMySQLDatabaseBackup is a deprecated method, as the backup endpoints are no longer supported in DBaaS V2.
 495  // In DBaaS V2, databases can be backed up via database forking.
 496  func (client Client) WaitForMySQLDatabaseBackup(ctx context.Context, dbID int, label string, timeoutSeconds int) (*MySQLDatabaseBackup, error) {
 497  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 498  	defer cancel()
 499  
 500  	ticker := time.NewTicker(client.pollInterval)
 501  	defer ticker.Stop()
 502  
 503  	for {
 504  		select {
 505  		case <-ticker.C:
 506  			backups, err := client.ListMySQLDatabaseBackups(ctx, dbID, nil)
 507  			if err != nil {
 508  				return nil, err
 509  			}
 510  
 511  			for _, backup := range backups {
 512  				if backup.Label == label {
 513  					return &backup, nil
 514  				}
 515  			}
 516  		case <-ctx.Done():
 517  			return nil, fmt.Errorf("failed to wait for backup %s: %w", label, ctx.Err())
 518  		}
 519  	}
 520  }
 521  
 522  // WaitForPostgresDatabaseBackup waits for the backup with the given label to be available.
 523  //
 524  // Deprecated: WaitForPostgresDatabaseBackup is a deprecated method, as the backup endpoints are no longer supported in DBaaS V2.
 525  // In DBaaS V2, databases can be backed up via database forking.
 526  func (client Client) WaitForPostgresDatabaseBackup(ctx context.Context, dbID int, label string, timeoutSeconds int) (*PostgresDatabaseBackup, error) {
 527  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 528  	defer cancel()
 529  
 530  	ticker := time.NewTicker(client.pollInterval)
 531  	defer ticker.Stop()
 532  
 533  	for {
 534  		select {
 535  		case <-ticker.C:
 536  			backups, err := client.ListPostgresDatabaseBackups(ctx, dbID, nil)
 537  			if err != nil {
 538  				return nil, err
 539  			}
 540  
 541  			for _, backup := range backups {
 542  				if backup.Label == label {
 543  					return &backup, nil
 544  				}
 545  			}
 546  		case <-ctx.Done():
 547  			return nil, fmt.Errorf("failed to wait for backup %s: %w", label, ctx.Err())
 548  		}
 549  	}
 550  }
 551  
// databaseStatusFunc fetches the current status of the database with the given
// ID using the supplied client. Each supported engine provides one such function.
type databaseStatusFunc func(ctx context.Context, client Client, dbID int) (DatabaseStatus, error)
 553  
 554  var databaseStatusHandlers = map[DatabaseEngineType]databaseStatusFunc{
 555  	DatabaseEngineTypeMySQL: func(ctx context.Context, client Client, dbID int) (DatabaseStatus, error) {
 556  		db, err := client.GetMySQLDatabase(ctx, dbID)
 557  		if err != nil {
 558  			return "", err
 559  		}
 560  
 561  		return db.Status, nil
 562  	},
 563  	DatabaseEngineTypePostgres: func(ctx context.Context, client Client, dbID int) (DatabaseStatus, error) {
 564  		db, err := client.GetPostgresDatabase(ctx, dbID)
 565  		if err != nil {
 566  			return "", err
 567  		}
 568  
 569  		return db.Status, nil
 570  	},
 571  }
 572  
 573  // WaitForDatabaseStatus waits for the provided database to have the given status.
 574  func (client Client) WaitForDatabaseStatus(
 575  	ctx context.Context, dbID int, dbEngine DatabaseEngineType, status DatabaseStatus, timeoutSeconds int,
 576  ) error {
 577  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 578  	defer cancel()
 579  
 580  	ticker := time.NewTicker(client.pollInterval)
 581  	defer ticker.Stop()
 582  
 583  	for {
 584  		select {
 585  		case <-ticker.C:
 586  			statusHandler, ok := databaseStatusHandlers[dbEngine]
 587  			if !ok {
 588  				return fmt.Errorf("invalid db engine: %s", dbEngine)
 589  			}
 590  
 591  			currentStatus, err := statusHandler(ctx, client, dbID)
 592  			if err != nil {
 593  				return fmt.Errorf("failed to get db status: %w", err)
 594  			}
 595  
 596  			if currentStatus == status {
 597  				return nil
 598  			}
 599  		case <-ctx.Done():
 600  			return fmt.Errorf("failed to wait for database %d status: %w", dbID, ctx.Err())
 601  		}
 602  	}
 603  }
 604  
 605  // NewEventPoller initializes a new Linode event poller. This should be run before the event is triggered as it stores
 606  // the previous state of the entity's events.
 607  func (client Client) NewEventPoller(
 608  	ctx context.Context, id any, entityType EntityType, action EventAction,
 609  ) (*EventPoller, error) {
 610  	result := EventPoller{
 611  		EntityID:   id,
 612  		EntityType: entityType,
 613  		Action:     action,
 614  
 615  		client: client,
 616  	}
 617  
 618  	if err := result.PreTask(ctx); err != nil {
 619  		return nil, fmt.Errorf("failed to run pretask: %w", err)
 620  	}
 621  
 622  	return &result, nil
 623  }
 624  
 625  // NewEventPollerWithSecondary initializes a new Linode event poller with for events with a
 626  // specific secondary entity.
 627  func (client Client) NewEventPollerWithSecondary(
 628  	ctx context.Context, id any, primaryEntityType EntityType, secondaryID int, action EventAction,
 629  ) (*EventPoller, error) {
 630  	poller, err := client.NewEventPoller(ctx, id, primaryEntityType, action)
 631  	if err != nil {
 632  		return nil, err
 633  	}
 634  
 635  	poller.SecondaryEntityID = secondaryID
 636  
 637  	return poller, nil
 638  }
 639  
 640  // NewEventPollerWithoutEntity initializes a new Linode event poller without a target entity ID.
 641  // This is useful for create events where the ID of the entity is not yet known.
 642  // For example:
 643  // p, _ := client.NewEventPollerWithoutEntity(...)
 644  // inst, _ := client.CreateInstance(...)
 645  // p.EntityID = inst.ID
 646  // ...
 647  func (client Client) NewEventPollerWithoutEntity(entityType EntityType, action EventAction) (*EventPoller, error) {
 648  	result := EventPoller{
 649  		EntityType:     entityType,
 650  		Action:         action,
 651  		EntityID:       0,
 652  		previousEvents: make(map[int]bool, 0),
 653  
 654  		client: client,
 655  	}
 656  
 657  	return &result, nil
 658  }
 659  
 660  // PreTask stores all current events for the given entity to prevent them from being
 661  // processed on subsequent runs.
 662  func (p *EventPoller) PreTask(ctx context.Context) error {
 663  	f := Filter{
 664  		OrderBy: "created",
 665  		Order:   Descending,
 666  	}
 667  	f.AddField(Eq, "entity.type", p.EntityType)
 668  	f.AddField(Eq, "entity.id", p.EntityID)
 669  	f.AddField(Eq, "action", p.Action)
 670  
 671  	fBytes, err := f.MarshalJSON()
 672  	if err != nil {
 673  		return err
 674  	}
 675  
 676  	events, err := p.client.ListEvents(ctx, &ListOptions{
 677  		Filter:      string(fBytes),
 678  		PageOptions: &PageOptions{Page: 1},
 679  	})
 680  	if err != nil {
 681  		return fmt.Errorf("failed to list events: %w", err)
 682  	}
 683  
 684  	eventIDs := make(map[int]bool, len(events))
 685  	for _, event := range events {
 686  		eventIDs[event.ID] = true
 687  	}
 688  
 689  	p.previousEvents = eventIDs
 690  
 691  	return nil
 692  }
 693  
 694  func (p *EventPoller) WaitForLatestUnknownEvent(ctx context.Context) (*Event, error) {
 695  	ticker := time.NewTicker(p.client.pollInterval)
 696  	defer ticker.Stop()
 697  
 698  	f := Filter{
 699  		OrderBy: "created",
 700  		Order:   Descending,
 701  	}
 702  	f.AddField(Eq, "entity.type", p.EntityType)
 703  	f.AddField(Eq, "entity.id", p.EntityID)
 704  	f.AddField(Eq, "action", p.Action)
 705  
 706  	fBytes, err := f.MarshalJSON()
 707  	if err != nil {
 708  		return nil, err
 709  	}
 710  
 711  	listOpts := ListOptions{
 712  		Filter:      string(fBytes),
 713  		PageOptions: &PageOptions{Page: 1},
 714  	}
 715  
 716  	for {
 717  		select {
 718  		case <-ticker.C:
 719  			events, err := p.client.ListEvents(ctx, &listOpts)
 720  			if err != nil {
 721  				return nil, fmt.Errorf("failed to list events: %w", err)
 722  			}
 723  
 724  			for _, event := range events {
 725  				if p.SecondaryEntityID != nil && !eventMatchesSecondary(p.SecondaryEntityID, event) {
 726  					continue
 727  				}
 728  
 729  				if _, ok := p.previousEvents[event.ID]; !ok {
 730  					// Store this event so it is no longer picked up
 731  					// on subsequent jobs
 732  					p.previousEvents[event.ID] = true
 733  
 734  					return &event, nil
 735  				}
 736  			}
 737  		case <-ctx.Done():
 738  			return nil, fmt.Errorf("failed to wait for event: %w", ctx.Err())
 739  		}
 740  	}
 741  }
 742  
 743  // WaitForFinished waits for a new event to be finished.
 744  func (p *EventPoller) WaitForFinished(
 745  	ctx context.Context, timeoutSeconds int,
 746  ) (*Event, error) {
 747  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 748  	defer cancel()
 749  
 750  	ticker := time.NewTicker(p.client.pollInterval)
 751  	defer ticker.Stop()
 752  
 753  	event, err := p.WaitForLatestUnknownEvent(ctx)
 754  	if err != nil {
 755  		return nil, fmt.Errorf("failed to wait for event: %w", err)
 756  	}
 757  
 758  	for {
 759  		select {
 760  		case <-ticker.C:
 761  			event, err = p.client.GetEvent(ctx, event.ID)
 762  			if err != nil {
 763  				return nil, fmt.Errorf("failed to get event: %w", err)
 764  			}
 765  
 766  			switch event.Status {
 767  			case EventFinished:
 768  				return event, nil
 769  			case EventFailed:
 770  				return nil, fmt.Errorf("event %d has failed", event.ID)
 771  			case EventScheduled, EventStarted, EventNotification:
 772  				continue
 773  			}
 774  		case <-ctx.Done():
 775  			return nil, fmt.Errorf("failed to wait for event finished: %w", ctx.Err())
 776  		}
 777  	}
 778  }
 779  
 780  // WaitForResourceFree waits for a resource to have no running events.
 781  func (client Client) WaitForResourceFree(
 782  	ctx context.Context, entityType EntityType, entityID any, timeoutSeconds int,
 783  ) error {
 784  	apiFilter := Filter{
 785  		Order:   Descending,
 786  		OrderBy: "created",
 787  	}
 788  	apiFilter.AddField(Eq, "entity.id", entityID)
 789  	apiFilter.AddField(Eq, "entity.type", entityType)
 790  
 791  	filterStr, err := apiFilter.MarshalJSON()
 792  	if err != nil {
 793  		return fmt.Errorf("failed to create filter: %s", err)
 794  	}
 795  
 796  	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
 797  	defer cancel()
 798  
 799  	ticker := time.NewTicker(client.pollInterval)
 800  	defer ticker.Stop()
 801  
 802  	// A helper function to determine whether a resource is busy
 803  	checkIsBusy := func(events []Event) bool {
 804  		for _, event := range events {
 805  			if event.Status == EventStarted || event.Status == EventScheduled {
 806  				return true
 807  			}
 808  		}
 809  
 810  		return false
 811  	}
 812  
 813  	for {
 814  		select {
 815  		case <-ticker.C:
 816  			events, err := client.ListEvents(ctx, &ListOptions{
 817  				Filter: string(filterStr),
 818  			})
 819  			if err != nil {
 820  				return fmt.Errorf("failed to list events: %s", err)
 821  			}
 822  
 823  			if !checkIsBusy(events) {
 824  				return nil
 825  			}
 826  
 827  		case <-ctx.Done():
 828  			return fmt.Errorf("failed to wait for resource free: %s", ctx.Err())
 829  		}
 830  	}
 831  }
 832  
 833  // eventMatchesSecondary returns whether the given event's secondary entity
 834  // matches the configured secondary ID.
 835  // This logic has been broken out to improve readability.
 836  func eventMatchesSecondary(configuredID any, e Event) bool {
 837  	// We should return false if the event has no secondary entity.
 838  	// e.g. A previous disk deletion has completed.
 839  	if e.SecondaryEntity == nil {
 840  		return false
 841  	}
 842  
 843  	secondaryID := e.SecondaryEntity.ID
 844  
 845  	// Evil hack to correct IDs parsed as floats
 846  	if value, ok := secondaryID.(float64); ok {
 847  		secondaryID = int(value)
 848  	}
 849  
 850  	return secondaryID == configuredID
 851  }
 852