callback_serializer.go raw

   1  /*
   2   *
   3   * Copyright 2022 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpcsync
  20  
  21  import (
  22  	"context"
  23  
  24  	"google.golang.org/grpc/internal/buffer"
  25  )
  26  
// CallbackSerializer provides a mechanism to schedule callbacks in a
// synchronized manner. It provides a FIFO guarantee on the order of execution
// of scheduled callbacks. New callbacks can be scheduled by invoking the
// TrySchedule() or ScheduleOr() methods.
//
// This type is safe for concurrent access.
type CallbackSerializer struct {
	// done is closed once the serializer is shut down completely, i.e. all
	// scheduled callbacks are executed and the serializer has deallocated all
	// its resources.
	done chan struct{}

	// callbacks holds the scheduled callbacks until the run goroutine
	// executes them. It is closed when the context passed to
	// NewCallbackSerializer is canceled.
	callbacks *buffer.Unbounded
}
  41  
  42  // NewCallbackSerializer returns a new CallbackSerializer instance. The provided
  43  // context will be passed to the scheduled callbacks. Users should cancel the
  44  // provided context to shutdown the CallbackSerializer. It is guaranteed that no
  45  // callbacks will be added once this context is canceled, and any pending un-run
  46  // callbacks will be executed before the serializer is shut down.
  47  func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
  48  	cs := &CallbackSerializer{
  49  		done:      make(chan struct{}),
  50  		callbacks: buffer.NewUnbounded(),
  51  	}
  52  	go cs.run(ctx)
  53  	return cs
  54  }
  55  
  56  // TrySchedule tries to schedule the provided callback function f to be
  57  // executed in the order it was added. This is a best-effort operation. If the
  58  // context passed to NewCallbackSerializer was canceled before this method is
  59  // called, the callback will not be scheduled.
  60  //
  61  // Callbacks are expected to honor the context when performing any blocking
  62  // operations, and should return early when the context is canceled.
  63  func (cs *CallbackSerializer) TrySchedule(f func(ctx context.Context)) {
  64  	cs.callbacks.Put(f)
  65  }
  66  
  67  // ScheduleOr schedules the provided callback function f to be executed in the
  68  // order it was added. If the context passed to NewCallbackSerializer has been
  69  // canceled before this method is called, the onFailure callback will be
  70  // executed inline instead.
  71  //
  72  // Callbacks are expected to honor the context when performing any blocking
  73  // operations, and should return early when the context is canceled.
  74  func (cs *CallbackSerializer) ScheduleOr(f func(ctx context.Context), onFailure func()) {
  75  	if cs.callbacks.Put(f) != nil {
  76  		onFailure()
  77  	}
  78  }
  79  
  80  func (cs *CallbackSerializer) run(ctx context.Context) {
  81  	defer close(cs.done)
  82  
  83  	// Close the buffer when the context is canceled
  84  	// to prevent new callbacks from being added.
  85  	context.AfterFunc(ctx, cs.callbacks.Close)
  86  
  87  	// Run all callbacks.
  88  	for cb := range cs.callbacks.Get() {
  89  		cs.callbacks.Load()
  90  		cb.(func(context.Context))(ctx)
  91  	}
  92  }
  93  
  94  // Done returns a channel that is closed after the context passed to
  95  // NewCallbackSerializer is canceled and all callbacks have been executed.
  96  func (cs *CallbackSerializer) Done() <-chan struct{} {
  97  	return cs.done
  98  }
  99