main.go raw

   1  package wallet
   2  
   3  import (
   4  	"fmt"
   5  	// This enables pprof
   6  	// _ "net/http/pprof"
   7  	"sync"
   8  
   9  	"github.com/p9c/p9/pkg/qu"
  10  
  11  	"github.com/p9c/p9/pkg/log"
  12  	"github.com/p9c/p9/pkg/chaincfg"
  13  	"github.com/p9c/p9/pod/config"
  14  	"github.com/p9c/p9/pod/state"
  15  
  16  	"github.com/p9c/p9/pkg/interrupt"
  17  
  18  	"github.com/p9c/p9/pkg/chainclient"
  19  )
  20  
  21  // Main is a work-around main function that is required since deferred functions
  22  // (such as log flushing) are not called with calls to os.Exit. Instead, main
  23  // runs this function and checks for a non-nil error, at point any defers have
  24  // already run, and if the error is non-nil, the program can be exited with an
  25  // error exit status.
  26  func Main(cx *state.State) (e error) {
  27  	// cx.WaitGroup.Add(1)
  28  	cx.WaitAdd()
  29  	// if *config.Profile != "" {
  30  	//	go func() {
  31  	//		listenAddr := net.JoinHostPort("127.0.0.1", *config.Profile)
  32  	//		I.Ln("profile server listening on", listenAddr)
  33  	//		profileRedirect := http.RedirectHandler("/debug/pprof",
  34  	//			http.StatusSeeOther)
  35  	//		http.Handle("/", profileRedirect)
  36  	//		fmt.Println(http.ListenAndServe(listenAddr, nil))
  37  	//	}()
  38  	// }
  39  	loader := NewLoader(cx.ActiveNet, cx.Config.WalletFile.V(), 250)
  40  	// Create and start HTTP server to serve wallet client connections. This will be updated with the wallet and chain
  41  	// server RPC client created below after each is created.
  42  	D.Ln("starting RPC servers")
  43  	var legacyServer *Server
  44  	if legacyServer, e = startRPCServers(cx, loader); E.Chk(e) {
  45  		E.Ln("unable to create RPC servers:", e)
  46  		return
  47  	}
  48  	loader.RunAfterLoad(
  49  		func(w *Wallet) {
  50  			D.Ln("starting wallet RPC services", w != nil)
  51  			startWalletRPCServices(w, legacyServer)
  52  			// cx.WalletChan <- w
  53  		},
  54  	)
  55  	if !cx.Config.NoInitialLoad.True() {
  56  		go func() {
  57  			D.Ln("loading wallet", cx.Config.WalletPass.V())
  58  			if e = LoadWallet(loader, cx, legacyServer); E.Chk(e) {
  59  			}
  60  		}()
  61  	}
  62  	interrupt.AddHandler(cx.WalletKill.Q)
  63  	select {
  64  	case <-cx.WalletKill.Wait():
  65  		D.Ln("wallet killswitch activated")
  66  		if legacyServer != nil {
  67  			D.Ln("stopping wallet RPC server")
  68  			legacyServer.Stop()
  69  			I.Ln("stopped wallet RPC server")
  70  		}
  71  		I.Ln("wallet shutdown from killswitch complete")
  72  		cx.WaitDone()
  73  		return
  74  	case <-cx.KillAll.Wait():
  75  		D.Ln("killall")
  76  		cx.WalletKill.Q()
  77  	case <-interrupt.HandlersDone.Wait():
  78  	}
  79  	I.Ln("wallet shutdown complete")
  80  	cx.WaitDone()
  81  	return
  82  }
  83  
  84  // LoadWallet ...
  85  func LoadWallet(
  86  	loader *Loader, cx *state.State, legacyServer *Server,
  87  ) (e error) {
  88  	T.Ln("starting rpc client connection handler", cx.Config.WalletPass.V())
  89  	// Create and start chain RPC client so it's ready to connect to the wallet when
  90  	// loaded later. Load the wallet database. It must have been created already or
  91  	// this will return an appropriate error.
  92  	var w *Wallet
  93  	T.Ln("opening existing wallet, pass:", cx.Config.WalletPass.V())
  94  	if w, e = loader.OpenExistingWallet(cx.Config.WalletPass.Bytes(), true, cx.Config, nil); E.Chk(e) {
  95  		T.Ln("failed to open existing wallet")
  96  		return
  97  	}
  98  	T.Ln("opened existing wallet")
  99  	// go func() {
 100  	// W.Ln("refilling mining addresses", cx.Config, cx.StateCfg)
 101  	// addresses.RefillMiningAddresses(w, cx.Config, cx.StateCfg)
 102  	// W.Ln("done refilling mining addresses")
 103  	// D.S(*cx.Config.MiningAddrs)
 104  	// save.Save(cx.Config)
 105  	// }()
 106  	loader.Wallet = w
 107  	// D.Ln("^^^^^^^^^^^ sending back wallet")
 108  	// cx.WalletChan <- w
 109  	T.Ln("starting rpcClientConnectLoop")
 110  	go rpcClientConnectLoop(cx, legacyServer, loader)
 111  	T.Ln("adding interrupt handler to unload wallet")
 112  	// Add interrupt handlers to shutdown the various process components before
 113  	// exiting. Interrupt handlers run in LIFO order, so the wallet (which should be
 114  	// closed last) is added first.
 115  	interrupt.AddHandler(
 116  		func() {
 117  			D.Ln("wallet.CtlMain interrupt")
 118  			e := loader.UnloadWallet()
 119  			if e != nil && e != ErrNotLoaded {
 120  				E.Ln("failed to close wallet:", e)
 121  			}
 122  		},
 123  	)
 124  	if legacyServer != nil {
 125  		interrupt.AddHandler(
 126  			func() {
 127  				D.Ln("stopping wallet RPC server")
 128  				legacyServer.Stop()
 129  				D.Ln("wallet RPC server shutdown")
 130  			},
 131  		)
 132  	}
 133  	go func() {
 134  		select {
 135  		case <-cx.KillAll.Wait():
 136  		case <-legacyServer.RequestProcessShutdownChan().Wait():
 137  		}
 138  		interrupt.Request()
 139  	}()
 140  	return
 141  }
 142  
 143  // rpcClientConnectLoop continuously attempts a connection to the consensus RPC
 144  // server. When a connection is established, the client is used to sync the
 145  // loaded wallet, either immediately or when loaded at a later time.
 146  //
 147  // The legacy RPC is optional. If set, the connected RPC client will be
 148  // associated with the server for RPC pass-through and to enable additional
 149  // methods.
 150  func rpcClientConnectLoop(
 151  	cx *state.State, legacyServer *Server,
 152  	loader *Loader,
 153  ) {
 154  	T.Ln("rpcClientConnectLoop", log.Caller("which was started at:", 2))
 155  	// var certs []byte
 156  	// if !cx.PodConfig.UseSPV {
 157  	certs := cx.Config.ReadCAFile()
 158  	// }
 159  	for {
 160  		var (
 161  			chainClient chainclient.Interface
 162  			e           error
 163  		)
 164  		// if cx.PodConfig.UseSPV {
 165  		// 	var (
 166  		// 		chainService *neutrino.ChainService
 167  		// 		spvdb        walletdb.DB
 168  		// 	)
 169  		// 	netDir := networkDir(cx.PodConfig.AppDataDir.value, ActiveNet.Params)
 170  		// 	spvdb, e = walletdb.Create("bdb",
 171  		// 		filepath.Join(netDir, "neutrino.db"))
 172  		// 	defer spvdb.Close()
 173  		// 	if e != nil  {
 174  		// 		log<-cl.Errorf{"unable to create Neutrino DB: %s", e)
 175  		// 		continue
 176  		// 	}
 177  		// 	chainService, e = neutrino.NewChainService(
 178  		// 		neutrino.Config{
 179  		// 			DataDir:      netDir,
 180  		// 			Database:     spvdb,
 181  		// 			ChainParams:  *ActiveNet.Params,
 182  		// 			ConnectPeers: cx.PodConfig.ConnectPeers,
 183  		// 			AddPeers:     cx.PodConfig.AddPeers,
 184  		// 		})
 185  		// 	if e != nil  {
 186  		// 		log<-cl.Errorf{"couldn't create Neutrino ChainService: %s", e)
 187  		// 		continue
 188  		// 	}
 189  		// 	chainClient = chain.NewNeutrinoClient(ActiveNet.Params, chainService)
 190  		// 	e = chainClient.Start()
 191  		// 	if e != nil  {
 192  		// 		log<-cl.Errorf{"couldn't start Neutrino client: %s", e)
 193  		// 	}
 194  		// } else {
 195  		var cc *chainclient.RPCClient
 196  		T.Ln("starting wallet's ChainClient")
 197  		cc, e = StartChainRPC(cx.Config, cx.ActiveNet, certs, cx.KillAll)
 198  		if e != nil {
 199  			E.Ln(
 200  				"unable to open connection to consensus RPC server:", e,
 201  			)
 202  			continue
 203  		}
 204  		T.Ln("storing chain client")
 205  		cx.ChainClient = cc
 206  		cx.ChainClientReady.Q()
 207  		chainClient = cc
 208  		// Rather than inlining this logic directly into the loader callback, a function
 209  		// variable is used to avoid running any of this after the client disconnects by
 210  		// setting it to nil. This prevents the callback from associating a wallet
 211  		// loaded at a later time with a client that has already disconnected. A mutex
 212  		// is used to make this concurrent safe.
 213  		associateRPCClient := func(w *Wallet) {
 214  			T.Ln("associating chain client")
 215  			if w != nil {
 216  				w.SynchronizeRPC(chainClient)
 217  			}
 218  			if legacyServer != nil {
 219  				legacyServer.SetChainServer(chainClient)
 220  			}
 221  		}
 222  		T.Ln("adding wallet loader hook to connect to chain")
 223  		mu := new(sync.Mutex)
 224  		loader.RunAfterLoad(
 225  			func(w *Wallet) {
 226  				T.Ln("running associate chain client")
 227  				mu.Lock()
 228  				associate := associateRPCClient
 229  				mu.Unlock()
 230  				if associate != nil {
 231  					associate(w)
 232  					T.Ln("wallet is now associated by chain client")
 233  				} else {
 234  					T.Ln("wallet chain client associate function is nil")
 235  				}
 236  			},
 237  		)
 238  		chainClient.WaitForShutdown()
 239  		mu.Lock()
 240  		associateRPCClient = nil
 241  		mu.Unlock()
 242  		loadedWallet, ok := loader.LoadedWallet()
 243  		if ok {
 244  			// Do not attempt a reconnect when the wallet was explicitly stopped.
 245  			if loadedWallet.ShuttingDown() {
 246  				return
 247  			}
 248  			loadedWallet.SetChainSynced(false)
 249  			// TODO: Rework the wallet so changing the RPC client does not
 250  			//  require stopping and restarting everything.
 251  			loadedWallet.Stop()
 252  			loadedWallet.WaitForShutdown()
 253  			loadedWallet.Start()
 254  		}
 255  	}
 256  }
 257  
 258  // StartChainRPC opens a RPC client connection to a pod server for blockchain
 259  // services. This function uses the RPC options from the global config and there
 260  // is no recovery in case the server is not available or if there is an
 261  // authentication error. Instead, all requests to the client will simply error.
 262  func StartChainRPC(
 263  	config *config.Config,
 264  	activeNet *chaincfg.Params,
 265  	certs []byte,
 266  	quit qu.C,
 267  ) (rpcC *chainclient.RPCClient, e error) {
 268  	D.F(
 269  		"attempting RPC client connection to %v, TLS: %s",
 270  		config.RPCConnect.V(),
 271  		fmt.Sprint(config.ClientTLS.True()),
 272  	)
 273  	if rpcC, e = chainclient.NewRPCClient(
 274  		activeNet,
 275  		config.RPCConnect.V(),
 276  		config.Username.V(),
 277  		config.Password.V(),
 278  		certs,
 279  		config.ClientTLS.True(),
 280  		0,
 281  		quit,
 282  	); E.Chk(e) {
 283  		return nil, e
 284  	}
 285  	e = rpcC.Start()
 286  	return rpcC, e
 287  }
 288