Skip to content

Commit

Permalink
Node/CCQServer: Quorum not met (#3758)
Browse files Browse the repository at this point in the history
* Node/CCQServer: Quorum not met

* Better track failed queries

* Add total responses by chain and peer ID
  • Loading branch information
bruce-riley authored Feb 2, 2024
1 parent 696cb90 commit b48cb66
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 15 deletions.
14 changes: 11 additions & 3 deletions node/cmd/ccq/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
return
}

pendingResponse := NewPendingResponse(signedQueryRequest)
pendingResponse := NewPendingResponse(signedQueryRequest, permEntry.userName)
added := s.pendingResponses.Add(pendingResponse)
if !added {
s.logger.Info("duplicate request", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
Expand Down Expand Up @@ -168,14 +168,16 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
case <-time.After(query.RequestTimeout + 5*time.Second):
s.logger.Info("publishing time out to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
http.Error(w, "Timed out waiting for response", http.StatusGatewayTimeout)
queryTimeoutsByUser.WithLabelValues(permEntry.userName).Inc()
failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
case res := <-pendingResponse.ch:
s.logger.Info("publishing response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
resBytes, err := res.Response.Marshal()
if err != nil {
s.logger.Error("failed to marshal response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_response").Inc()
invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc()
failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
break
}
// Signature indices must be ascending for on-chain verification
Expand All @@ -197,9 +199,15 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
s.logger.Error("failed to encode response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
invalidQueryRequestReceived.WithLabelValues("failed_to_encode_response").Inc()
invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc()
failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
break
}
successfulQueriesByUser.WithLabelValues(permEntry.userName).Inc()
case errEntry := <-pendingResponse.errCh:
s.logger.Info("publishing error response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Int("status", errEntry.status), zap.Error(errEntry.err))
http.Error(w, errEntry.err.Error(), errEntry.status)
// Metrics have already been pegged.
break
}

totalQueryTime.Observe(float64(time.Since(start).Milliseconds()))
Expand Down
30 changes: 30 additions & 0 deletions node/cmd/ccq/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,30 @@ var (
Help: "Total number of requests by user name",
}, []string{"user_name"})

successfulQueriesByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_successful_queries_by_user",
Help: "Total number of successful queries by user name",
}, []string{"user_name"})

failedQueriesByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_failed_queries_by_user",
Help: "Total number of failed queries by user name",
}, []string{"user_name"})

queryTimeoutsByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_query_timeouts_by_user",
Help: "Total number of query timeouts by user name",
}, []string{"user_name"})

quorumNotMetByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_quorum_not_met_by_user",
Help: "Total number of query failures due to quorum not met by user name",
}, []string{"user_name"})

invalidRequestsByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_invalid_requests_by_user",
Expand All @@ -48,6 +72,12 @@ var (
Help: "Total number of query responses received by peer ID",
}, []string{"peer_id"})

queryResponsesReceivedByChainAndPeerID = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_total_query_responses_received_by_chain_and_peer_id",
Help: "Total number of query responses received by chain and peer ID",
}, []string{"chain_name", "peer_id"})

inboundP2pError = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_inbound_p2p_errors",
Expand Down
52 changes: 45 additions & 7 deletions node/cmd/ccq/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/hex"
"fmt"
"net/http"
"time"

"github.com/certusone/wormhole/node/pkg/p2p"
Expand Down Expand Up @@ -166,6 +167,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
inboundP2pError.WithLabelValues("failed_to_unmarshal_response").Inc()
continue
}
for _, pcr := range queryResponse.PerChainResponses {
queryResponsesReceivedByChainAndPeerID.WithLabelValues(pcr.ChainId.String(), peerId).Inc()
}
requestSignature := hex.EncodeToString(queryResponse.Request.Signature)
logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
if loggingMap.ShouldLogResponse(requestSignature) {
Expand Down Expand Up @@ -230,8 +234,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
delete(responses, requestSignature)
select {
case pendingResponse.ch <- s:
logger.Info("forwarded query response",
logger.Info("quorum reached, forwarded query response",
zap.String("peerId", peerId),
zap.String("userId", pendingResponse.userName),
zap.Any("requestId", requestSignature),
zap.Int("numSigners", numSigners),
zap.Int("quorum", quorum),
Expand All @@ -241,12 +246,45 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
// Leave the request in the pending map. It will get cleaned up if it times out.
}
} else {
logger.Info("waiting for more query responses",
zap.String("peerId", peerId),
zap.Any("requestId", requestSignature),
zap.Int("numSigners", numSigners),
zap.Int("quorum", quorum),
)
// Proxy should return early if quorum is no longer possible, i.e. maxMatchingResponses + outstandingResponses < quorum.
var totalSigners, maxMatchingResponses int
for _, signers := range responses[requestSignature] {
totalSigners += len(signers)
if len(signers) > maxMatchingResponses {
maxMatchingResponses = len(signers)
}
}
outstandingResponses := len(guardianSet.Keys) - totalSigners
if maxMatchingResponses+outstandingResponses < quorum {
quorumNotMetByUser.WithLabelValues(pendingResponse.userName).Inc()
failedQueriesByUser.WithLabelValues(pendingResponse.userName).Inc()
delete(responses, requestSignature)
select {
case pendingResponse.errCh <- &ErrorEntry{err: fmt.Errorf("quorum not met"), status: http.StatusBadRequest}:
logger.Info("query failed, quorum not met",
zap.String("peerId", peerId),
zap.String("userId", pendingResponse.userName),
zap.Any("requestId", requestSignature),
zap.Int("numSigners", numSigners),
zap.Int("maxMatchingResponses", maxMatchingResponses),
zap.Int("outstandingResponses", outstandingResponses),
zap.Int("quorum", quorum),
)
default:
logger.Error("failed to write query error response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
// Leave the request in the pending map. It will get cleaned up if it times out.
}
} else {
logger.Info("waiting for more query responses",
zap.String("peerId", peerId),
zap.String("userId", pendingResponse.userName),
zap.Any("requestId", requestSignature),
zap.Int("numSigners", numSigners),
zap.Int("maxMatchingResponses", maxMatchingResponses),
zap.Int("outstandingResponses", outstandingResponses),
zap.Int("quorum", quorum),
)
}
}
} else {
logger.Warn("received observation by unknown guardian - is our guardian set outdated?",
Expand Down
19 changes: 14 additions & 5 deletions node/cmd/ccq/pending_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,23 @@ import (
)

type PendingResponse struct {
req *gossipv1.SignedQueryRequest
ch chan *SignedResponse
req *gossipv1.SignedQueryRequest
userName string
ch chan *SignedResponse
errCh chan *ErrorEntry
}

func NewPendingResponse(req *gossipv1.SignedQueryRequest) *PendingResponse {
// ErrorEntry describes a failed query that is reported back to the HTTP handler
// over PendingResponse.errCh instead of a signed response.
type ErrorEntry struct {
// err is the error whose message is written to the client in the HTTP error body.
err error
// status is the HTTP status code sent to the client with the error.
status int
}

func NewPendingResponse(req *gossipv1.SignedQueryRequest, userName string) *PendingResponse {
return &PendingResponse{
req: req,
ch: make(chan *SignedResponse),
req: req,
userName: userName,
ch: make(chan *SignedResponse),
errCh: make(chan *ErrorEntry),
}
}

Expand Down

0 comments on commit b48cb66

Please sign in to comment.