1 package wallet
2 3 import (
4 "fmt"
5 // This enables pprof
6 // _ "net/http/pprof"
7 "sync"
8 9 "github.com/p9c/p9/pkg/qu"
10 11 "github.com/p9c/p9/pkg/log"
12 "github.com/p9c/p9/pkg/chaincfg"
13 "github.com/p9c/p9/pod/config"
14 "github.com/p9c/p9/pod/state"
15 16 "github.com/p9c/p9/pkg/interrupt"
17 18 "github.com/p9c/p9/pkg/chainclient"
19 )
20 21 // Main is a work-around main function that is required since deferred functions
22 // (such as log flushing) are not called with calls to os.Exit. Instead, main
23 // runs this function and checks for a non-nil error, at point any defers have
24 // already run, and if the error is non-nil, the program can be exited with an
25 // error exit status.
26 func Main(cx *state.State) (e error) {
27 // cx.WaitGroup.Add(1)
28 cx.WaitAdd()
29 // if *config.Profile != "" {
30 // go func() {
31 // listenAddr := net.JoinHostPort("127.0.0.1", *config.Profile)
32 // I.Ln("profile server listening on", listenAddr)
33 // profileRedirect := http.RedirectHandler("/debug/pprof",
34 // http.StatusSeeOther)
35 // http.Handle("/", profileRedirect)
36 // fmt.Println(http.ListenAndServe(listenAddr, nil))
37 // }()
38 // }
39 loader := NewLoader(cx.ActiveNet, cx.Config.WalletFile.V(), 250)
40 // Create and start HTTP server to serve wallet client connections. This will be updated with the wallet and chain
41 // server RPC client created below after each is created.
42 D.Ln("starting RPC servers")
43 var legacyServer *Server
44 if legacyServer, e = startRPCServers(cx, loader); E.Chk(e) {
45 E.Ln("unable to create RPC servers:", e)
46 return
47 }
48 loader.RunAfterLoad(
49 func(w *Wallet) {
50 D.Ln("starting wallet RPC services", w != nil)
51 startWalletRPCServices(w, legacyServer)
52 // cx.WalletChan <- w
53 },
54 )
55 if !cx.Config.NoInitialLoad.True() {
56 go func() {
57 D.Ln("loading wallet", cx.Config.WalletPass.V())
58 if e = LoadWallet(loader, cx, legacyServer); E.Chk(e) {
59 }
60 }()
61 }
62 interrupt.AddHandler(cx.WalletKill.Q)
63 select {
64 case <-cx.WalletKill.Wait():
65 D.Ln("wallet killswitch activated")
66 if legacyServer != nil {
67 D.Ln("stopping wallet RPC server")
68 legacyServer.Stop()
69 I.Ln("stopped wallet RPC server")
70 }
71 I.Ln("wallet shutdown from killswitch complete")
72 cx.WaitDone()
73 return
74 case <-cx.KillAll.Wait():
75 D.Ln("killall")
76 cx.WalletKill.Q()
77 case <-interrupt.HandlersDone.Wait():
78 }
79 I.Ln("wallet shutdown complete")
80 cx.WaitDone()
81 return
82 }
83 84 // LoadWallet ...
85 func LoadWallet(
86 loader *Loader, cx *state.State, legacyServer *Server,
87 ) (e error) {
88 T.Ln("starting rpc client connection handler", cx.Config.WalletPass.V())
89 // Create and start chain RPC client so it's ready to connect to the wallet when
90 // loaded later. Load the wallet database. It must have been created already or
91 // this will return an appropriate error.
92 var w *Wallet
93 T.Ln("opening existing wallet, pass:", cx.Config.WalletPass.V())
94 if w, e = loader.OpenExistingWallet(cx.Config.WalletPass.Bytes(), true, cx.Config, nil); E.Chk(e) {
95 T.Ln("failed to open existing wallet")
96 return
97 }
98 T.Ln("opened existing wallet")
99 // go func() {
100 // W.Ln("refilling mining addresses", cx.Config, cx.StateCfg)
101 // addresses.RefillMiningAddresses(w, cx.Config, cx.StateCfg)
102 // W.Ln("done refilling mining addresses")
103 // D.S(*cx.Config.MiningAddrs)
104 // save.Save(cx.Config)
105 // }()
106 loader.Wallet = w
107 // D.Ln("^^^^^^^^^^^ sending back wallet")
108 // cx.WalletChan <- w
109 T.Ln("starting rpcClientConnectLoop")
110 go rpcClientConnectLoop(cx, legacyServer, loader)
111 T.Ln("adding interrupt handler to unload wallet")
112 // Add interrupt handlers to shutdown the various process components before
113 // exiting. Interrupt handlers run in LIFO order, so the wallet (which should be
114 // closed last) is added first.
115 interrupt.AddHandler(
116 func() {
117 D.Ln("wallet.CtlMain interrupt")
118 e := loader.UnloadWallet()
119 if e != nil && e != ErrNotLoaded {
120 E.Ln("failed to close wallet:", e)
121 }
122 },
123 )
124 if legacyServer != nil {
125 interrupt.AddHandler(
126 func() {
127 D.Ln("stopping wallet RPC server")
128 legacyServer.Stop()
129 D.Ln("wallet RPC server shutdown")
130 },
131 )
132 }
133 go func() {
134 select {
135 case <-cx.KillAll.Wait():
136 case <-legacyServer.RequestProcessShutdownChan().Wait():
137 }
138 interrupt.Request()
139 }()
140 return
141 }
142 143 // rpcClientConnectLoop continuously attempts a connection to the consensus RPC
144 // server. When a connection is established, the client is used to sync the
145 // loaded wallet, either immediately or when loaded at a later time.
146 //
147 // The legacy RPC is optional. If set, the connected RPC client will be
148 // associated with the server for RPC pass-through and to enable additional
149 // methods.
150 func rpcClientConnectLoop(
151 cx *state.State, legacyServer *Server,
152 loader *Loader,
153 ) {
154 T.Ln("rpcClientConnectLoop", log.Caller("which was started at:", 2))
155 // var certs []byte
156 // if !cx.PodConfig.UseSPV {
157 certs := cx.Config.ReadCAFile()
158 // }
159 for {
160 var (
161 chainClient chainclient.Interface
162 e error
163 )
164 // if cx.PodConfig.UseSPV {
165 // var (
166 // chainService *neutrino.ChainService
167 // spvdb walletdb.DB
168 // )
169 // netDir := networkDir(cx.PodConfig.AppDataDir.value, ActiveNet.Params)
170 // spvdb, e = walletdb.Create("bdb",
171 // filepath.Join(netDir, "neutrino.db"))
172 // defer spvdb.Close()
173 // if e != nil {
174 // log<-cl.Errorf{"unable to create Neutrino DB: %s", e)
175 // continue
176 // }
177 // chainService, e = neutrino.NewChainService(
178 // neutrino.Config{
179 // DataDir: netDir,
180 // Database: spvdb,
181 // ChainParams: *ActiveNet.Params,
182 // ConnectPeers: cx.PodConfig.ConnectPeers,
183 // AddPeers: cx.PodConfig.AddPeers,
184 // })
185 // if e != nil {
186 // log<-cl.Errorf{"couldn't create Neutrino ChainService: %s", e)
187 // continue
188 // }
189 // chainClient = chain.NewNeutrinoClient(ActiveNet.Params, chainService)
190 // e = chainClient.Start()
191 // if e != nil {
192 // log<-cl.Errorf{"couldn't start Neutrino client: %s", e)
193 // }
194 // } else {
195 var cc *chainclient.RPCClient
196 T.Ln("starting wallet's ChainClient")
197 cc, e = StartChainRPC(cx.Config, cx.ActiveNet, certs, cx.KillAll)
198 if e != nil {
199 E.Ln(
200 "unable to open connection to consensus RPC server:", e,
201 )
202 continue
203 }
204 T.Ln("storing chain client")
205 cx.ChainClient = cc
206 cx.ChainClientReady.Q()
207 chainClient = cc
208 // Rather than inlining this logic directly into the loader callback, a function
209 // variable is used to avoid running any of this after the client disconnects by
210 // setting it to nil. This prevents the callback from associating a wallet
211 // loaded at a later time with a client that has already disconnected. A mutex
212 // is used to make this concurrent safe.
213 associateRPCClient := func(w *Wallet) {
214 T.Ln("associating chain client")
215 if w != nil {
216 w.SynchronizeRPC(chainClient)
217 }
218 if legacyServer != nil {
219 legacyServer.SetChainServer(chainClient)
220 }
221 }
222 T.Ln("adding wallet loader hook to connect to chain")
223 mu := new(sync.Mutex)
224 loader.RunAfterLoad(
225 func(w *Wallet) {
226 T.Ln("running associate chain client")
227 mu.Lock()
228 associate := associateRPCClient
229 mu.Unlock()
230 if associate != nil {
231 associate(w)
232 T.Ln("wallet is now associated by chain client")
233 } else {
234 T.Ln("wallet chain client associate function is nil")
235 }
236 },
237 )
238 chainClient.WaitForShutdown()
239 mu.Lock()
240 associateRPCClient = nil
241 mu.Unlock()
242 loadedWallet, ok := loader.LoadedWallet()
243 if ok {
244 // Do not attempt a reconnect when the wallet was explicitly stopped.
245 if loadedWallet.ShuttingDown() {
246 return
247 }
248 loadedWallet.SetChainSynced(false)
249 // TODO: Rework the wallet so changing the RPC client does not
250 // require stopping and restarting everything.
251 loadedWallet.Stop()
252 loadedWallet.WaitForShutdown()
253 loadedWallet.Start()
254 }
255 }
256 }
257 258 // StartChainRPC opens a RPC client connection to a pod server for blockchain
259 // services. This function uses the RPC options from the global config and there
260 // is no recovery in case the server is not available or if there is an
261 // authentication error. Instead, all requests to the client will simply error.
262 func StartChainRPC(
263 config *config.Config,
264 activeNet *chaincfg.Params,
265 certs []byte,
266 quit qu.C,
267 ) (rpcC *chainclient.RPCClient, e error) {
268 D.F(
269 "attempting RPC client connection to %v, TLS: %s",
270 config.RPCConnect.V(),
271 fmt.Sprint(config.ClientTLS.True()),
272 )
273 if rpcC, e = chainclient.NewRPCClient(
274 activeNet,
275 config.RPCConnect.V(),
276 config.Username.V(),
277 config.Password.V(),
278 certs,
279 config.ClientTLS.True(),
280 0,
281 quit,
282 ); E.Chk(e) {
283 return nil, e
284 }
285 e = rpcC.Start()
286 return rpcC, e
287 }
288