Skip to content

Commit

Permalink
feat: protocol debug logs using configured logger (#736)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Gianelloni <[email protected]>
  • Loading branch information
wolf31o2 authored Oct 2, 2024
1 parent e65b6dd commit 2243736
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 214 deletions.
19 changes: 19 additions & 0 deletions protocol/blockfetch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -80,6 +81,8 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {

func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -93,6 +96,8 @@ func (c *Client) Start() {
func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName))
msg := NewMsgClientDone()
err = c.SendMessage(msg)
})
Expand All @@ -101,6 +106,8 @@ func (c *Client) Stop() error {

// GetBlockRange starts an async process to fetch all blocks in the specified range (inclusive)
func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetBlockRange(start: %+v, end: %+v)", ProtocolName, start, end))
c.busyMutex.Lock()
c.blockUseCallback = true
msg := NewMsgRequestRange(start, end)
Expand All @@ -121,6 +128,8 @@ func (c *Client) GetBlockRange(start common.Point, end common.Point) error {

// GetBlock requests and returns a single block specified by the provided point
func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetBlock(point: %+v)", ProtocolName, point))
c.busyMutex.Lock()
c.blockUseCallback = false
msg := NewMsgRequestRange(point, point)
Expand All @@ -144,6 +153,8 @@ func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeStartBatch:
Expand All @@ -165,17 +176,23 @@ func (c *Client) messageHandler(msg protocol.Message) error {
}

func (c *Client) handleStartBatch() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client start batch for %s", ProtocolName))
c.startBatchResultChan <- nil
return nil
}

func (c *Client) handleNoBlocks() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client no blocks found for %s", ProtocolName))
err := fmt.Errorf("block(s) not found")
c.startBatchResultChan <- err
return nil
}

func (c *Client) handleBlock(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client block found for %s", ProtocolName))
msg := msgGeneric.(*MsgBlock)
// Decode only enough to get the block type value
var wrappedBlock WrappedBlock
Expand All @@ -201,6 +218,8 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {
}

func (c *Client) handleBatchDone() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client batch done for %s", ProtocolName))
c.busyMutex.Unlock()
return nil
}
15 changes: 15 additions & 0 deletions protocol/blockfetch/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (s *Server) initProtocol() {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: s.protoOptions.Muxer,
Logger: s.protoOptions.Logger,
ErrorChan: s.protoOptions.ErrorChan,
Mode: s.protoOptions.Mode,
Role: protocol.ProtocolRoleServer,
Expand All @@ -59,16 +60,22 @@ func (s *Server) initProtocol() {
}

func (s *Server) NoBlocks() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s NoBlocks()", ProtocolName))
msg := NewMsgNoBlocks()
return s.SendMessage(msg)
}

func (s *Server) StartBatch() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s StartBatch()", ProtocolName))
msg := NewMsgStartBatch()
return s.SendMessage(msg)
}

func (s *Server) Block(blockType uint, blockData []byte) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s Block(blockType: %+v, blockData: %x)", ProtocolName, blockType, blockData))
wrappedBlock := WrappedBlock{
Type: blockType,
RawBlock: blockData,
Expand All @@ -82,11 +89,15 @@ func (s *Server) Block(blockType uint, blockData []byte) error {
}

func (s *Server) BatchDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s BatchDone()", ProtocolName))
msg := NewMsgBatchDone()
return s.SendMessage(msg)
}

func (s *Server) messageHandler(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeRequestRange:
Expand All @@ -104,6 +115,8 @@ func (s *Server) messageHandler(msg protocol.Message) error {
}

func (s *Server) handleRequestRange(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server request range for %s", ProtocolName))
if s.config == nil || s.config.RequestRangeFunc == nil {
return fmt.Errorf(
"received block-fetch RequestRange message but no callback function is defined",
Expand All @@ -118,6 +131,8 @@ func (s *Server) handleRequestRange(msg protocol.Message) error {
}

func (s *Server) handleClientDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server client done for %s", ProtocolName))
// Restart protocol
s.Protocol.Stop()
s.initProtocol()
Expand Down
69 changes: 46 additions & 23 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewClient(
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -115,6 +116,8 @@ func NewClient(

func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -124,33 +127,12 @@ func (c *Client) Start() {
})
}

func (c *Client) messageHandler(msg protocol.Message) error {
var err error
switch msg.Type() {
case MessageTypeAwaitReply:
err = c.handleAwaitReply()
case MessageTypeRollForward:
err = c.handleRollForward(msg)
case MessageTypeRollBackward:
err = c.handleRollBackward(msg)
case MessageTypeIntersectFound:
err = c.handleIntersectFound(msg)
case MessageTypeIntersectNotFound:
err = c.handleIntersectNotFound(msg)
default:
err = fmt.Errorf(
"%s: received unexpected message type %d",
ProtocolName,
msg.Type(),
)
}
return err
}

// Stop transitions the protocol to the Done state. No more protocol operations will be possible afterward
func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
msg := NewMsgDone()
Expand All @@ -163,6 +145,8 @@ func (c *Client) Stop() error {

// GetCurrentTip returns the current chain tip
func (c *Client) GetCurrentTip() (*Tip, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetCurrentTip()", ProtocolName))
done := atomic.Bool{}
requestResultChan := make(chan Tip, 1)
requestErrorChan := make(chan error, 1)
Expand Down Expand Up @@ -220,6 +204,8 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
func (c *Client) GetAvailableBlockRange(
intersectPoints []common.Point,
) (common.Point, common.Point, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, intersectPoints))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()

Expand Down Expand Up @@ -293,6 +279,8 @@ func (c *Client) GetAvailableBlockRange(
// Sync begins a chain-sync operation using the provided intersect point(s). Incoming blocks will be delivered
// via the RollForward callback function specified in the protocol config
func (c *Client) Sync(intersectPoints []common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s Sync(intersectPoints: %+v)", ProtocolName, intersectPoints))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
// Use origin if no intersect points were specified
Expand Down Expand Up @@ -441,11 +429,40 @@ func (c *Client) requestFindIntersect(
}
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeAwaitReply:
err = c.handleAwaitReply()
case MessageTypeRollForward:
err = c.handleRollForward(msg)
case MessageTypeRollBackward:
err = c.handleRollBackward(msg)
case MessageTypeIntersectFound:
err = c.handleIntersectFound(msg)
case MessageTypeIntersectNotFound:
err = c.handleIntersectNotFound(msg)
default:
err = fmt.Errorf(
"%s: received unexpected message type %d",
ProtocolName,
msg.Type(),
)
}
return err
}

func (c *Client) handleAwaitReply() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client await reply for %s", ProtocolName))
return nil
}

func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client roll forward for %s", ProtocolName))
firstBlockChan := func() chan<- clientPointResult {
select {
case ch := <-c.wantFirstBlockChan:
Expand Down Expand Up @@ -554,6 +571,8 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
}

func (c *Client) handleRollBackward(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client roll backward for %s", ProtocolName))
msgRollBackward := msg.(*MsgRollBackward)
c.sendCurrentTip(msgRollBackward.Tip)
if len(c.wantFirstBlockChan) == 0 {
Expand All @@ -579,6 +598,8 @@ func (c *Client) handleRollBackward(msg protocol.Message) error {
}

func (c *Client) handleIntersectFound(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client intersect found for %s", ProtocolName))
msgIntersectFound := msg.(*MsgIntersectFound)
c.sendCurrentTip(msgIntersectFound.Tip)

Expand All @@ -591,6 +612,8 @@ func (c *Client) handleIntersectFound(msg protocol.Message) error {
}

func (c *Client) handleIntersectNotFound(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client intersect not found for %s", ProtocolName))
msgIntersectNotFound := msgGeneric.(*MsgIntersectNotFound)
c.sendCurrentTip(msgIntersectNotFound.Tip)

Expand Down
17 changes: 17 additions & 0 deletions protocol/chainsync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *Server) initProtocol() {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: s.protoOptions.Muxer,
Logger: s.protoOptions.Logger,
ErrorChan: s.protoOptions.ErrorChan,
Mode: s.protoOptions.Mode,
Role: protocol.ProtocolRoleServer,
Expand All @@ -77,16 +78,22 @@ func (s *Server) initProtocol() {
}

func (s *Server) RollBackward(point common.Point, tip Tip) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s RollBackward(point: %+v, tip: %+v)", ProtocolName, point, tip))
msg := NewMsgRollBackward(point, tip)
return s.SendMessage(msg)
}

func (s *Server) AwaitReply() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s AwaitReply()", ProtocolName))
msg := NewMsgAwaitReply()
return s.SendMessage(msg)
}

func (s *Server) RollForward(blockType uint, blockData []byte, tip Tip) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("server called %s Rollforward(blockType: %+v, blockData: %x, tip: %+v)", ProtocolName, blockType, blockData, tip))
if s.Mode() == protocol.ProtocolModeNodeToNode {
eraId := ledger.BlockToBlockHeaderTypeMap[blockType]
msg := NewMsgRollForwardNtN(
Expand All @@ -107,6 +114,8 @@ func (s *Server) RollForward(blockType uint, blockData []byte, tip Tip) error {
}

func (s *Server) messageHandler(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeRequestNext:
Expand All @@ -126,6 +135,10 @@ func (s *Server) messageHandler(msg protocol.Message) error {
}

func (s *Server) handleRequestNext() error {
// TODO: figure out why this one log message causes a panic (and only this one)
// during tests
// s.Protocol.Logger().
// Debug(fmt.Sprintf("handling server request next for %s", ProtocolName))
if s.config == nil || s.config.RequestNextFunc == nil {
return fmt.Errorf(
"received chain-sync RequestNext message but no callback function is defined",
Expand All @@ -135,6 +148,8 @@ func (s *Server) handleRequestNext() error {
}

func (s *Server) handleFindIntersect(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server find intersect for %s", ProtocolName))
if s.config == nil || s.config.FindIntersectFunc == nil {
return fmt.Errorf(
"received chain-sync FindIntersect message but no callback function is defined",
Expand Down Expand Up @@ -163,6 +178,8 @@ func (s *Server) handleFindIntersect(msg protocol.Message) error {
}

func (s *Server) handleDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("handling server done for %s", ProtocolName))
// Restart protocol
s.Protocol.Stop()
s.initProtocol()
Expand Down
Loading

0 comments on commit 2243736

Please sign in to comment.