package wallet import ( "bytes" "github.com/p9c/p9/pkg/amt" "github.com/p9c/p9/pkg/btcaddr" "sync" "github.com/p9c/p9/pkg/chainhash" "github.com/p9c/p9/pkg/txscript" "github.com/p9c/p9/pkg/waddrmgr" "github.com/p9c/p9/pkg/walletdb" "github.com/p9c/p9/pkg/wtxmgr" ) // AccountBalance associates a total (zero confirmation) balance with an account. Balances for other minimum // confirmation counts require more expensive logic and it is not clear which minimums a client is interested in, so // they are not included. type AccountBalance struct { Account uint32 TotalBalance amt.Amount } // AccountNotification contains properties regarding an account, such as its name and the number of derived and imported // keys. When any of these properties change, the notification is fired. type AccountNotification struct { AccountNumber uint32 AccountName string ExternalKeyCount uint32 InternalKeyCount uint32 ImportedKeyCount uint32 } // AccountNotificationsClient receives AccountNotifications over the channel C. type AccountNotificationsClient struct { C chan *AccountNotification server *NotificationServer } // Block contains the properties and all relevant transactions of an attached // block. type Block struct { Hash *chainhash.Hash Height int32 Timestamp int64 Transactions []TransactionSummary } // TODO: It would be good to send errors during notification creation to the rpc server instead of just logging them // here so the client is aware that wallet isn't working correctly and notifications are missing. // // TODO: Anything dealing with accounts here is expensive because the database is not organized correctly for true // account support, but do the slow thing instead of the easy thing since the db can be fixed later, and we want the api // correct now. // NotificationServer is a server that interested clients may hook into to receive notifications of changes // in a wallet. A client is created for each registered notification. Clients are guaranteed to receive messages in the // order wallet created them, but there is no guaranteed synchronization between different clients. type NotificationServer struct { transactions []chan *TransactionNotifications currentTxNtfn *TransactionNotifications // coalesce this since wallet does not add mined txs together spentness map[uint32][]chan *SpentnessNotifications accountClients []chan *AccountNotification mu sync.Mutex // Only protects registered client channels wallet *Wallet // smells like hacks } // SpentnessNotifications is a notification that is fired for transaction outputs controlled by some account's keys. The // notification may be about a newly added unspent transaction output or that a previously unspent output is now spent. // When spent, the notification includes the spending transaction's hash and input index. type SpentnessNotifications struct { hash *chainhash.Hash spenderHash *chainhash.Hash index uint32 spenderIndex uint32 } // SpentnessNotificationsClient receives SpentnessNotifications from the NotificationServer over the channel C. type SpentnessNotificationsClient struct { C <-chan *SpentnessNotifications account uint32 server *NotificationServer } // TransactionNotifications is a notification of changes to the wallet's transaction set and the current chain tip that // wallet is considered to be synced with. All transactions added to the blockchain are organized by the block they were // mined in. // // During a chain switch, all removed block hashes are included. Detached blocks are sorted in the reverse order they // were mined. Attached blocks are sorted in the order mined. // // All newly added unmined transactions are included. Removed unmined transactions are not explicitly included. Instead, // the hashes of all transactions still unmined are included. // // If any transactions were involved, each affected account's new total balance is included. // // TODO: Because this includes stuff about blocks and can be fired without any changes to transactions, it needs a // better name. type TransactionNotifications struct { AttachedBlocks []Block DetachedBlocks []*chainhash.Hash UnminedTransactions []TransactionSummary UnminedTransactionHashes []*chainhash.Hash NewBalances []AccountBalance } // TransactionNotificationsClient receives TransactionNotifications from the NotificationServer over the channel C. type TransactionNotificationsClient struct { C <-chan *TransactionNotifications server *NotificationServer } // TransactionSummary contains a transaction relevant to the wallet and marks which inputs and outputs were relevant. type TransactionSummary struct { Hash *chainhash.Hash Transaction []byte MyInputs []TransactionSummaryInput MyOutputs []TransactionSummaryOutput Fee amt.Amount Timestamp int64 } // TransactionSummaryInput describes a transaction input that is relevant to the wallet. The Index field marks the // transaction input index of the transaction (not included here). The PreviousAccount and PreviousAmount fields // describe how much this input debits from a wallet account. type TransactionSummaryInput struct { Index uint32 PreviousAccount uint32 PreviousAmount amt.Amount } // TransactionSummaryOutput describes wallet properties of a transaction output controlled by the wallet. The Index // field marks the transaction output index of the transaction (not included here). type TransactionSummaryOutput struct { Index uint32 Account uint32 Internal bool } // Done unregisters the client from the server and drains any remaining messages. It must be called exactly once when // the client is finished receiving notifications. func (c *AccountNotificationsClient) Done() { go func() { for range c.C { } }() go func() { s := c.server s.mu.Lock() clients := s.accountClients for i, ch := range clients { if c.C == ch { clients[i] = clients[len(clients)-1] s.accountClients = clients[:len(clients)-1] close(ch) break } } s.mu.Unlock() }() } // AccountNotifications returns a client for receiving AccountNotifications over a channel. The channel is unbuffered. // When finished, the client's Done method should be called to disassociate the client from the server. func (s *NotificationServer) AccountNotifications() AccountNotificationsClient { c := make(chan *AccountNotification) s.mu.Lock() s.accountClients = append(s.accountClients, c) s.mu.Unlock() return AccountNotificationsClient{ C: c, server: s, } } // AccountSpentnessNotifications registers a client for spentness changes of outputs controlled by the account. func (s *NotificationServer) AccountSpentnessNotifications(account uint32) SpentnessNotificationsClient { c := make(chan *SpentnessNotifications) s.mu.Lock() s.spentness[account] = append(s.spentness[account], c) s.mu.Unlock() return SpentnessNotificationsClient{ C: c, account: account, server: s, } } // TransactionNotifications returns a client for receiving TransactionNotifications notifications over a channel. The // channel is unbuffered. // // When finished, the Done method should be called on the client to disassociate it from the server. func (s *NotificationServer) TransactionNotifications() TransactionNotificationsClient { c := make(chan *TransactionNotifications) s.mu.Lock() s.transactions = append(s.transactions, c) s.mu.Unlock() return TransactionNotificationsClient{ C: c, server: s, } } func (s *NotificationServer) notifyAccountProperties(props *waddrmgr.AccountProperties) { defer s.mu.Unlock() s.mu.Lock() clients := s.accountClients if len(clients) == 0 { return } n := &AccountNotification{ AccountNumber: props.AccountNumber, AccountName: props.AccountName, ExternalKeyCount: props.ExternalKeyCount, InternalKeyCount: props.InternalKeyCount, ImportedKeyCount: props.ImportedKeyCount, } for _, c := range clients { c <- n } } func (s *NotificationServer) notifyAttachedBlock(dbtx walletdb.ReadTx, block *wtxmgr.BlockMeta) { if s.currentTxNtfn == nil { s.currentTxNtfn = &TransactionNotifications{} } // Add block details if it wasn't already included for previously notified mined transactions. n := len(s.currentTxNtfn.AttachedBlocks) if n == 0 || *s.currentTxNtfn.AttachedBlocks[n-1].Hash != block.Hash { s.currentTxNtfn.AttachedBlocks = append( s.currentTxNtfn.AttachedBlocks, Block{ Hash: &block.Hash, Height: block.Height, Timestamp: block.Time.Unix(), }, ) } // For now (until notification coalescing isn't necessary) just use chain length to determine if this is the new // best block. if s.wallet.ChainSynced() { if len(s.currentTxNtfn.DetachedBlocks) >= len(s.currentTxNtfn.AttachedBlocks) { return } } defer s.mu.Unlock() s.mu.Lock() clients := s.transactions if len(clients) == 0 { s.currentTxNtfn = nil return } // The UnminedTransactions field is intentionally not set. Since the hashes of all detached blocks are reported, and // all transactions moved from a mined block back to unconfirmed are either in the UnminedTransactionHashes slice or // don't exist due to conflicting with a mined transaction in the new best chain, there is no possiblity of a new, // previously unseen transaction appearing in unconfirmed. txmgrNs := dbtx.ReadBucket(wtxmgrNamespaceKey) unminedHashes, e := s.wallet.TxStore.UnminedTxHashes(txmgrNs) if e != nil { E.Ln( "cannot fetch unmined transaction hashes:", e, ) return } s.currentTxNtfn.UnminedTransactionHashes = unminedHashes bals := make(map[uint32]amt.Amount) for _, b := range s.currentTxNtfn.AttachedBlocks { relevantAccounts(s.wallet, bals, b.Transactions) } e = totalBalances(dbtx, s.wallet, bals) if e != nil { E.Ln( "cannot determine balances for relevant accounts:", e, ) return } s.currentTxNtfn.NewBalances = flattenBalanceMap(bals) for _, c := range clients { c <- s.currentTxNtfn } s.currentTxNtfn = nil } func (s *NotificationServer) notifyDetachedBlock(hash *chainhash.Hash) { if s.currentTxNtfn == nil { s.currentTxNtfn = &TransactionNotifications{} } s.currentTxNtfn.DetachedBlocks = append(s.currentTxNtfn.DetachedBlocks, hash) } func (s *NotificationServer) notifyMinedTransaction( dbtx walletdb.ReadTx, details *wtxmgr.TxDetails, block *wtxmgr.BlockMeta, ) { if s.currentTxNtfn == nil { s.currentTxNtfn = &TransactionNotifications{} } n := len(s.currentTxNtfn.AttachedBlocks) if n == 0 || *s.currentTxNtfn.AttachedBlocks[n-1].Hash != block.Hash { s.currentTxNtfn.AttachedBlocks = append( s.currentTxNtfn.AttachedBlocks, Block{ Hash: &block.Hash, Height: block.Height, Timestamp: block.Time.Unix(), }, ) n++ } txs := s.currentTxNtfn.AttachedBlocks[n-1].Transactions s.currentTxNtfn.AttachedBlocks[n-1].Transactions = append(txs, makeTxSummary(dbtx, s.wallet, details)) } // // notifySpentOutput notifies registered clients that a previously-unspent // // output is now spent, and includes the spender hash and input index in the // // notification. // func (s *NotificationServer) notifySpentOutput(account uint32, op *wire.OutPoint, spenderHash *chainhash.Hash, spenderIndex uint32) { // defer s.mu.Unlock() // s.mu.Lock() // clients := s.spentness[account] // if len(clients) == 0 { // return // } // n := &SpentnessNotifications{ // hash: &op.Hash, // index: op.Index, // spenderHash: spenderHash, // spenderIndex: spenderIndex, // } // for _, c := range clients { // c <- n // } // } func (s *NotificationServer) notifyUnminedTransaction(dbtx walletdb.ReadTx, details *wtxmgr.TxDetails) { // Sanity check: should not be currently coalescing a notification for mined transactions at the same time that an // unmined tx is notified. if s.currentTxNtfn != nil { E.Ln( "notifying unmined tx notification (", details.Hash.String(), ") while creating notification for blocks", ) } defer s.mu.Unlock() s.mu.Lock() clients := s.transactions if len(clients) == 0 { return } unminedTxs := []TransactionSummary{makeTxSummary(dbtx, s.wallet, details)} unminedHashes, e := s.wallet.TxStore.UnminedTxHashes(dbtx.ReadBucket(wtxmgrNamespaceKey)) if e != nil { E.Ln( "cannot fetch unmined transaction hashes:", e, ) return } bals := make(map[uint32]amt.Amount) relevantAccounts(s.wallet, bals, unminedTxs) e = totalBalances(dbtx, s.wallet, bals) if e != nil { E.Ln( "cannot determine balances for relevant accounts:", e, ) return } n := &TransactionNotifications{ UnminedTransactions: unminedTxs, UnminedTransactionHashes: unminedHashes, NewBalances: flattenBalanceMap(bals), } for _, c := range clients { c <- n } } // notifyUnspentOutput notifies registered clients of a new unspent output that is controlled by the wallet. func (s *NotificationServer) notifyUnspentOutput(account uint32, hash *chainhash.Hash, index uint32) { defer s.mu.Unlock() s.mu.Lock() clients := s.spentness[account] if len(clients) == 0 { return } n := &SpentnessNotifications{ hash: hash, index: index, } for _, c := range clients { c <- n } } // Hash returns the transaction hash of the spent output. func (n *SpentnessNotifications) Hash() *chainhash.Hash { return n.hash } // Index returns the transaction output index of the spent output. func (n *SpentnessNotifications) Index() uint32 { return n.index } // Spender returns the spending transaction's hash and input index, if any. If the output is unspent, the final bool // return is false. func (n *SpentnessNotifications) Spender() (*chainhash.Hash, uint32, bool) { return n.spenderHash, n.spenderIndex, n.spenderHash != nil } // Done unregisters the client from the server and drains any remaining messages. It must be called exactly once when // the client is finished receiving notifications. func (c *SpentnessNotificationsClient) Done() { go func() { // Drain notifications until the client channel is removed from the server and closed. for range c.C { } }() go func() { s := c.server s.mu.Lock() clients := s.spentness[c.account] for i, ch := range clients { if c.C == ch { clients[i] = clients[len(clients)-1] s.spentness[c.account] = clients[:len(clients)-1] close(ch) break } } s.mu.Unlock() }() } // Done unregisters the client from the server and drains any remaining messages. It must be called exactly once when // the client is finished receiving notifications. func (c *TransactionNotificationsClient) Done() { go func() { // Drain notifications until the client channel is removed from the server and closed. for range c.C { } }() go func() { s := c.server s.mu.Lock() clients := s.transactions for i, ch := range clients { if c.C == ch { clients[i] = clients[len(clients)-1] s.transactions = clients[:len(clients)-1] close(ch) break } } s.mu.Unlock() }() } func flattenBalanceMap(m map[uint32]amt.Amount) []AccountBalance { s := make([]AccountBalance, 0, len(m)) for k, v := range m { s = append(s, AccountBalance{Account: k, TotalBalance: v}) } return s } func lookupInputAccount(dbtx walletdb.ReadTx, w *Wallet, details *wtxmgr.TxDetails, deb wtxmgr.DebitRecord) uint32 { addrmgrNs := dbtx.ReadBucket(waddrmgrNamespaceKey) txmgrNs := dbtx.ReadBucket(wtxmgrNamespaceKey) // TODO: Debits should record which account(s?) they // debit from so this doesn't need to be looked up. prevOP := &details.MsgTx.TxIn[deb.Index].PreviousOutPoint prev, e := w.TxStore.TxDetails(txmgrNs, &prevOP.Hash) if e != nil { E.F( "cannot query previous transaction details for %v: %v", prevOP.Hash, e, ) return 0 } if prev == nil { E.Ln( "missing previous transaction", prevOP.Hash, ) return 0 } prevOut := prev.MsgTx.TxOut[prevOP.Index] var addrs []btcaddr.Address _, addrs, _, e = txscript.ExtractPkScriptAddrs(prevOut.PkScript, w.chainParams) var inputAcct uint32 if e == nil && len(addrs) > 0 { _, inputAcct, e = w.Manager.AddrAccount(addrmgrNs, addrs[0]) } if e != nil { E.F( "cannot fetch account for previous output %v: %v", prevOP, e, ) inputAcct = 0 } return inputAcct } func lookupOutputChain( dbtx walletdb.ReadTx, w *Wallet, details *wtxmgr.TxDetails, cred wtxmgr.CreditRecord, ) (account uint32, internal bool) { addrmgrNs := dbtx.ReadBucket(waddrmgrNamespaceKey) output := details.MsgTx.TxOut[cred.Index] var addrs []btcaddr.Address var e error _, addrs, _, e = txscript.ExtractPkScriptAddrs(output.PkScript, w.chainParams) var ma waddrmgr.ManagedAddress if e == nil && len(addrs) > 0 { ma, e = w.Manager.Address(addrmgrNs, addrs[0]) } if e != nil { E.Ln( "cannot fetch account for wallet output:", e, ) } else { account = ma.Account() internal = ma.Internal() } return } func makeTxSummary(dbtx walletdb.ReadTx, w *Wallet, details *wtxmgr.TxDetails) TransactionSummary { serializedTx := details.SerializedTx if serializedTx == nil { var buf bytes.Buffer e := details.MsgTx.Serialize(&buf) if e != nil { E.Ln("transaction serialization:", e) } serializedTx = buf.Bytes() } var fee amt.Amount if len(details.Debits) == len(details.MsgTx.TxIn) { for _, deb := range details.Debits { fee += deb.Amount } for _, txOut := range details.MsgTx.TxOut { fee -= amt.Amount(txOut.Value) } } var inputs []TransactionSummaryInput if len(details.Debits) != 0 { inputs = make([]TransactionSummaryInput, len(details.Debits)) for i, d := range details.Debits { inputs[i] = TransactionSummaryInput{ Index: d.Index, PreviousAccount: lookupInputAccount(dbtx, w, details, d), PreviousAmount: d.Amount, } } } outputs := make([]TransactionSummaryOutput, 0, len(details.MsgTx.TxOut)) for i := range details.MsgTx.TxOut { credIndex := len(outputs) mine := len(details.Credits) > credIndex && details.Credits[credIndex].Index == uint32(i) if !mine { continue } acct, internal := lookupOutputChain(dbtx, w, details, details.Credits[credIndex]) output := TransactionSummaryOutput{ Index: uint32(i), Account: acct, Internal: internal, } outputs = append(outputs, output) } return TransactionSummary{ Hash: &details.Hash, Transaction: serializedTx, MyInputs: inputs, MyOutputs: outputs, Fee: fee, Timestamp: details.Received.Unix(), } } func newNotificationServer(wallet *Wallet) *NotificationServer { return &NotificationServer{ spentness: make(map[uint32][]chan *SpentnessNotifications), wallet: wallet, } } func relevantAccounts(w *Wallet, m map[uint32]amt.Amount, txs []TransactionSummary) { for _, tx := range txs { for _, d := range tx.MyInputs { m[d.PreviousAccount] = 0 } for _, c := range tx.MyOutputs { m[c.Account] = 0 } } } func totalBalances(dbtx walletdb.ReadTx, w *Wallet, m map[uint32]amt.Amount) (e error) { addrmgrNs := dbtx.ReadBucket(waddrmgrNamespaceKey) unspent, e := w.TxStore.UnspentOutputs(dbtx.ReadBucket(wtxmgrNamespaceKey)) if e != nil { return e } for i := range unspent { output := &unspent[i] var outputAcct uint32 var addrs []btcaddr.Address _, addrs, _, e = txscript.ExtractPkScriptAddrs( output.PkScript, w.chainParams, ) if e == nil && len(addrs) > 0 { _, outputAcct, e = w.Manager.AddrAccount(addrmgrNs, addrs[0]) } if e == nil { _, ok := m[outputAcct] if ok { m[outputAcct] += output.Amount } } } return nil }