Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mm: Fix balance handling for shared txs across matches #3138

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 51 additions & 29 deletions client/mm/exchange_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,13 @@ type pendingDEXOrder struct {

// swaps, redeems, and refunds are caches of transactions. This avoids
// having to query the wallet for transactions that are already confirmed.
txsMtx sync.RWMutex
swaps map[string]*asset.WalletTransaction
redeems map[string]*asset.WalletTransaction
refunds map[string]*asset.WalletTransaction
txsMtx sync.RWMutex
swaps map[string]*asset.WalletTransaction
swapCoinIDToTxID map[string]string
redeems map[string]*asset.WalletTransaction
redeemCoinIDToTxID map[string]string
refunds map[string]*asset.WalletTransaction
refundCoinIDToTxID map[string]string
// txsMtx is required to be locked for writes to state
state atomic.Value // *dexOrderState

Expand Down Expand Up @@ -699,8 +702,13 @@ type dexOrderInfo struct {
func (u *unifiedExchangeAdaptor) updateDEXOrderEvent(o *pendingDEXOrder, complete bool) {
o.txsMtx.RLock()
transactions := make([]*asset.WalletTransaction, 0, len(o.swaps)+len(o.redeems)+len(o.refunds))
txIDSeen := make(map[string]bool)
addTxs := func(txs map[string]*asset.WalletTransaction) {
for _, tx := range txs {
if txIDSeen[tx.ID] {
continue
}
txIDSeen[tx.ID] = true
transactions = append(transactions, tx)
}
}
Expand Down Expand Up @@ -994,13 +1002,16 @@ func (u *unifiedExchangeAdaptor) placeMultiTrade(placements []*dexOrderInfo, sel
cexEffects.Reserved[toAsset] = reserved

pendingOrder := &pendingDEXOrder{
eventLogID: u.eventLogID.Add(1),
timestamp: time.Now().Unix(),
swaps: make(map[string]*asset.WalletTransaction),
redeems: make(map[string]*asset.WalletTransaction),
refunds: make(map[string]*asset.WalletTransaction),
placementIndex: placements[i].placementIndex,
counterTradeRate: placements[i].counterTradeRate,
eventLogID: u.eventLogID.Add(1),
timestamp: time.Now().Unix(),
swaps: make(map[string]*asset.WalletTransaction),
redeems: make(map[string]*asset.WalletTransaction),
refunds: make(map[string]*asset.WalletTransaction),
swapCoinIDToTxID: make(map[string]string),
redeemCoinIDToTxID: make(map[string]string),
refundCoinIDToTxID: make(map[string]string),
placementIndex: placements[i].placementIndex,
counterTradeRate: placements[i].counterTradeRate,
}

pendingOrder.state.Store(
Expand Down Expand Up @@ -2532,9 +2543,10 @@ func dexOrderComplete(o *core.Order) bool {
return o.AllFeesConfirmed
}

// orderTransactions returns all of the swap, redeem, and refund transactions
// involving a dex order.
func orderTransactions(o *core.Order) (swaps map[string]bool, redeems map[string]bool, refunds map[string]bool) {
// orderCoinIDs returns all of the swap, redeem, and refund transactions
// involving a dex order. There may be multiple coin IDs representing the
// same transaction.
func orderCoinIDs(o *core.Order) (swaps map[string]bool, redeems map[string]bool, refunds map[string]bool) {
swaps = make(map[string]bool)
redeems = make(map[string]bool)
refunds = make(map[string]bool)
Expand Down Expand Up @@ -2652,34 +2664,44 @@ func dexOrderEffects(o *core.Order, swaps, redeems, refunds map[string]*asset.Wa
// The mutex only needs to be locked for reading if the caller wants a consistent
// view of the transactions and the state.
func (p *pendingDEXOrder) updateState(o *core.Order, getTx func(uint32, string) (*asset.WalletTransaction, error), baseTraits, quoteTraits asset.WalletTrait) {
swaps, redeems, refunds := orderTransactions(o)
swaps, redeems, refunds := orderCoinIDs(o)

// Add new txs to tx cache
fromAsset, _, toAsset, _ := orderAssets(o.BaseID, o.QuoteID, o.Sell)
processTxs := func(assetID uint32, m map[string]*asset.WalletTransaction, txs map[string]bool) {
// Add new txs to tx cache
for txID := range txs {
if _, found := m[txID]; !found {
m[txID] = &asset.WalletTransaction{}
}
}

processTxs := func(assetID uint32, txs map[string]*asset.WalletTransaction, coinIDs map[string]bool, coinIDToTxID map[string]string) {
// Query the wallet regarding all unconfirmed transactions
for txID, oldTx := range m {
for txID, oldTx := range txs {
if oldTx.Confirmed {
continue
}
tx, err := getTx(assetID, txID)
if err != nil {
// p.log.Errorf("Error getting tx %s: %v", txID, err)
continue
}
m[txID] = tx
txs[tx.ID] = tx
}

// Add new txs to tx cache
for coinID := range coinIDs {
txID, found := coinIDToTxID[coinID]
if found {
continue
}
if _, found := txs[txID]; found {
continue
}
tx, err := getTx(assetID, coinID)
if err != nil {
continue
}
coinIDToTxID[coinID] = tx.ID
txs[tx.ID] = tx
}
}

processTxs(fromAsset, p.swaps, swaps)
processTxs(toAsset, p.redeems, redeems)
processTxs(fromAsset, p.refunds, refunds)
processTxs(fromAsset, p.swaps, swaps, p.swapCoinIDToTxID)
processTxs(toAsset, p.redeems, redeems, p.redeemCoinIDToTxID)
processTxs(fromAsset, p.refunds, refunds, p.refundCoinIDToTxID)

dexEffects, cexEffects := dexOrderEffects(o, p.swaps, p.redeems, p.refunds, p.counterTradeRate, baseTraits, quoteTraits)
p.state.Store(&dexOrderState{
Expand Down
Loading
Loading