1 package chainrpc
2 3 import (
4 "sync/atomic"
5 6 "github.com/p9c/p9/pkg/block"
7 8 "github.com/p9c/p9/pkg/blockchain"
9 "github.com/p9c/p9/pkg/chainhash"
10 "github.com/p9c/p9/pkg/mempool"
11 "github.com/p9c/p9/pkg/netsync"
12 "github.com/p9c/p9/pkg/peer"
13 "github.com/p9c/p9/pkg/wire"
14 )
// Peer provides a peer for use with the RPC Server and implements the ServerPeer interface.
//
// It is a distinct named type over NodePeer, so a *NodePeer can be converted to a *Peer (and back) without copying.
type Peer NodePeer
// Ensure Peer implements the ServerPeer interface. The typed nil pointer makes this a compile-time-only check.
var _ ServerPeer = (*Peer)(nil)
21 22 // ToPeer returns the underlying peer instance.
23 //
24 // This function is safe for concurrent access and is part of the RPCServerPeer interface implementation.
25 func (p *Peer) ToPeer() *peer.Peer {
26 if p == nil {
27 return nil
28 }
29 return (*NodePeer)(p).Peer
30 }
31 32 // IsTxRelayDisabled returns whether or not the peer has disabled transaction relay.
33 //
34 // This function is safe for concurrent access and is part of the RPCServerPeer interface implementation.
35 func (p *Peer) IsTxRelayDisabled() bool {
36 return (*NodePeer)(p).DisableRelayTx
37 }
38 39 // GetBanScore returns the current integer value that represents how close the peer is to being banned.
40 //
41 // This function is safe for concurrent access and is part of the RPCServerPeer interface implementation.
42 func (p *Peer) GetBanScore() uint32 {
43 return (*NodePeer)(p).BanScore.Int()
44 }
45 46 // GetFeeFilter returns the requested current minimum fee rate for which transactions should be announced.
47 //
48 // This function is safe for concurrent access and is part of the RPCServerPeer interface implementation.
49 func (p *Peer) GetFeeFilter() int64 {
50 return atomic.LoadInt64(&(*NodePeer)(p).FeeFilter)
51 }
// ConnManager provides a connection manager for use with the RPC Server and implements the ServerConnManager
// interface.
type ConnManager struct {
	// Server is the node that every connection-management call is forwarded to, either through a direct method call
	// or through a message sent on its Query channel.
	Server *Node
}
58 59 // Ensure rpcConnManager implements the RPCServerConnManager interface.
60 var _ ServerConnManager = &ConnManager{}
61 62 // Connect adds the provided address as a new outbound peer.
63 //
64 // The permanent flag indicates whether or not to make the peer persistent and reconnect if the connection is lost.
65 //
66 // Attempting to connect to an already existing peer will return an error.
67 //
68 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
69 func (cm *ConnManager) Connect(addr string, permanent bool) (e error) {
70 replyChan := make(chan error)
71 cm.Server.Query <- ConnectNodeMsg{
72 Addr: addr,
73 Permanent: permanent,
74 Reply: replyChan,
75 }
76 return <-replyChan
77 }
78 79 // RemoveByID removes the peer associated with the provided id from the list of persistent peers.
80 //
81 // Attempting to remove an id that does not exist will return an error.
82 //
83 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
84 func (cm *ConnManager) RemoveByID(id int32) (e error) {
85 replyChan := make(chan error)
86 cm.Server.Query <- RemoveNodeMsg{
87 Cmp: func(sp *NodePeer) bool { return sp.ID() == id },
88 Reply: replyChan,
89 }
90 return <-replyChan
91 }
92 93 // RemoveByAddr removes the peer associated with the provided address from the list of persistent peers.
94 //
95 // Attempting to remove an address that does not exist will return an error.
96 //
97 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
98 func (cm *ConnManager) RemoveByAddr(addr string) (e error) {
99 replyChan := make(chan error)
100 cm.Server.Query <- RemoveNodeMsg{
101 Cmp: func(sp *NodePeer) bool { return sp.Addr() == addr },
102 Reply: replyChan,
103 }
104 return <-replyChan
105 }
106 107 // DisconnectByID disconnects the peer associated with the provided id.
108 //
109 // This applies to both inbound and outbound peers.
110 //
111 // Attempting to remove an id that does not exist will return an error.
112 //
113 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
114 func (cm *ConnManager) DisconnectByID(id int32) (e error) {
115 replyChan := make(chan error)
116 cm.Server.Query <- DisconnectNodeMsg{
117 Cmp: func(sp *NodePeer) bool { return sp.ID() == id },
118 Reply: replyChan,
119 }
120 return <-replyChan
121 }
122 123 // DisconnectByAddr disconnects the peer associated with the provided address. This applies to both inbound and outbound
124 // peers.
125 //
126 // Attempting to remove an address that does not exist will return an error.
127 //
128 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
129 func (cm *ConnManager) DisconnectByAddr(addr string) (e error) {
130 replyChan := make(chan error)
131 cm.Server.Query <- DisconnectNodeMsg{
132 Cmp: func(sp *NodePeer) bool { return sp.Addr() == addr },
133 Reply: replyChan,
134 }
135 return <-replyChan
136 }
137 138 // ConnectedCount returns the number of currently connected peers.
139 //
140 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
141 func (cm *ConnManager) ConnectedCount() int32 {
142 return cm.Server.ConnectedCount()
143 }
144 145 // NetTotals returns the sum of all bytes received and sent across the network for all peers.
146 //
147 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
148 func (cm *ConnManager) NetTotals() (uint64, uint64) {
149 return cm.Server.NetTotals()
150 }
151 152 // ConnectedPeers returns an array consisting of all connected peers.
153 //
154 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
155 func (cm *ConnManager) ConnectedPeers() []ServerPeer {
156 replyChan := make(chan []*NodePeer)
157 cm.Server.Query <- GetPeersMsg{Reply: replyChan}
158 serverPeers := <-replyChan
159 // Convert to RPC Server peers.
160 peers := make([]ServerPeer, 0, len(serverPeers))
161 for _, sp := range serverPeers {
162 peers = append(peers, (*Peer)(sp))
163 }
164 return peers
165 }
166 167 // PersistentPeers returns an array consisting of all the added persistent peers.
168 //
169 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
170 func (cm *ConnManager) PersistentPeers() []ServerPeer {
171 replyChan := make(chan []*NodePeer)
172 cm.Server.Query <- GetAddedNodesMsg{Reply: replyChan}
173 serverPeers := <-replyChan
174 // Convert to generic peers.
175 peers := make([]ServerPeer, 0, len(serverPeers))
176 for _, sp := range serverPeers {
177 peers = append(peers, (*Peer)(sp))
178 }
179 return peers
180 }
181 182 // BroadcastMessage sends the provided message to all currently connected peers.
183 //
184 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
185 func (cm *ConnManager) BroadcastMessage(msg wire.Message) {
186 cm.Server.BroadcastMessage(msg)
187 }
188 189 // AddRebroadcastInventory adds the provided inventory to the list of inventories to be rebroadcast at random intervals
190 // until they show up in a block.
191 //
192 // This function is safe for concurrent access and is part of the RPCServerConnManager interface implementation.
193 func (cm *ConnManager) AddRebroadcastInventory(iv *wire.InvVect,
194 data interface{},
195 ) {
196 cm.Server.AddRebroadcastInventory(iv, data)
197 }
198 199 // RelayTransactions generates and relays inventory vectors for all of the passed transactions to all connected peers.
200 func (cm *ConnManager) RelayTransactions(txns []*mempool.TxDesc) {
201 cm.Server.RelayTransactions(txns)
202 }
// SyncManager provides a block manager for use with the RPC Server and implements the ServerSyncManager interface.
type SyncManager struct {
	// Server is the node whose Chain is consulted by LocateHeaders.
	Server *Node
	// SyncMgr is the sync manager that IsCurrent, SubmitBlock, Pause and SyncPeerID delegate to.
	SyncMgr *netsync.SyncManager
}
// Ensure SyncManager implements the ServerSyncManager interface. The typed nil pointer makes this a
// compile-time-only check.
var _ ServerSyncManager = (*SyncManager)(nil)
212 213 // IsCurrent returns whether or not the sync manager believes the chain is current as compared to the rest of the
214 // network.
215 //
216 // This function is safe for concurrent access and is part of the RPCServerSyncManager interface implementation.
217 func (b *SyncManager) IsCurrent() bool {
218 return b.SyncMgr.IsCurrent()
219 }
220 221 // SubmitBlock submits the provided block to the network after processing it locally.
222 //
223 // This function is safe for concurrent access and is part of the RPCServerSyncManager interface implementation.
224 func (b *SyncManager) SubmitBlock(block *block.Block,
225 flags blockchain.BehaviorFlags,
226 ) (bool, error) {
227 return b.SyncMgr.ProcessBlock(block, flags)
228 }
229 230 // Pause pauses the sync manager until the returned channel is closed.
231 //
232 // This function is safe for concurrent access and is part of the RPCServerSyncManager interface implementation.
233 func (b *SyncManager) Pause() chan<- struct{} {
234 return b.SyncMgr.Pause()
235 }
236 237 // SyncPeerID returns the peer that is currently the peer being used to sync from.
238 //
239 // This function is safe for concurrent access and is part of the RPCServerSyncManager interface implementation.
240 func (b *SyncManager) SyncPeerID() int32 {
241 return b.SyncMgr.SyncPeerID()
242 }
243 244 // LocateHeaders returns the hashes of the blocks after the first known block in
245 // the provided locators until the provided
246 // stop hash or the current tip is reached, up to a max of wire.MaxBlockHeadersPerMsg hashes.
247 //
248 // This function is safe for concurrent access and is part of the RPCServerSyncManager interface implementation.
249 func (b *SyncManager) LocateHeaders(locators []*chainhash.Hash,
250 hashStop *chainhash.Hash,
251 ) []wire.BlockHeader {
252 return b.Server.Chain.LocateHeaders(locators, hashStop)
253 }
254