Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
Browse files Browse the repository at this point in the history
…develop
  • Loading branch information
Tofel committed Apr 22, 2024
2 parents a0d77c1 + 9d359de commit eb17edf
Show file tree
Hide file tree
Showing 184 changed files with 2,735 additions and 1,639 deletions.
5 changes: 5 additions & 0 deletions .changeset/fresh-lizards-love.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Updates required to work with chainlink-common changes to support grpc streams for capabilities
5 changes: 5 additions & 0 deletions .changeset/lucky-ghosts-give.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

core/services/keystore: switch to sqlutil.DataStore #internal
5 changes: 5 additions & 0 deletions .changeset/plenty-wombats-grab.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#wip Regenerate Keystone wrappers
5 changes: 4 additions & 1 deletion .github/actions/golangci-lint/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ name: CI lint for Golang
description: Runs CI lint for Golang
inputs:
# general inputs
id:
description: Unique metrics collection id
required: true
name:
description: Name of the lint action
default: lint
Expand Down Expand Up @@ -72,7 +75,7 @@ runs:
if: always()
uses: smartcontractkit/push-gha-metrics-action@dea9b546553cb4ca936607c2267a09c004e4ab3f # v3.0.0
with:
id: chainlink-golang-ci
id: chainlink-golang-ci-${{ inputs.id }}
basic-auth: ${{ inputs.gc-basic-auth }}
hostname: ${{ inputs.gc-host }}
org-id: ${{ inputs.gc-org-id }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
uses: ./.github/actions/golangci-lint
if: ${{ needs.filter.outputs.changes == 'true' }}
with:
id: core
gc-basic-auth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }}
gc-host: ${{ secrets.GRAFANA_INTERNAL_HOST }}
gc-org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci-scripts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ jobs:
- name: Golang Lint
uses: ./.github/actions/golangci-lint
with:
id: scripts
name: lint-scripts
go-directory: core/scripts
go-version-file: core/scripts/go.mod
Expand Down
27 changes: 16 additions & 11 deletions .github/workflows/helm-chart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,29 @@ on:
jobs:
ci-lint-helm-charts:
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
actions: read
steps:
- name: Add repositories
run: |
helm repo add mockserver https://www.mock-server.com
helm repo add opentelemetry-collector https://open-telemetry.github.io/opentelemetry-helm-charts
helm repo add tempo https://grafana.github.io/helm-charts
helm repo add grafana https://grafana.github.io/helm-charts
- name: ci-lint-helm-charts
uses: smartcontractkit/.github/actions/ci-lint-charts@6b08487b176ef7cad086526d0b54ddff6691c044 # ci-lint-charts@0.1.2
uses: smartcontractkit/.github/actions/ci-lint-charts@7fa39741b11e66ed59f8aad786d4b9356c389f3f # ci-lint-charts@0.2.0
with:
# chart testing inputs
chart-testing-extra-args: "--lint-conf=lintconf.yaml"
charts-dir: charts/chainlink-cluster
# grafana inputs
metrics-job-name: ci-lint-helm-charts
gc-basic-auth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }}
gc-host: ${{ secrets.GRAFANA_INTERNAL_HOST }}
gc-org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }}

ci-kubeconform:
runs-on: ubuntu-latest
steps:
- name: ci-kubeconform
uses: smartcontractkit/.github/actions/ci-kubeconform@1ae8a9a984814c4daf50aa96f03be2cba0ef3fec # [email protected]
with:
# kubeform inputs
charts-dir: charts/chainlink-cluster
# grafana inputs
metrics-job-name: ci-kubeconform
gc-basic-auth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }}
gc-host: ${{ secrets.GRAFANA_INTERNAL_HOST }}
gc-org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }}
4 changes: 2 additions & 2 deletions .goreleaser.devspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ builds:
post: ./tools/bin/goreleaser_utils build_post_hook {{ dir .Path }} {{ .Os }} {{ .Arch }}
env:
- CGO_ENABLED=1
- CC=$ZIG_EXEC cc -target x86_64-linux-gnu
- CCX=$ZIG_EXEC c++ -target x86_64-linux-gnu
- CC=$ZIG_EXEC cc -target x86_64-linux-gnu -Wno-error=unused-command-line-argument
- CCX=$ZIG_EXEC c++ -target x86_64-linux-gnu -Wno-error=unused-command-line-argument
flags:
- -trimpath
- -buildmode=pie
Expand Down
5 changes: 5 additions & 0 deletions contracts/.changeset/old-pianos-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@chainlink/contracts": patch
---

#wip Add Capability Registry skeleton
1 change: 1 addition & 0 deletions contracts/scripts/native_solc_compile_all_keystone
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ compileContract () {
"$ROOT"/contracts/src/v0.8/"$1"
}

compileContract keystone/CapabilityRegistry.sol
compileContract keystone/KeystoneForwarder.sol
compileContract keystone/OCR3Capability.sol
41 changes: 41 additions & 0 deletions contracts/src/v0.8/keystone/CapabilityRegistry.sol
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

import {TypeAndVersionInterface} from "../interfaces/TypeAndVersionInterface.sol";
import {OwnerIsCreator} from "../shared/access/OwnerIsCreator.sol";

struct Capability {
// Capability type, e.g. "data-streams-reports"
// bytes32(string); validation regex: ^[a-z0-9_\-:]{1,32}$
// Not "type" because that's a reserved keyword in Solidity.
bytes32 capabilityType;
// Semver, e.g., "1.2.3"
// bytes32(string); must be valid Semver + max 32 characters.
bytes32 version;
}

contract CapabilityRegistry is OwnerIsCreator, TypeAndVersionInterface {
mapping(bytes32 => Capability) private s_capabilities;

event CapabilityAdded(bytes32 indexed capabilityId);

function typeAndVersion() external pure override returns (string memory) {
return "CapabilityRegistry 1.0.0";
}

function addCapability(Capability calldata capability) external onlyOwner {
bytes32 capabilityId = getCapabilityID(capability.capabilityType, capability.version);
s_capabilities[capabilityId] = capability;
emit CapabilityAdded(capabilityId);
}

function getCapability(bytes32 capabilityID) public view returns (Capability memory) {
return s_capabilities[capabilityID];
}

/// @notice This functions returns a Capability ID packed into a bytes32 for cheaper access
/// @return A unique identifier for the capability
function getCapabilityID(bytes32 capabilityType, bytes32 version) public pure returns (bytes32) {
return keccak256(abi.encodePacked(capabilityType, version));
}
}
21 changes: 21 additions & 0 deletions contracts/src/v0.8/keystone/test/CapabilityRegistry.t.sol
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;

import {Test} from "forge-std/Test.sol";
import {Capability, CapabilityRegistry} from "../CapabilityRegistry.sol";

contract CapabilityRegistryTest is Test {
function setUp() public virtual {}

function testAddCapability() public {
CapabilityRegistry capabilityRegistry = new CapabilityRegistry();

capabilityRegistry.addCapability(Capability("data-streams-reports", "1.0.0"));

bytes32 capabilityId = capabilityRegistry.getCapabilityID(bytes32("data-streams-reports"), bytes32("1.0.0"));
Capability memory capability = capabilityRegistry.getCapability(capabilityId);

assertEq(capability.capabilityType, "data-streams-reports");
assertEq(capability.version, "1.0.0");
}
}
6 changes: 3 additions & 3 deletions core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type mockCapability struct {
capabilities.CapabilityInfo
}

func (m *mockCapability) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
return nil
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
return nil, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
{
name: "trigger",
newCapability: func(ctx context.Context, reg *coreCapabilities.Registry) (string, error) {
odt := triggers.NewOnDemand()
odt := triggers.NewOnDemand(logger.TestLogger(t))
info, err := odt.Info(ctx)
require.NoError(t, err)
return info.ID, reg.Add(ctx, odt)
Expand Down
8 changes: 5 additions & 3 deletions core/capabilities/remote/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request
return errors.New("not implemented")
}

func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (c *remoteTargetCaller) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members))
for _, peerID := range c.donInfo.Members {
m := &types.MessageBody{
Expand All @@ -60,10 +60,12 @@ func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- common
}
err := c.dispatcher.Send(peerID, m)
if err != nil {
return err
return nil, err
}
}
return nil

// TODO: return a channel that will be closed when all responses are received
return nil, nil
}

func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {
Expand Down
6 changes: 4 additions & 2 deletions core/capabilities/remote/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package remote_test
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand All @@ -24,5 +24,7 @@ func TestTarget_Placeholder(t *testing.T) {
dispatcher := remoteMocks.NewDispatcher(t)
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil)
target := remote.NewRemoteTargetCaller(commoncap.CapabilityInfo{}, donInfo, dispatcher, lggr)
require.NoError(t, target.Execute(ctx, nil, commoncap.CapabilityRequest{}))

_, err := target.Execute(ctx, commoncap.CapabilityRequest{})
assert.NoError(t, err)
}
8 changes: 3 additions & 5 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type registrationKey struct {
}

type pubRegState struct {
callback chan<- commoncap.CapabilityResponse
callback <-chan commoncap.CapabilityResponse
request commoncap.CapabilityRequest
}

Expand Down Expand Up @@ -112,9 +112,8 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err)
return
}
callbackCh := make(chan commoncap.CapabilityResponse)
ctx, cancel := p.stopCh.NewCtx()
err = p.underlying.RegisterTrigger(ctx, callbackCh, unmarshaled)
callbackCh, err := p.underlying.RegisterTrigger(ctx, unmarshaled)
cancel()
if err == nil {
p.registrations[key] = &pubRegState{
Expand Down Expand Up @@ -153,7 +152,6 @@ func (p *triggerPublisher) registrationCleanupLoop() {
cancel()
p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId, "err", err)
// after calling UnregisterTrigger, the underlying trigger will not send any more events to the channel
close(req.callback)
delete(p.registrations, key)
p.messageCache.Delete(key)
}
Expand All @@ -163,7 +161,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
}
}

func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.CapabilityResponse, key registrationKey) {
func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.CapabilityResponse, key registrationKey) {
defer p.wg.Done()
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (t *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error)
return t.info, nil
}

func (t *testTrigger) RegisterTrigger(_ context.Context, _ chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (t *testTrigger) RegisterTrigger(_ context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
t.registrationsCh <- request
return nil
return nil, nil
}

func (t *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.CapabilityRequest) error {
Expand Down
19 changes: 14 additions & 5 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ var _ commoncap.TriggerCapability = &triggerSubscriber{}
var _ types.Receiver = &triggerSubscriber{}
var _ services.Service = &triggerSubscriber{}

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
// TODO makes this configurable with a default
const defaultSendChannelBufferSize = 1000

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON,
dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
if aggregator == nil {
lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID)
aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1))
Expand Down Expand Up @@ -88,22 +92,25 @@ func (s *triggerSubscriber) Info(ctx context.Context) (commoncap.CapabilityInfo,
return s.capInfo, nil
}

func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
rawRequest, err := pb.MarshalCapabilityRequest(request)
if err != nil {
return err
return nil, err
}
if request.Metadata.WorkflowID == "" {
return errors.New("empty workflowID")
return nil, errors.New("empty workflowID")
}
s.mu.Lock()
defer s.mu.Unlock()

callback := make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize)
s.registeredWorkflows[request.Metadata.WorkflowID] = &subRegState{
callback: callback,
rawRequest: rawRequest,
}

s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
return nil
return callback, nil
}

func (s *triggerSubscriber) registrationLoop() {
Expand Down Expand Up @@ -141,6 +148,8 @@ func (s *triggerSubscriber) registrationLoop() {
func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) error {
s.mu.Lock()
defer s.mu.Unlock()

close(s.registeredWorkflows[request.Metadata.WorkflowID].callback)
delete(s.registeredWorkflows, request.Metadata.WorkflowID)
// Registrations will quickly expire on all remote nodes.
// Alternatively, we could send UnregisterTrigger messages right away.
Expand Down
7 changes: 4 additions & 3 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
}
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))
triggerEventCallbackCh := make(chan commoncap.CapabilityResponse, 2)
require.NoError(t, subscriber.RegisterTrigger(ctx, triggerEventCallbackCh, commoncap.CapabilityRequest{

triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
},
}))
})
require.NoError(t, err)
<-awaitRegistrationMessageCh

// receive trigger event
Expand Down
Loading

0 comments on commit eb17edf

Please sign in to comment.