Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for remote ES output #3051

Merged
merged 99 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
44b6985
first draft of remote output
juliaElastic Oct 24, 2023
f870e55
remove commented code
juliaElastic Oct 24, 2023
4605e04
storing remote es clients in bulker
juliaElastic Oct 26, 2023
b6b47af
remote service_token from policy sent to agent
juliaElastic Oct 26, 2023
d82a117
fix lint
juliaElastic Oct 26, 2023
0ceb201
rename remoteEsList and type
juliaElastic Oct 26, 2023
c63fb1e
added missing check
juliaElastic Oct 26, 2023
6795a82
Merge branch 'main' into remote-es
juliaElastic Oct 30, 2023
6cd8491
fixes after merge main
juliaElastic Oct 30, 2023
4b72d71
fix lint
juliaElastic Oct 30, 2023
1d6c31f
added changelog
juliaElastic Oct 30, 2023
53de755
create bulker for remote es
juliaElastic Oct 30, 2023
5bbc1f1
cleanup unused remoteEsClients and functions
juliaElastic Oct 30, 2023
ab7487d
fix linter
juliaElastic Oct 30, 2023
e901a80
renamed to remote_elasticsearch
juliaElastic Oct 30, 2023
2465b2a
added tracer, removed opts
juliaElastic Nov 1, 2023
1fd90a8
enable intrumentation on remote es client
juliaElastic Nov 1, 2023
8db4da2
Merge branch 'main' into remote-es
juliaElastic Nov 1, 2023
7953456
updated changelog
juliaElastic Nov 1, 2023
9452494
Merge branch 'main' into remote-es
juliaElastic Nov 2, 2023
11bc90c
try copy policyData
juliaElastic Nov 2, 2023
8312727
revert policyData, printing out value of policy
juliaElastic Nov 2, 2023
8cae7a4
setting policyData in other integration tests
juliaElastic Nov 2, 2023
bff9015
added missing param
juliaElastic Nov 2, 2023
a52b32b
watching for remote output cfg changes and restart
juliaElastic Nov 2, 2023
41f4ced
fixes
juliaElastic Nov 2, 2023
649bfdb
fix linter
juliaElastic Nov 2, 2023
f6994bd
fix
juliaElastic Nov 2, 2023
84787bd
copy map instead of reference
juliaElastic Nov 3, 2023
d36f83a
added integration test
juliaElastic Nov 3, 2023
255c28b
unit test on CheckRemoteOutputChanged
juliaElastic Nov 3, 2023
108fe47
fix lint
juliaElastic Nov 3, 2023
fd7e406
fix lint
juliaElastic Nov 3, 2023
4f99d62
added tests on policy_output remote ES
juliaElastic Nov 3, 2023
cc6a02e
added test on parsed_policy change
juliaElastic Nov 3, 2023
6dc08cd
fix lint
juliaElastic Nov 6, 2023
9a0e8b1
invalidate api key if remote output is removed
juliaElastic Nov 7, 2023
830b401
fix lint
juliaElastic Nov 7, 2023
fa4698a
Merge branch 'main' into remote-es
juliaElastic Nov 8, 2023
62a49c6
fix after conflicts
juliaElastic Nov 8, 2023
0f1ed68
reporting remote es error in fleet-server state
juliaElastic Nov 8, 2023
1d865d8
ignore lint error
juliaElastic Nov 8, 2023
72a7249
fix test
juliaElastic Nov 9, 2023
6ee9086
fix test
juliaElastic Nov 9, 2023
374b1ae
fix test
juliaElastic Nov 9, 2023
dad52b0
fixed test
juliaElastic Nov 9, 2023
8d4dbbc
start new bulker if remote config changed
juliaElastic Nov 10, 2023
de6724a
fix tests
juliaElastic Nov 10, 2023
e465d3d
added test for retire remote api key
juliaElastic Nov 10, 2023
bf6c9e3
monitor tests
juliaElastic Nov 10, 2023
2cc1e7d
Merge branch 'main' into remote-es
juliaElastic Nov 13, 2023
608ba26
remote es ping in standalone self monitor
juliaElastic Nov 14, 2023
699ac62
added break
juliaElastic Nov 14, 2023
81899b3
fix lint
juliaElastic Nov 14, 2023
14f9507
fix issue
juliaElastic Nov 14, 2023
963b87a
license header
juliaElastic Nov 14, 2023
180cae8
fix lint
juliaElastic Nov 14, 2023
3989856
cleanup, more test
juliaElastic Nov 15, 2023
6b727b8
added test on engine
juliaElastic Nov 15, 2023
60a3122
use output bulker when read update api key
juliaElastic Nov 15, 2023
8797823
added remote es ping to self.go
juliaElastic Nov 15, 2023
1be0575
review comments
juliaElastic Nov 16, 2023
dcf39f9
invalidate all ids with corresponding output
juliaElastic Nov 16, 2023
bb32a41
only add toRetireAPIKeys if does not exist
juliaElastic Nov 16, 2023
3261472
added a retry loop to integration test
juliaElastic Nov 16, 2023
991c689
fixed monitor tests
juliaElastic Nov 16, 2023
75fdb8f
remove break loop in self monitor
juliaElastic Nov 20, 2023
a8447b2
fix lint
juliaElastic Nov 20, 2023
9c7d6fe
openapi spec: added degraded state to desc
juliaElastic Nov 20, 2023
8df35d1
added 2nd elasticsearch to integration test
juliaElastic Nov 20, 2023
3b2dc11
Merge branch 'main' into remote-es
juliaElastic Nov 20, 2023
10021a8
added build.Info to remote bulkers from fleet
juliaElastic Nov 20, 2023
4c99949
added semaphore for updating bulkerMap
juliaElastic Nov 21, 2023
8182bab
revert self monitor degraded on remote es error
juliaElastic Nov 21, 2023
37106b0
fix tests
juliaElastic Nov 21, 2023
64277ff
verify api key exists in remote es
juliaElastic Nov 21, 2023
ea1d3bd
fix lint
juliaElastic Nov 21, 2023
e15f995
remote remoteOutputErrorMap as not used
juliaElastic Nov 21, 2023
6d4f87e
fix lint
juliaElastic Nov 21, 2023
85142e2
revert openapi description about degraded state
juliaElastic Nov 22, 2023
727daa4
reading output from policies index if bulker not found
juliaElastic Nov 22, 2023
462c063
unit test on handleAck
juliaElastic Nov 22, 2023
96bdb42
added test to query policies
juliaElastic Nov 22, 2023
053fbed
fix test
juliaElastic Nov 23, 2023
60cd6ee
fix integration test
juliaElastic Nov 23, 2023
760ea2f
added integration test for invalidate api key
juliaElastic Nov 23, 2023
9382975
test for child bulker cancel
juliaElastic Nov 23, 2023
0006533
fixed test, replace semaphore with mutex
juliaElastic Nov 27, 2023
41219f0
removed unused arg
juliaElastic Nov 27, 2023
75c890b
added warning log if api keys orphaned
juliaElastic Nov 27, 2023
4d2d67d
removed unused error
juliaElastic Nov 27, 2023
cd877f1
fix lint
juliaElastic Nov 27, 2023
aa7bf11
Merge branch 'main' into remote-es
juliaElastic Nov 27, 2023
d1ffe3f
try to fix test
juliaElastic Nov 27, 2023
5885068
read output secret before prepare remote es
juliaElastic Nov 27, 2023
4d5b906
try to fix test
juliaElastic Nov 27, 2023
d39141a
removed test
juliaElastic Nov 28, 2023
299cdae
removed unused imports
juliaElastic Nov 28, 2023
6fdba62
Merge branch 'main' into remote-es
juliaElastic Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,13 @@ export $(shell sed 's/=.*//' ./dev-tools/integration/.env)
# Start ES with docker without waiting
.PHONY: int-docker-start-async
int-docker-start-async:
@docker compose -f ./dev-tools/integration/docker-compose.yml --env-file ./dev-tools/integration/.env up -d --remove-orphans elasticsearch
@docker compose -f ./dev-tools/integration/docker-compose.yml --env-file ./dev-tools/integration/.env up -d --remove-orphans elasticsearch elasticsearch-remote

# Wait for ES to be ready
.PHONY: int-docker-wait
int-docker-wait:
@./dev-tools/integration/wait-for-elasticsearch.sh ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD}@${TEST_ELASTICSEARCH_HOSTS}
@./dev-tools/integration/wait-for-elasticsearch.sh ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD}@${TEST_REMOTE_ELASTICSEARCH_HOST}

# Start integration docker setup with wait for when the ES is ready
.PHONY: int-docker-start
Expand Down Expand Up @@ -331,7 +332,8 @@ test-int: prepare-test-context ## - Run integration tests with full setup (slow
.PHONY: test-int-set
test-int-set: ## - Run integration tests without setup
# Initialize indices one before running all the tests
ELASTICSEARCH_SERVICE_TOKEN=$(shell ./dev-tools/integration/get-elasticsearch-servicetoken.sh ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD}@${TEST_ELASTICSEARCH_HOSTS}) \
ELASTICSEARCH_SERVICE_TOKEN=$(shell ./dev-tools/integration/get-elasticsearch-servicetoken.sh ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD}@${TEST_ELASTICSEARCH_HOSTS} "fleet-server") \
REMOTE_ELASTICSEARCH_SERVICE_TOKEN=$(shell ./dev-tools/integration/get-elasticsearch-servicetoken.sh ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD}@${TEST_REMOTE_ELASTICSEARCH_HOST} "fleet-server-remote") \
ELASTICSEARCH_HOSTS=${TEST_ELASTICSEARCH_HOSTS} ELASTICSEARCH_USERNAME=${ELASTICSEARCH_USERNAME} ELASTICSEARCH_PASSWORD=${ELASTICSEARCH_PASSWORD} \
go test -v -tags=integration -count=1 -race -p 1 ./...

Expand Down Expand Up @@ -372,7 +374,7 @@ test-e2e: docker-cover-e2e-binaries build-e2e-agent-image e2e-certs build-docker
.PHONY: test-e2e-set
test-e2e-set: ## - Run the blackbox end to end tests without setup.
cd testing; \
ELASTICSEARCH_SERVICE_TOKEN=$(shell ./dev-tools/integration/get-elasticsearch-servicetoken.sh ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD}@${TEST_ELASTICSEARCH_HOSTS}) \
ELASTICSEARCH_SERVICE_TOKEN=$(shell ./dev-tools/integration/get-elasticsearch-servicetoken.sh ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD}@${TEST_ELASTICSEARCH_HOSTS} "fleet-server") \
ELASTICSEARCH_HOSTS=${TEST_ELASTICSEARCH_HOSTS} ELASTICSEARCH_USERNAME=${ELASTICSEARCH_USERNAME} ELASTICSEARCH_PASSWORD=${ELASTICSEARCH_PASSWORD} \
AGENT_E2E_IMAGE=$(shell cat "build/e2e-image") \
STANDALONE_E2E_IMAGE=$(DOCKER_IMAGE):$(DOCKER_IMAGE_TAG)$(if $(DEV),-dev,) \
Expand Down
32 changes: 32 additions & 0 deletions changelog/fragments/1698660041-remote-es-output.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Support for remote elasticsearch output

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3051

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
3 changes: 2 additions & 1 deletion dev-tools/integration/.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ELASTICSEARCH_VERSION=8.12.0-c18d0d89-SNAPSHOT
ELASTICSEARCH_VERSION=8.12.0-7521d760-SNAPSHOT
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=changeme
TEST_ELASTICSEARCH_HOSTS=localhost:9200
TEST_REMOTE_ELASTICSEARCH_HOST=localhost:9201
24 changes: 24 additions & 0 deletions dev-tools/integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,27 @@ services:
- ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- 127.0.0.1:9200:9200

elasticsearch-remote:
image: "docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION}-amd64"
container_name: elasticsearch-remote
environment:
- node.name=es02
- cluster.name=es-docker-cluster2
- discovery.seed_hosts=elasticsearch
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms1G -Xmx1G"
- "ELASTIC_USERNAME=${ELASTICSEARCH_USERNAME}"
- "ELASTIC_PASSWORD=${ELASTICSEARCH_PASSWORD}"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- 127.0.0.1:9201:9200

7 changes: 4 additions & 3 deletions dev-tools/integration/get-elasticsearch-servicetoken.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#!/bin/bash

host="$1"
account="$2"

jsonBody="$(curl -sSL -XPOST "$host/_security/service/elastic/fleet-server/credential/token/token1")"
jsonBody="$(curl -sSL -XPOST "$host/_security/service/elastic/$account/credential/token/token1")"

# use grep and sed to get the service token value as we may not have jq or a similar tool on the instance
token=$(echo ${jsonBody} | grep -Eo '"value"[^}]*' | grep -Eo ':.*' | sed -r "s/://" | sed -r 's/"//g')
Expand All @@ -11,9 +12,9 @@ token=$(echo ${jsonBody} | grep -Eo '"value"[^}]*' | grep -Eo ':.*' | sed -r "s
# very useful during development, without recreating elasticsearch instance every time.
if [ -z "$token" ]
then
token=`cat .service_token`
token=`cat .service_token_$account`
else
echo "$token" > .service_token
echo "$token" > .service_token_$account
fi

echo $token
57 changes: 49 additions & 8 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,15 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

for _, output := range agent.Outputs {
for outputName, output := range agent.Outputs {
if output.Type != policy.OutputTypeElasticsearch {
continue
}

err := ack.updateAPIKey(ctx,
zlog,
agent.Id,
output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds)
output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName)
if err != nil {
return err
}
Expand All @@ -445,20 +445,31 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
zlog zerolog.Logger,
agentID string,
apiKeyID, permissionHash string,
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error {
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
bulk := ack.bulk
// use output bulker if exists
if outputName != "" {
outputBulk := ack.bulk.GetBulker(outputName)
if outputBulk != nil {
zlog.Debug().Str("outputName", outputName).Msg("Using output bulker in updateAPIKey")
bulk = outputBulk
}
}
if apiKeyID != "" {
res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true)
res, err := bulk.APIKeyRead(ctx, apiKeyID, true)
if err != nil {
if isAgentActive(ctx, zlog, ack.bulk, agentID) {
zlog.Error().
Err(err).
Str(LogAPIKeyID, apiKeyID).
Str("outputName", outputName).
Msg("Failed to read API Key roles")
} else {
// race when API key was invalidated before acking
zlog.Info().
Err(err).
Str(LogAPIKeyID, apiKeyID).
Str("outputName", outputName).
Msg("Failed to read invalidated API Key roles")

// prevents future checks
Expand All @@ -473,14 +484,15 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
Str(LogAPIKeyID, apiKeyID).
Msg("Failed to cleanup roles")
} else if removedRolesCount > 0 {
if err := ack.bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Msg("Failed to update API Key")
if err := bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str("outputName", outputName).Msg("Failed to update API Key")
} else {
zlog.Debug().
Str("hash.sha256", permissionHash).
Str(LogAPIKeyID, apiKeyID).
RawJSON("roles", clean).
Int("removedRoles", removedRolesCount).
Str("outputName", outputName).
Msg("Updating agent record to pick up reduced roles.")
}
}
Expand Down Expand Up @@ -556,19 +568,48 @@ func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) {

func (ack *AckT) invalidateAPIKeys(ctx context.Context, zlog zerolog.Logger, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) {
ids := make([]string, 0, len(toRetireAPIKeyIDs))
remoteIds := make(map[string][]string)
for _, k := range toRetireAPIKeyIDs {
if k.ID == skip || k.ID == "" {
continue
}
ids = append(ids, k.ID)
if k.Output != "" {
if remoteIds[k.Output] == nil {
remoteIds[k.Output] = make([]string, 0)
}
remoteIds[k.Output] = append(remoteIds[k.Output], k.ID)
} else {
ids = append(ids, k.ID)
}
}

if len(ids) > 0 {
zlog.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
zlog.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}
// using remote es bulker to invalidate api key
for outputName, outputIds := range remoteIds {
outputBulk := ack.bulk.GetBulker(outputName)

if outputBulk == nil {
// read output config from .fleet-policies, not filtering by policy id as agent could be reassigned
policy, err := dl.QueryOutputFromPolicy(ctx, ack.bulk, outputName)
if err != nil || policy == nil {
zlog.Debug().Str("outputName", outputName).Msg("Output policy not found")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can't find the policy associated with an output and need to invalidate the API key, that means it's an orphaned key, right?
Should we emit a WARN log about the key for the agent being orphaned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to warning log

} else {
outputBulk, _, err = ack.bulk.CreateAndGetBulker(ctx, zlog, outputName, policy.Data.Outputs)
if err != nil {
zlog.Debug().Str("outputName", outputName).Msg("Failed to recreate output bulker")
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
if outputBulk != nil {
if err := outputBulk.APIKeyInvalidate(ctx, outputIds...); err != nil {
zlog.Info().Err(err).Strs("ids", outputIds).Str("outputName", outputName).Msg("Failed to invalidate API keys")
}
}
}
}

func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
Expand Down
68 changes: 68 additions & 0 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,74 @@ func TestInvalidateAPIKeys(t *testing.T) {
}
}

func TestInvalidateAPIKeysRemoteOutput(t *testing.T) {
toRetire := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire1",
Output: "remote1",
}, {
ID: "toRetire11",
Output: "remote1",
}, {
ID: "toRetire2",
Output: "remote2",
}}

bulker := ftesting.NewMockBulk()
remoteBulker := ftesting.NewMockBulk()
remoteBulker2 := ftesting.NewMockBulk()
bulker.On("GetBulker", "remote1").Return(remoteBulker)
bulker.On("GetBulker", "remote2").Return(remoteBulker2)

remoteBulker.On("APIKeyInvalidate",
context.Background(), []string{"toRetire1", "toRetire11"}).
Return(nil)
remoteBulker2.On("APIKeyInvalidate",
context.Background(), []string{"toRetire2"}).
Return(nil)

logger := testlog.SetLogger(t)
ack := &AckT{bulk: bulker}
ack.invalidateAPIKeys(context.Background(), logger, toRetire, "")

bulker.AssertExpectations(t)
remoteBulker.AssertExpectations(t)
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
remoteBulker2.AssertExpectations(t)
}

func TestInvalidateAPIKeysRemoteOutputReadFromPolicies(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
great test cases

toRetire := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire1",
Output: "remote1",
}}

remoteBulker := ftesting.NewMockBulk()
remoteBulker.On("APIKeyInvalidate",
context.Background(), []string{"toRetire1"}).
Return(nil)

bulkerFn := func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"data":{"outputs":{"remote1":{}}}}`),
}},
}}, nil).Once()

m.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(remoteBulker, false, nil)
m.On("GetBulker", "remote1").Return(nil)
return m
}

bulker := bulkerFn(t)

logger := testlog.SetLogger(t)
ack := &AckT{bulk: bulker}
ack.invalidateAPIKeys(context.Background(), logger, toRetire, "")

bulker.AssertExpectations(t)
remoteBulker.AssertExpectations(t)
}

func TestAckHandleUpgrade(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading
Loading