pubsub.go raw

   1  /*
   2   *
   3   * Copyright 2023 gRPC authors.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *     http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   *
  17   */
  18  
  19  package grpcsync
  20  
  21  import (
  22  	"context"
  23  	"sync"
  24  )
  25  
  26  // Subscriber represents an entity that is subscribed to messages published on
  27  // a PubSub. It wraps the callback to be invoked by the PubSub when a new
  28  // message is published.
  29  type Subscriber interface {
  30  	// OnMessage is invoked when a new message is published. Implementations
  31  	// must not block in this method.
  32  	OnMessage(msg any)
  33  }
  34  
  35  // PubSub is a simple one-to-many publish-subscribe system that supports
  36  // messages of arbitrary type. It guarantees that messages are delivered in
  37  // the same order in which they were published.
  38  //
  39  // Publisher invokes the Publish() method to publish new messages, while
  40  // subscribers interested in receiving these messages register a callback
  41  // via the Subscribe() method.
  42  //
  43  // Once a PubSub is stopped, no more messages can be published, but any pending
  44  // published messages will be delivered to the subscribers.  Done may be used
  45  // to determine when all published messages have been delivered.
  46  type PubSub struct {
  47  	cs *CallbackSerializer
  48  
  49  	// Access to the below fields are guarded by this mutex.
  50  	mu          sync.Mutex
  51  	msg         any
  52  	subscribers map[Subscriber]bool
  53  }
  54  
  55  // NewPubSub returns a new PubSub instance.  Users should cancel the
  56  // provided context to shutdown the PubSub.
  57  func NewPubSub(ctx context.Context) *PubSub {
  58  	return &PubSub{
  59  		cs:          NewCallbackSerializer(ctx),
  60  		subscribers: map[Subscriber]bool{},
  61  	}
  62  }
  63  
  64  // Subscribe registers the provided Subscriber to the PubSub.
  65  //
  66  // If the PubSub contains a previously published message, the Subscriber's
  67  // OnMessage() callback will be invoked asynchronously with the existing
  68  // message to begin with, and subsequently for every newly published message.
  69  //
  70  // The caller is responsible for invoking the returned cancel function to
  71  // unsubscribe itself from the PubSub.
  72  func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
  73  	ps.mu.Lock()
  74  	defer ps.mu.Unlock()
  75  
  76  	ps.subscribers[sub] = true
  77  
  78  	if ps.msg != nil {
  79  		msg := ps.msg
  80  		ps.cs.TrySchedule(func(context.Context) {
  81  			ps.mu.Lock()
  82  			defer ps.mu.Unlock()
  83  			if !ps.subscribers[sub] {
  84  				return
  85  			}
  86  			sub.OnMessage(msg)
  87  		})
  88  	}
  89  
  90  	return func() {
  91  		ps.mu.Lock()
  92  		defer ps.mu.Unlock()
  93  		delete(ps.subscribers, sub)
  94  	}
  95  }
  96  
  97  // Publish publishes the provided message to the PubSub, and invokes
  98  // callbacks registered by subscribers asynchronously.
  99  func (ps *PubSub) Publish(msg any) {
 100  	ps.mu.Lock()
 101  	defer ps.mu.Unlock()
 102  
 103  	ps.msg = msg
 104  	for sub := range ps.subscribers {
 105  		s := sub
 106  		ps.cs.TrySchedule(func(context.Context) {
 107  			ps.mu.Lock()
 108  			defer ps.mu.Unlock()
 109  			if !ps.subscribers[s] {
 110  				return
 111  			}
 112  			s.OnMessage(msg)
 113  		})
 114  	}
 115  }
 116  
 117  // Done returns a channel that is closed after the context passed to NewPubSub
 118  // is canceled and all updates have been sent to subscribers.
 119  func (ps *PubSub) Done() <-chan struct{} {
 120  	return ps.cs.Done()
 121  }
 122