Skip to content

Commit

Permalink
server: Fix reconnection to dcrlnd
Browse files Browse the repository at this point in the history
This fixes the server's Run method to properly attempt reconnections to
the dcrlnd node in case of errors.

Previously, two things were wrong in this method:

- The context used in some calls was ctx instead of gctx, which caused
  some of the subsystems to not exit correctly on failures and thus mask
  the underlying connection error.
- There was no re-attempt at failed operations, when the reason for
  failure was a connection error (as opposed to a graceful termination
  of the server).

This commit fixes both issues by using the correct context everywhere
and ensuring all subsystems re-attempt their functions until the context
is canceled, with an appropriate delay.
  • Loading branch information
matheusd committed Mar 22, 2024
1 parent 50a4360 commit 8cf7a36
Showing 1 changed file with 63 additions and 41 deletions.
104 changes: 63 additions & 41 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,54 +354,74 @@ func (s *Server) openChannel(ctx context.Context, winv waitingInvoice) {

// listenToInvoices reacts to invoice events.
func (s *Server) listenToInvoices(ctx context.Context) error {
stream, err := s.lc.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{})
if err != nil {
return err

delay := func() {
select {
case <-time.After(time.Second):
case <-ctx.Done():
}
}

for {
inv, err := stream.Recv()
nextConn:
for ctx.Err() == nil {
stream, err := s.lc.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{})
if err != nil {
return err
s.log.Errorf("Unable to subscribe to invoices: %v", err)
delay()
continue nextConn
}

switch {
case inv.State == lnrpc.Invoice_CANCELED:
fpath := filepath.Join(s.root, invoicesDir,
hex.EncodeToString(inv.RHash))
if err := s.removeFile(fpath); err != nil {
return err
}

case inv.State == lnrpc.Invoice_SETTLED:
fpath := filepath.Join(s.root, invoicesDir,
hex.EncodeToString(inv.RHash))
var winv waitingInvoice
err := s.readJsonFile(fpath, &winv)
if errors.Is(err, errNotExists) {
// Payment for something that isn't channel
// creation.
continue
}
for {
inv, err := stream.Recv()
if err != nil {
return err
s.log.Errorf("Unable to receive next invoice update: %v", err)
delay()
continue nextConn
}

wantAtoms := int64(s.amountForNewChan(winv.ChannelSize))
if inv.AmtPaidAtoms < wantAtoms {
s.log.Warnf("Received payment for invoice %x "+
"lower than required (%d < %d)",
inv.AmtPaidAtoms < wantAtoms)
continue
}
switch {
case inv.State == lnrpc.Invoice_CANCELED:
fpath := filepath.Join(s.root, invoicesDir,
hex.EncodeToString(inv.RHash))
if err := s.removeFile(fpath); err != nil {
return err
}

case inv.State == lnrpc.Invoice_SETTLED:
fpath := filepath.Join(s.root, invoicesDir,
hex.EncodeToString(inv.RHash))
var winv waitingInvoice
err := s.readJsonFile(fpath, &winv)
if errors.Is(err, errNotExists) {
// Payment for something that isn't channel
// creation.
continue
}
if err != nil {
// Fatal failure.
return err
}

// Create channel.
if err := s.removeFile(fpath); err != nil {
return err
wantAtoms := int64(s.amountForNewChan(winv.ChannelSize))
if inv.AmtPaidAtoms < wantAtoms {
s.log.Warnf("Received payment for invoice %x "+
"lower than required (%d < %d)",
inv.AmtPaidAtoms < wantAtoms)
continue
}

// Create channel.
if err := s.removeFile(fpath); err != nil {
// Fatal failure.
return err
}
go s.openChannel(ctx, winv)
}
go s.openChannel(ctx, winv)

}
}

return ctx.Err()
}

// closeChannel closes the given channel.
Expand Down Expand Up @@ -527,7 +547,7 @@ func (s *Server) FetchManagedChannels(ctx context.Context) (res ManagementInfo,
// manage the channels.
bal, err := s.lc.WalletBalance(ctx, &lnrpc.WalletBalanceRequest{})
if err != nil {
return res, err
return res, fmt.Errorf("unable to fetch wallet balance: %v", err)
}
res.WalletBalance = dcrutil.Amount(bal.TotalBalance)

Expand Down Expand Up @@ -697,7 +717,8 @@ func (s *Server) runManageChannels(ctx context.Context) error {
case <-time.After(s.cfg.CloseCheckInterval):
err := s.manageChannels(ctx)
if err != nil {
return err
s.log.Errorf("Unable to manage channels in "+
"this inverval: %v", err)
}
case <-ctx.Done():
return ctx.Err()
Expand All @@ -717,8 +738,8 @@ func (s *Server) Run(ctx context.Context) error {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-gctx.Done():
return gctx.Err()
case <-time.After(time.Minute):
}

Expand All @@ -740,12 +761,13 @@ func (s *Server) Run(ctx context.Context) error {
})

// Close low activity channels.
g.Go(func() error { return s.runManageChannels(ctx) })
g.Go(func() error { return s.runManageChannels(gctx) })

// Shutdown conn once an error occurrs. This unblocks any outstanding
// calls.
g.Go(func() error {
<-gctx.Done()
s.log.Infof("Closing connection to dcrlnd")
if err := s.conn.Close(); err != nil {
s.log.Warnf("Error while closing conn: %v", err)
}
Expand Down

0 comments on commit 8cf7a36

Please sign in to comment.