waitfor.go raw
1 package linodego
2
3 import (
4 "context"
5 "fmt"
6 "log"
7 "net/http"
8 "slices"
9 "strconv"
10 "time"
11
12 "golang.org/x/text/cases"
13 "golang.org/x/text/language"
14 )
15
16 var englishTitle = cases.Title(language.English)
17
18 type EventPoller struct {
19 EntityID any
20 EntityType EntityType
21
22 // Type is excluded here because it is implicitly determined
23 // by the event action.
24 SecondaryEntityID any
25
26 Action EventAction
27
28 client Client
29 previousEvents map[int]bool
30 }
31
32 // WaitForInstanceStatus waits for the Linode instance to reach the desired state
33 // before returning. It will timeout with an error after timeoutSeconds.
34 func (client Client) WaitForInstanceStatus(ctx context.Context, instanceID int, status InstanceStatus, timeoutSeconds int) (*Instance, error) {
35 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
36 defer cancel()
37
38 ticker := time.NewTicker(client.pollInterval)
39 defer ticker.Stop()
40
41 for {
42 select {
43 case <-ticker.C:
44 instance, err := client.GetInstance(ctx, instanceID)
45 if err != nil {
46 return instance, err
47 }
48
49 complete := (instance.Status == status)
50
51 if complete {
52 return instance, nil
53 }
54 case <-ctx.Done():
55 return nil, fmt.Errorf("Error waiting for Instance %d status %s: %w", instanceID, status, ctx.Err())
56 }
57 }
58 }
59
60 // WaitForInstanceDiskStatus waits for the Linode instance disk to reach the desired state
61 // before returning. It will timeout with an error after timeoutSeconds.
62 func (client Client) WaitForInstanceDiskStatus(ctx context.Context, instanceID int, diskID int, status DiskStatus, timeoutSeconds int) (*InstanceDisk, error) {
63 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
64 defer cancel()
65
66 ticker := time.NewTicker(client.pollInterval)
67 defer ticker.Stop()
68
69 for {
70 select {
71 case <-ticker.C:
72 // GetInstanceDisk will 404 on newly created disks. use List instead.
73 // disk, err := client.GetInstanceDisk(ctx, instanceID, diskID)
74 disks, err := client.ListInstanceDisks(ctx, instanceID, nil)
75 if err != nil {
76 return nil, err
77 }
78
79 for _, disk := range disks {
80 if disk.ID == diskID {
81 complete := (disk.Status == status)
82 if complete {
83 return &disk, nil
84 }
85
86 break
87 }
88 }
89 case <-ctx.Done():
90 return nil, fmt.Errorf("Error waiting for Instance %d Disk %d status %s: %w", instanceID, diskID, status, ctx.Err())
91 }
92 }
93 }
94
95 // WaitForVolumeStatus waits for the Volume to reach the desired state
96 // before returning. It will timeout with an error after timeoutSeconds.
97 func (client Client) WaitForVolumeStatus(ctx context.Context, volumeID int, status VolumeStatus, timeoutSeconds int) (*Volume, error) {
98 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
99 defer cancel()
100
101 ticker := time.NewTicker(client.pollInterval)
102 defer ticker.Stop()
103
104 for {
105 select {
106 case <-ticker.C:
107 volume, err := client.GetVolume(ctx, volumeID)
108 if err != nil {
109 return volume, err
110 }
111
112 complete := (volume.Status == status)
113
114 if complete {
115 return volume, nil
116 }
117 case <-ctx.Done():
118 return nil, fmt.Errorf("Error waiting for Volume %d status %s: %w", volumeID, status, ctx.Err())
119 }
120 }
121 }
122
123 // WaitForSnapshotStatus waits for the Snapshot to reach the desired state
124 // before returning. It will timeout with an error after timeoutSeconds.
125 func (client Client) WaitForSnapshotStatus(
126 ctx context.Context,
127 instanceID int,
128 snapshotID int,
129 status InstanceSnapshotStatus,
130 timeoutSeconds int,
131 ) (*InstanceSnapshot, error) {
132 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
133 defer cancel()
134
135 ticker := time.NewTicker(client.pollInterval)
136 defer ticker.Stop()
137
138 for {
139 select {
140 case <-ticker.C:
141 snapshot, err := client.GetInstanceSnapshot(ctx, instanceID, snapshotID)
142 if err != nil {
143 return snapshot, err
144 }
145
146 complete := (snapshot.Status == status)
147
148 if complete {
149 return snapshot, nil
150 }
151 case <-ctx.Done():
152 return nil, fmt.Errorf("Error waiting for Instance %d Snapshot %d status %s: %w", instanceID, snapshotID, status, ctx.Err())
153 }
154 }
155 }
156
157 // WaitForVolumeLinodeID waits for the Volume to match the desired LinodeID
158 // before returning. An active Instance will not immediately attach or detach a volume, so
159 // the LinodeID must be polled to determine volume readiness from the API.
160 // WaitForVolumeLinodeID will timeout with an error after timeoutSeconds.
161 func (client Client) WaitForVolumeLinodeID(ctx context.Context, volumeID int, linodeID *int, timeoutSeconds int) (*Volume, error) {
162 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
163 defer cancel()
164
165 ticker := time.NewTicker(client.pollInterval)
166 defer ticker.Stop()
167
168 for {
169 select {
170 case <-ticker.C:
171 volume, err := client.GetVolume(ctx, volumeID)
172 if err != nil {
173 return volume, err
174 }
175
176 switch {
177 case linodeID == nil && volume.LinodeID == nil:
178 return volume, nil
179 case linodeID == nil || volume.LinodeID == nil:
180 // continue waiting
181 case *volume.LinodeID == *linodeID:
182 return volume, nil
183 }
184 case <-ctx.Done():
185 return nil, fmt.Errorf("Error waiting for Volume %d to have Instance %v: %w", volumeID, linodeID, ctx.Err())
186 }
187 }
188 }
189
190 // WaitForLKEClusterStatus waits for the LKECluster to reach the desired state
191 // before returning. It will timeout with an error after timeoutSeconds.
192 func (client Client) WaitForLKEClusterStatus(ctx context.Context, clusterID int, status LKEClusterStatus, timeoutSeconds int) (*LKECluster, error) {
193 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
194 defer cancel()
195
196 ticker := time.NewTicker(client.pollInterval)
197 defer ticker.Stop()
198
199 for {
200 select {
201 case <-ticker.C:
202 cluster, err := client.GetLKECluster(ctx, clusterID)
203 if err != nil {
204 return cluster, err
205 }
206
207 complete := (cluster.Status == status)
208
209 if complete {
210 return cluster, nil
211 }
212 case <-ctx.Done():
213 return nil, fmt.Errorf("Error waiting for Cluster %d status %s: %w", clusterID, status, ctx.Err())
214 }
215 }
216 }
217
218 // LKEClusterPollOptions configures polls against LKE Clusters.
219 type LKEClusterPollOptions struct {
220 // Retry will cause the Poll to ignore interimittent errors
221 Retry bool
222
223 // TimeoutSeconds is the number of Seconds to wait for the poll to succeed
224 // before exiting.
225 TimeoutSeconds int
226
227 // TansportWrapper allows adding a transport middleware function that will
228 // wrap the LKE Cluster client's underlying http.RoundTripper.
229 TransportWrapper func(http.RoundTripper) http.RoundTripper
230 }
231
232 type ClusterConditionOptions struct {
233 LKEClusterKubeconfig *LKEClusterKubeconfig
234 TransportWrapper func(http.RoundTripper) http.RoundTripper
235 }
236
237 // ClusterConditionFunc represents a function that tests a condition against an LKE cluster,
238 // returns true if the condition has been reached, false if it has not yet been reached.
239 type ClusterConditionFunc func(context.Context, ClusterConditionOptions) (bool, error)
240
241 // WaitForLKEClusterConditions waits for the given LKE conditions to be true
242 func (client Client) WaitForLKEClusterConditions(
243 ctx context.Context,
244 clusterID int,
245 options LKEClusterPollOptions,
246 conditions ...ClusterConditionFunc,
247 ) error {
248 ctx, cancel := context.WithCancel(ctx)
249 if options.TimeoutSeconds != 0 {
250 ctx, cancel = context.WithTimeout(ctx, time.Duration(options.TimeoutSeconds)*time.Second)
251 }
252 defer cancel()
253
254 lkeKubeConfig, err := client.GetLKEClusterKubeconfig(ctx, clusterID)
255 if err != nil {
256 return fmt.Errorf("failed to get Kubeconfig for LKE cluster %d: %w", clusterID, err)
257 }
258
259 ticker := time.NewTicker(client.pollInterval)
260 defer ticker.Stop()
261
262 conditionOptions := ClusterConditionOptions{LKEClusterKubeconfig: lkeKubeConfig, TransportWrapper: options.TransportWrapper}
263
264 for _, condition := range conditions {
265 ConditionSucceeded:
266 for {
267 select {
268 case <-ticker.C:
269 result, err := condition(ctx, conditionOptions)
270 if err != nil {
271 log.Printf("[WARN] Ignoring WaitForLKEClusterConditions conditional error: %s", err)
272
273 if !options.Retry {
274 return err
275 }
276 }
277
278 if result {
279 break ConditionSucceeded
280 }
281
282 case <-ctx.Done():
283 return fmt.Errorf("Error waiting for cluster %d conditions: %w", clusterID, ctx.Err())
284 }
285 }
286 }
287
288 return nil
289 }
290
291 // WaitForEventFinished waits for an entity action to reach the 'finished' state
292 // before returning. It will timeout with an error after timeoutSeconds.
293 // If the event indicates a failure both the failed event and the error will be returned.
294 // nolint
295 func (client Client) WaitForEventFinished(
296 ctx context.Context,
297 id any,
298 entityType EntityType,
299 action EventAction,
300 minStart time.Time,
301 timeoutSeconds int,
302 ) (*Event, error) {
303 titledEntityType := englishTitle.String(string(entityType))
304 filter := Filter{
305 Order: Descending,
306 OrderBy: "created",
307 }
308 filter.AddField(Eq, "action", action)
309 filter.AddField(Gte, "created", minStart.UTC().Format("2006-01-02T15:04:05"))
310
311 // Optimistically restrict results to page 1. We should remove this when more
312 // precise filtering options exist.
313 pages := 1
314
315 // The API has limitted filtering support for Event ID and Event Type
316 // Optimize the list, if possible
317 switch entityType {
318 case EntityDisk, EntityDatabase, EntityLinode, EntityDomain, EntityNodebalancer:
319 // All of the filter supported types have int ids
320 filterableEntityID, err := strconv.Atoi(fmt.Sprintf("%v", id))
321 if err != nil {
322 return nil, fmt.Errorf("error parsing Entity ID %q for optimized "+
323 "WaitForEventFinished EventType %q: %w", id, entityType, err)
324 }
325 filter.AddField(Eq, "entity.id", filterableEntityID)
326 filter.AddField(Eq, "entity.type", entityType)
327 }
328
329 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
330 defer cancel()
331
332 if deadline, ok := ctx.Deadline(); ok {
333 duration := time.Until(deadline)
334 log.Printf("[INFO] Waiting %d seconds for %s events since %v for %s %v", int(duration.Seconds()), action, minStart, titledEntityType, id)
335 }
336
337 ticker := time.NewTicker(client.pollInterval)
338
339 // avoid repeating log messages
340 nextLog := ""
341 lastLog := ""
342 lastEventID := 0
343
344 defer ticker.Stop()
345 for {
346 select {
347 case <-ticker.C:
348 if lastEventID > 0 {
349 filter.AddField(Gte, "id", lastEventID)
350 }
351
352 filterStr, err := filter.MarshalJSON()
353 if err != nil {
354 return nil, err
355 }
356
357 listOptions := NewListOptions(pages, string(filterStr))
358
359 events, err := client.ListEvents(ctx, listOptions)
360 if err != nil {
361 return nil, err
362 }
363
364 // If there are events for this instance + action, inspect them
365 for _, event := range events {
366 event := event
367
368 if event.Entity == nil || event.Entity.Type != entityType {
369 // log.Println("type mismatch", event.Entity.Type, entityType)
370 continue
371 }
372
373 var entID string
374
375 switch id := event.Entity.ID.(type) {
376 case float64, float32:
377 entID = fmt.Sprintf("%.f", id)
378 case int:
379 entID = strconv.Itoa(id)
380 default:
381 entID = fmt.Sprintf("%v", id)
382 }
383
384 var findID string
385 switch id := id.(type) {
386 case float64, float32:
387 findID = fmt.Sprintf("%.f", id)
388 case int:
389 findID = strconv.Itoa(id)
390 default:
391 findID = fmt.Sprintf("%v", id)
392 }
393
394 if entID != findID {
395 // log.Println("id mismatch", entID, findID)
396 continue
397 }
398
399 if event.Created == nil {
400 log.Printf("[WARN] event.Created is nil when API returned: %#+v", event.Created)
401 }
402
403 // This is the event we are looking for. Save our place.
404 if lastEventID == 0 {
405 lastEventID = event.ID
406 }
407
408 switch event.Status {
409 case EventFailed:
410 return &event, fmt.Errorf("%s %v action %s failed", titledEntityType, id, action)
411 case EventFinished:
412 log.Printf("[INFO] %s %v action %s is finished", titledEntityType, id, action)
413 return &event, nil
414 }
415
416 nextLog = fmt.Sprintf("[INFO] %s %v action %s is %s", titledEntityType, id, action, event.Status)
417 }
418
419 // de-dupe logging statements
420 if nextLog != lastLog {
421 log.Print(nextLog)
422 lastLog = nextLog
423 }
424 case <-ctx.Done():
425 return nil, fmt.Errorf("Error waiting for Event Status '%s' of %s %v action '%s': %w", EventFinished, titledEntityType, id, action, ctx.Err())
426 }
427 }
428 }
429
430 // WaitForImageStatus waits for the Image to reach the desired state
431 // before returning. It will timeout with an error after timeoutSeconds.
432 func (client Client) WaitForImageStatus(ctx context.Context, imageID string, status ImageStatus, timeoutSeconds int) (*Image, error) {
433 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
434 defer cancel()
435
436 ticker := time.NewTicker(client.pollInterval)
437 defer ticker.Stop()
438
439 for {
440 select {
441 case <-ticker.C:
442 image, err := client.GetImage(ctx, imageID)
443 if err != nil {
444 return image, err
445 }
446
447 complete := image.Status == status
448
449 if complete {
450 return image, nil
451 }
452 case <-ctx.Done():
453 return nil, fmt.Errorf("failed to wait for Image %s status %s: %w", imageID, status, ctx.Err())
454 }
455 }
456 }
457
458 // WaitForImageRegionStatus waits for an Image's replica to reach the desired state
459 // before returning.
460 func (client Client) WaitForImageRegionStatus(ctx context.Context, imageID, region string, status ImageRegionStatus) (*Image, error) {
461 ticker := time.NewTicker(client.pollInterval)
462 defer ticker.Stop()
463
464 for {
465 select {
466 case <-ticker.C:
467 image, err := client.GetImage(ctx, imageID)
468 if err != nil {
469 return image, err
470 }
471
472 replicaIdx := slices.IndexFunc(
473 image.Regions,
474 func(r ImageRegion) bool {
475 return r.Region == region
476 },
477 )
478
479 // If no replica was found or the status doesn't match, try again
480 if replicaIdx < 0 || image.Regions[replicaIdx].Status != status {
481 continue
482 }
483
484 return image, nil
485
486 case <-ctx.Done():
487 return nil, fmt.Errorf("failed to wait for Image %s status %s: %w", imageID, status, ctx.Err())
488 }
489 }
490 }
491
492 // WaitForMySQLDatabaseBackup waits for the backup with the given label to be available.
493 //
494 // Deprecated: WaitForMySQLDatabaseBackup is a deprecated method, as the backup endpoints are no longer supported in DBaaS V2.
495 // In DBaaS V2, databases can be backed up via database forking.
496 func (client Client) WaitForMySQLDatabaseBackup(ctx context.Context, dbID int, label string, timeoutSeconds int) (*MySQLDatabaseBackup, error) {
497 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
498 defer cancel()
499
500 ticker := time.NewTicker(client.pollInterval)
501 defer ticker.Stop()
502
503 for {
504 select {
505 case <-ticker.C:
506 backups, err := client.ListMySQLDatabaseBackups(ctx, dbID, nil)
507 if err != nil {
508 return nil, err
509 }
510
511 for _, backup := range backups {
512 if backup.Label == label {
513 return &backup, nil
514 }
515 }
516 case <-ctx.Done():
517 return nil, fmt.Errorf("failed to wait for backup %s: %w", label, ctx.Err())
518 }
519 }
520 }
521
522 // WaitForPostgresDatabaseBackup waits for the backup with the given label to be available.
523 //
524 // Deprecated: WaitForPostgresDatabaseBackup is a deprecated method, as the backup endpoints are no longer supported in DBaaS V2.
525 // In DBaaS V2, databases can be backed up via database forking.
526 func (client Client) WaitForPostgresDatabaseBackup(ctx context.Context, dbID int, label string, timeoutSeconds int) (*PostgresDatabaseBackup, error) {
527 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
528 defer cancel()
529
530 ticker := time.NewTicker(client.pollInterval)
531 defer ticker.Stop()
532
533 for {
534 select {
535 case <-ticker.C:
536 backups, err := client.ListPostgresDatabaseBackups(ctx, dbID, nil)
537 if err != nil {
538 return nil, err
539 }
540
541 for _, backup := range backups {
542 if backup.Label == label {
543 return &backup, nil
544 }
545 }
546 case <-ctx.Done():
547 return nil, fmt.Errorf("failed to wait for backup %s: %w", label, ctx.Err())
548 }
549 }
550 }
551
552 type databaseStatusFunc func(ctx context.Context, client Client, dbID int) (DatabaseStatus, error)
553
554 var databaseStatusHandlers = map[DatabaseEngineType]databaseStatusFunc{
555 DatabaseEngineTypeMySQL: func(ctx context.Context, client Client, dbID int) (DatabaseStatus, error) {
556 db, err := client.GetMySQLDatabase(ctx, dbID)
557 if err != nil {
558 return "", err
559 }
560
561 return db.Status, nil
562 },
563 DatabaseEngineTypePostgres: func(ctx context.Context, client Client, dbID int) (DatabaseStatus, error) {
564 db, err := client.GetPostgresDatabase(ctx, dbID)
565 if err != nil {
566 return "", err
567 }
568
569 return db.Status, nil
570 },
571 }
572
573 // WaitForDatabaseStatus waits for the provided database to have the given status.
574 func (client Client) WaitForDatabaseStatus(
575 ctx context.Context, dbID int, dbEngine DatabaseEngineType, status DatabaseStatus, timeoutSeconds int,
576 ) error {
577 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
578 defer cancel()
579
580 ticker := time.NewTicker(client.pollInterval)
581 defer ticker.Stop()
582
583 for {
584 select {
585 case <-ticker.C:
586 statusHandler, ok := databaseStatusHandlers[dbEngine]
587 if !ok {
588 return fmt.Errorf("invalid db engine: %s", dbEngine)
589 }
590
591 currentStatus, err := statusHandler(ctx, client, dbID)
592 if err != nil {
593 return fmt.Errorf("failed to get db status: %w", err)
594 }
595
596 if currentStatus == status {
597 return nil
598 }
599 case <-ctx.Done():
600 return fmt.Errorf("failed to wait for database %d status: %w", dbID, ctx.Err())
601 }
602 }
603 }
604
605 // NewEventPoller initializes a new Linode event poller. This should be run before the event is triggered as it stores
606 // the previous state of the entity's events.
607 func (client Client) NewEventPoller(
608 ctx context.Context, id any, entityType EntityType, action EventAction,
609 ) (*EventPoller, error) {
610 result := EventPoller{
611 EntityID: id,
612 EntityType: entityType,
613 Action: action,
614
615 client: client,
616 }
617
618 if err := result.PreTask(ctx); err != nil {
619 return nil, fmt.Errorf("failed to run pretask: %w", err)
620 }
621
622 return &result, nil
623 }
624
625 // NewEventPollerWithSecondary initializes a new Linode event poller with for events with a
626 // specific secondary entity.
627 func (client Client) NewEventPollerWithSecondary(
628 ctx context.Context, id any, primaryEntityType EntityType, secondaryID int, action EventAction,
629 ) (*EventPoller, error) {
630 poller, err := client.NewEventPoller(ctx, id, primaryEntityType, action)
631 if err != nil {
632 return nil, err
633 }
634
635 poller.SecondaryEntityID = secondaryID
636
637 return poller, nil
638 }
639
640 // NewEventPollerWithoutEntity initializes a new Linode event poller without a target entity ID.
641 // This is useful for create events where the ID of the entity is not yet known.
642 // For example:
643 // p, _ := client.NewEventPollerWithoutEntity(...)
644 // inst, _ := client.CreateInstance(...)
645 // p.EntityID = inst.ID
646 // ...
647 func (client Client) NewEventPollerWithoutEntity(entityType EntityType, action EventAction) (*EventPoller, error) {
648 result := EventPoller{
649 EntityType: entityType,
650 Action: action,
651 EntityID: 0,
652 previousEvents: make(map[int]bool, 0),
653
654 client: client,
655 }
656
657 return &result, nil
658 }
659
660 // PreTask stores all current events for the given entity to prevent them from being
661 // processed on subsequent runs.
662 func (p *EventPoller) PreTask(ctx context.Context) error {
663 f := Filter{
664 OrderBy: "created",
665 Order: Descending,
666 }
667 f.AddField(Eq, "entity.type", p.EntityType)
668 f.AddField(Eq, "entity.id", p.EntityID)
669 f.AddField(Eq, "action", p.Action)
670
671 fBytes, err := f.MarshalJSON()
672 if err != nil {
673 return err
674 }
675
676 events, err := p.client.ListEvents(ctx, &ListOptions{
677 Filter: string(fBytes),
678 PageOptions: &PageOptions{Page: 1},
679 })
680 if err != nil {
681 return fmt.Errorf("failed to list events: %w", err)
682 }
683
684 eventIDs := make(map[int]bool, len(events))
685 for _, event := range events {
686 eventIDs[event.ID] = true
687 }
688
689 p.previousEvents = eventIDs
690
691 return nil
692 }
693
694 func (p *EventPoller) WaitForLatestUnknownEvent(ctx context.Context) (*Event, error) {
695 ticker := time.NewTicker(p.client.pollInterval)
696 defer ticker.Stop()
697
698 f := Filter{
699 OrderBy: "created",
700 Order: Descending,
701 }
702 f.AddField(Eq, "entity.type", p.EntityType)
703 f.AddField(Eq, "entity.id", p.EntityID)
704 f.AddField(Eq, "action", p.Action)
705
706 fBytes, err := f.MarshalJSON()
707 if err != nil {
708 return nil, err
709 }
710
711 listOpts := ListOptions{
712 Filter: string(fBytes),
713 PageOptions: &PageOptions{Page: 1},
714 }
715
716 for {
717 select {
718 case <-ticker.C:
719 events, err := p.client.ListEvents(ctx, &listOpts)
720 if err != nil {
721 return nil, fmt.Errorf("failed to list events: %w", err)
722 }
723
724 for _, event := range events {
725 if p.SecondaryEntityID != nil && !eventMatchesSecondary(p.SecondaryEntityID, event) {
726 continue
727 }
728
729 if _, ok := p.previousEvents[event.ID]; !ok {
730 // Store this event so it is no longer picked up
731 // on subsequent jobs
732 p.previousEvents[event.ID] = true
733
734 return &event, nil
735 }
736 }
737 case <-ctx.Done():
738 return nil, fmt.Errorf("failed to wait for event: %w", ctx.Err())
739 }
740 }
741 }
742
743 // WaitForFinished waits for a new event to be finished.
744 func (p *EventPoller) WaitForFinished(
745 ctx context.Context, timeoutSeconds int,
746 ) (*Event, error) {
747 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
748 defer cancel()
749
750 ticker := time.NewTicker(p.client.pollInterval)
751 defer ticker.Stop()
752
753 event, err := p.WaitForLatestUnknownEvent(ctx)
754 if err != nil {
755 return nil, fmt.Errorf("failed to wait for event: %w", err)
756 }
757
758 for {
759 select {
760 case <-ticker.C:
761 event, err = p.client.GetEvent(ctx, event.ID)
762 if err != nil {
763 return nil, fmt.Errorf("failed to get event: %w", err)
764 }
765
766 switch event.Status {
767 case EventFinished:
768 return event, nil
769 case EventFailed:
770 return nil, fmt.Errorf("event %d has failed", event.ID)
771 case EventScheduled, EventStarted, EventNotification:
772 continue
773 }
774 case <-ctx.Done():
775 return nil, fmt.Errorf("failed to wait for event finished: %w", ctx.Err())
776 }
777 }
778 }
779
780 // WaitForResourceFree waits for a resource to have no running events.
781 func (client Client) WaitForResourceFree(
782 ctx context.Context, entityType EntityType, entityID any, timeoutSeconds int,
783 ) error {
784 apiFilter := Filter{
785 Order: Descending,
786 OrderBy: "created",
787 }
788 apiFilter.AddField(Eq, "entity.id", entityID)
789 apiFilter.AddField(Eq, "entity.type", entityType)
790
791 filterStr, err := apiFilter.MarshalJSON()
792 if err != nil {
793 return fmt.Errorf("failed to create filter: %s", err)
794 }
795
796 ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
797 defer cancel()
798
799 ticker := time.NewTicker(client.pollInterval)
800 defer ticker.Stop()
801
802 // A helper function to determine whether a resource is busy
803 checkIsBusy := func(events []Event) bool {
804 for _, event := range events {
805 if event.Status == EventStarted || event.Status == EventScheduled {
806 return true
807 }
808 }
809
810 return false
811 }
812
813 for {
814 select {
815 case <-ticker.C:
816 events, err := client.ListEvents(ctx, &ListOptions{
817 Filter: string(filterStr),
818 })
819 if err != nil {
820 return fmt.Errorf("failed to list events: %s", err)
821 }
822
823 if !checkIsBusy(events) {
824 return nil
825 }
826
827 case <-ctx.Done():
828 return fmt.Errorf("failed to wait for resource free: %s", ctx.Err())
829 }
830 }
831 }
832
833 // eventMatchesSecondary returns whether the given event's secondary entity
834 // matches the configured secondary ID.
835 // This logic has been broken out to improve readability.
836 func eventMatchesSecondary(configuredID any, e Event) bool {
837 // We should return false if the event has no secondary entity.
838 // e.g. A previous disk deletion has completed.
839 if e.SecondaryEntity == nil {
840 return false
841 }
842
843 secondaryID := e.SecondaryEntity.ID
844
845 // Evil hack to correct IDs parsed as floats
846 if value, ok := secondaryID.(float64); ok {
847 secondaryID = int(value)
848 }
849
850 return secondaryID == configuredID
851 }
852