Skip to content

Commit

Permalink
[Keystone] Minor bugfixes (#13436)
Browse files Browse the repository at this point in the history
1. Fix workflowName decoding in KeystoneFeedsConsumer.sol
2. Fix potential panic in triggerSubscriber.UnregisterTrigger() (KS-225) and make (un)register idempotent
3. Fix empty report check in write_target
4. Some comment/log improvements
  • Loading branch information
bolekk authored Jun 6, 2024
1 parent 9e0dfba commit f37afb9
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .changeset/chatty-masks-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal #bugfix keystone bugfixes
5 changes: 5 additions & 0 deletions contracts/.changeset/witty-onions-relate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@chainlink/contracts': patch
---

#internal KeystoneFeedsConsumer bytes10 decoding bugfix
4 changes: 2 additions & 2 deletions contracts/src/v0.8/keystone/KeystoneFeedsConsumer.sol
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ contract KeystoneFeedsConsumer is IReceiver, ConfirmedOwner {
// workflow_owner // offset 74, size 20
// report_name // offset 94, size 2
assembly {
// shift right by 22 bytes to get the actual value
workflowName := shr(mul(22, 8), mload(add(metadata, 64)))
// no shifting needed for bytes10 type
workflowName := mload(add(metadata, 64))
// shift right by 12 bytes to get the actual value
workflowOwner := shr(mul(12, 8), mload(add(metadata, 74)))
}
Expand Down
28 changes: 20 additions & 8 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type triggerEventKey struct {
}

type subRegState struct {
callback chan<- commoncap.CapabilityResponse
callback chan commoncap.CapabilityResponse
rawRequest []byte
}

Expand Down Expand Up @@ -103,14 +103,20 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc
s.mu.Lock()
defer s.mu.Unlock()

callback := make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize)
s.registeredWorkflows[request.Metadata.WorkflowID] = &subRegState{
callback: callback,
rawRequest: rawRequest,
s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
regState, ok := s.registeredWorkflows[request.Metadata.WorkflowID]
if !ok {
regState = &subRegState{
callback: make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize),
rawRequest: rawRequest,
}
s.registeredWorkflows[request.Metadata.WorkflowID] = regState
} else {
regState.rawRequest = rawRequest
s.lggr.Warnw("RegisterTrigger re-registering trigger", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
}

s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
return callback, nil
return regState.callback, nil
}

func (s *triggerSubscriber) registrationLoop() {
Expand All @@ -124,6 +130,9 @@ func (s *triggerSubscriber) registrationLoop() {
case <-ticker.C:
s.mu.RLock()
s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows))
if len(s.registeredWorkflows) == 0 {
s.lggr.Infow("no workflows to register")
}
for _, registration := range s.registeredWorkflows {
// NOTE: send to all by default, introduce different strategies later (KS-76)
for _, peerID := range s.capDonInfo.Members {
Expand All @@ -149,7 +158,10 @@ func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commo
s.mu.Lock()
defer s.mu.Unlock()

close(s.registeredWorkflows[request.Metadata.WorkflowID].callback)
state := s.registeredWorkflows[request.Metadata.WorkflowID]
if state != nil && state.callback != nil {
close(state.callback)
}
delete(s.registeredWorkflows, request.Metadata.WorkflowID)
// Registrations will quickly expire on all remote nodes.
// Alternatively, we could send UnregisterTrigger messages right away.
Expand Down
7 changes: 5 additions & 2 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))

triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, commoncap.CapabilityRequest{
req := commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
},
})
}
triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, req)
require.NoError(t, err)
<-awaitRegistrationMessageCh

Expand All @@ -99,5 +100,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
response := <-triggerEventCallbackCh
require.Equal(t, response.Value, triggerEventValue)

require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
require.NoError(t, subscriber.Close())
}
2 changes: 1 addition & 1 deletion core/capabilities/targets/write_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (cap *WriteTarget) Execute(ctx context.Context, request capabilities.Capabi
return nil, err
}

if inputs.Report == nil {
if len(inputs.Report) == 0 {
// We received any empty report -- this means we should skip transmission.
cap.lggr.Debugw("Skipping empty report", "request", request)
return success(), nil
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/targets/write_target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWriteTarget(t *testing.T) {
t.Run("succeeds with empty report", func(t *testing.T) {
emptyInputs, err2 := values.NewMap(map[string]any{
"signed_report": map[string]any{
"report": nil,
"report": []byte{},
},
"signatures": [][]byte{},
})
Expand Down
8 changes: 4 additions & 4 deletions core/services/relay/evm/cap_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,22 @@ func prependMetadataFields(meta consensustypes.Metadata, userPayload []byte) ([]
binary.BigEndian.PutUint32(cfgVersionBytes, meta.DONConfigVersion)
result = append(result, cfgVersionBytes...)

// 5. Workflow ID / spec hash (32 bytes)
// 6. Workflow ID / spec hash (32 bytes)
if result, err = decodeAndAppend(meta.WorkflowID, 32, result, "WorkflowID"); err != nil {
return nil, err
}

// 6. Workflow Name (10 bytes)
// 7. Workflow Name (10 bytes)
if result, err = decodeAndAppend(meta.WorkflowName, 10, result, "WorkflowName"); err != nil {
return nil, err
}

// 7. Workflow Owner (20 bytes)
// 8. Workflow Owner (20 bytes)
if result, err = decodeAndAppend(meta.WorkflowOwner, 20, result, "WorkflowOwner"); err != nil {
return nil, err
}

// 8. Report ID (2 bytes)
// 9. Report ID (2 bytes)
if result, err = decodeAndAppend(meta.ReportID, 2, result, "ReportID"); err != nil {
return nil, err
}
Expand Down

0 comments on commit f37afb9

Please sign in to comment.