1 /*
2 *
3 * Copyright 2023 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 19 package grpcsync
20 21 import (
22 "context"
23 "sync"
24 )
25 26 // Subscriber represents an entity that is subscribed to messages published on
27 // a PubSub. It wraps the callback to be invoked by the PubSub when a new
28 // message is published.
29 type Subscriber interface {
30 // OnMessage is invoked when a new message is published. Implementations
31 // must not block in this method.
32 OnMessage(msg any)
33 }
34 35 // PubSub is a simple one-to-many publish-subscribe system that supports
36 // messages of arbitrary type. It guarantees that messages are delivered in
37 // the same order in which they were published.
38 //
39 // Publisher invokes the Publish() method to publish new messages, while
40 // subscribers interested in receiving these messages register a callback
41 // via the Subscribe() method.
42 //
43 // Once a PubSub is stopped, no more messages can be published, but any pending
44 // published messages will be delivered to the subscribers. Done may be used
45 // to determine when all published messages have been delivered.
46 type PubSub struct {
47 cs *CallbackSerializer
48 49 // Access to the below fields are guarded by this mutex.
50 mu sync.Mutex
51 msg any
52 subscribers map[Subscriber]bool
53 }
54 55 // NewPubSub returns a new PubSub instance. Users should cancel the
56 // provided context to shutdown the PubSub.
57 func NewPubSub(ctx context.Context) *PubSub {
58 return &PubSub{
59 cs: NewCallbackSerializer(ctx),
60 subscribers: map[Subscriber]bool{},
61 }
62 }
63 64 // Subscribe registers the provided Subscriber to the PubSub.
65 //
66 // If the PubSub contains a previously published message, the Subscriber's
67 // OnMessage() callback will be invoked asynchronously with the existing
68 // message to begin with, and subsequently for every newly published message.
69 //
70 // The caller is responsible for invoking the returned cancel function to
71 // unsubscribe itself from the PubSub.
72 func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
73 ps.mu.Lock()
74 defer ps.mu.Unlock()
75 76 ps.subscribers[sub] = true
77 78 if ps.msg != nil {
79 msg := ps.msg
80 ps.cs.TrySchedule(func(context.Context) {
81 ps.mu.Lock()
82 defer ps.mu.Unlock()
83 if !ps.subscribers[sub] {
84 return
85 }
86 sub.OnMessage(msg)
87 })
88 }
89 90 return func() {
91 ps.mu.Lock()
92 defer ps.mu.Unlock()
93 delete(ps.subscribers, sub)
94 }
95 }
96 97 // Publish publishes the provided message to the PubSub, and invokes
98 // callbacks registered by subscribers asynchronously.
99 func (ps *PubSub) Publish(msg any) {
100 ps.mu.Lock()
101 defer ps.mu.Unlock()
102 103 ps.msg = msg
104 for sub := range ps.subscribers {
105 s := sub
106 ps.cs.TrySchedule(func(context.Context) {
107 ps.mu.Lock()
108 defer ps.mu.Unlock()
109 if !ps.subscribers[s] {
110 return
111 }
112 s.OnMessage(msg)
113 })
114 }
115 }
116 117 // Done returns a channel that is closed after the context passed to NewPubSub
118 // is canceled and all updates have been sent to subscribers.
119 func (ps *PubSub) Done() <-chan struct{} {
120 return ps.cs.Done()
121 }
122