Skip to content

Commit

Permalink
backend: (WIP) add CreateACL endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Nov 12, 2023
1 parent 0d32871 commit 68ad2c5
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 26 deletions.
35 changes: 35 additions & 0 deletions backend/pkg/api/connect/errors/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package errors

import (
"errors"

"connectrpc.com/connect"
"github.com/twmb/franz-go/pkg/kerr"

v1alpha1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha1"
)

// NewConnectErrorFromKafkaErrorCode creates a new connect.Error for a given Kafka error code.
// Kafka error codes are described in the franz-go kerr package.
func NewConnectErrorFromKafkaErrorCode(code int16, msg *string) *connect.Error {
kafkaErr := kerr.ErrorForCode(code)

errMsg := kafkaErr.Error()
if msg != nil {
errMsg = *msg
}
return NewConnectError(
connect.CodeInternal,
errors.New(errMsg),
NewErrorInfo(v1alpha1.Reason_REASON_KAFKA_API_ERROR.String(), KeyValsFromKafkaError(kafkaErr)...),
)
}
36 changes: 36 additions & 0 deletions backend/pkg/api/connect/service/acl/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,42 @@ import (

type kafkaClientMapper struct{}

// aclCreateRequestToKafka maps the proto request to create a single ACL into a kmsg.CreateACLsRequest.
func (k *kafkaClientMapper) aclCreateRequestToKafka(req *v1alpha1.CreateACLRequest) (*kmsg.CreateACLsRequest, error) {
resourceType, err := k.aclResourceTypeToKafka(req.ResourceType)
if err != nil {
return nil, err
}

operation, err := k.aclOperationToKafka(req.Operation)
if err != nil {
return nil, err
}

permissionType, err := k.aclPermissionTypeToKafka(req.PermissionType)
if err != nil {
return nil, err
}

resourcePatternType, err := k.aclResourcePatternTypeToKafka(req.ResourcePatternType)
if err != nil {
return nil, err
}

creation := kmsg.NewCreateACLsRequestCreation()
creation.Host = req.Host
creation.Principal = req.Principal
creation.Operation = operation
creation.ResourceType = resourceType
creation.PermissionType = permissionType
creation.ResourcePatternType = resourcePatternType

kafkaReq := kmsg.NewCreateACLsRequest()
kafkaReq.Creations = []kmsg.CreateACLsRequestCreation{creation}

return &kafkaReq, nil
}

// aclFilterToKafka translates a proto ACL input into the kmsg.DescribeACLsRequest that is
// needed by the Kafka client to retrieve the list of applied ACLs.
// The parameter defaultToAny determines whether unspecified enum values for
Expand Down
57 changes: 40 additions & 17 deletions backend/pkg/api/connect/service/acl/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package acl
import (
"context"
"errors"
"strconv"

"connectrpc.com/connect"
"github.com/twmb/franz-go/pkg/kerr"
Expand Down Expand Up @@ -88,17 +89,7 @@ func (s *Service) ListACLs(ctx context.Context, req *connect.Request[v1alpha1.Li
// Handle Kafka error that may be set as part of the Kafka response
kafkaRes := aclOverview.KafkaResponse
if kafkaRes.ErrorCode != 0 {
kafkaErr := kerr.ErrorForCode(kafkaRes.ErrorCode)

errMsg := kafkaErr.Error()
if kafkaRes.ErrorMessage != nil {
errMsg = *kafkaRes.ErrorMessage
}
return nil, apierrors.NewConnectError(
connect.CodeInternal,
errors.New(errMsg),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_KAFKA_API_ERROR.String(), apierrors.KeyValsFromKafkaError(kafkaErr)...),
)
return nil, apierrors.NewConnectErrorFromKafkaErrorCode(kafkaRes.ErrorCode, kafkaRes.ErrorMessage)
}

resources := make([]*v1alpha1.ListACLsResponse_Resource, len(kafkaRes.Resources))
Expand All @@ -118,12 +109,44 @@ func (s *Service) ListACLs(ctx context.Context, req *connect.Request[v1alpha1.Li
}

// CreateACL implements the handler for the create ACL endpoint.
func (*Service) CreateACL(context.Context, *connect.Request[v1alpha1.CreateACLRequest]) (*connect.Response[v1alpha1.CreateACLResponse], error) {
return nil, apierrors.NewConnectError(
connect.CodeUnimplemented,
errors.New("endpoint is not implemented"),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_CONSOLE_ERROR.String()),
)
func (s *Service) CreateACL(ctx context.Context, req *connect.Request[v1alpha1.CreateACLRequest]) (*connect.Response[v1alpha1.CreateACLResponse], error) {
kafkaReq, err := s.kafkaClientMapper.aclCreateRequestToKafka(req.Msg)
if err != nil {
return nil, apierrors.NewConnectError(
connect.CodeInternal, // Internal because all input should already be validated, and thus no err possible
err,
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_CONSOLE_ERROR.String()),
)
}

res, err := s.consoleSvc.CreateACLs(ctx, *kafkaReq)
if err != nil {
return nil, apierrors.NewConnectError(
connect.CodeInternal,
err,
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_KAFKA_API_ERROR.String(), apierrors.KeyValsFromKafkaError(err)...),
)
}

if len(res.Results) != 1 {
// Should never happen since we only create one ACL, but if it happens we want to err early.
return nil, apierrors.NewConnectError(
connect.CodeInternal,
errors.New("unexpected number of results in create ACL response"),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_CONSOLE_ERROR.String(), apierrors.KeyVal{
Key: "retrieved_results",
Value: strconv.Itoa(len(res.Results)),
}),
)
}

// Check for inner Kafka error
result := res.Results[0]
if result.ErrorCode != 0 {
return nil, apierrors.NewConnectErrorFromKafkaErrorCode(result.ErrorCode, result.ErrorMessage)
}

return connect.NewResponse(&v1alpha1.CreateACLResponse{}), nil
}

// DeleteACLs implements the handler for the delete ACL endpoint.
Expand Down
10 changes: 9 additions & 1 deletion backend/pkg/console/create_acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (

// CreateACL creates an ACL resource in your target Kafka cluster.
func (s *Service) CreateACL(ctx context.Context, createReq kmsg.CreateACLsRequestCreation) *rest.Error {
res, err := s.kafkaSvc.CreateACLs(ctx, []kmsg.CreateACLsRequestCreation{createReq})
req := kmsg.NewCreateACLsRequest()
req.Creations = []kmsg.CreateACLsRequestCreation{createReq}

res, err := s.kafkaSvc.CreateACLs(ctx, req)
if err != nil {
return &rest.Error{
Err: err,
Expand Down Expand Up @@ -58,3 +61,8 @@ func (s *Service) CreateACL(ctx context.Context, createReq kmsg.CreateACLsReques

return nil
}

// CreateACLs proxies the request/response to CreateACLs via the Kafka API.
func (s *Service) CreateACLs(ctx context.Context, req kmsg.CreateACLsRequest) (*kmsg.CreateACLsResponse, error) {
return s.kafkaSvc.CreateACLs(ctx, req)
}
11 changes: 11 additions & 0 deletions backend/pkg/console/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

// Servicer is an interface for the Console package that offers all methods to serve the responses for the API layer.
// It may also be used to virtualize Console to serve many virtual clusters with a single Console deployment.
type Servicer interface {
GetAPIVersions(ctx context.Context) ([]APIVersion, error)
GetAllBrokerConfigs(ctx context.Context) (map[int32]BrokerConfig, error)
Expand Down Expand Up @@ -65,4 +66,14 @@ type Servicer interface {
CreateSchemaRegistrySchema(ctx context.Context, subjectName string, schema schema.Schema) (*CreateSchemaResponse, error)
ValidateSchemaRegistrySchema(ctx context.Context, subjectName string, version string, schema schema.Schema) *SchemaRegistrySchemaValidation
GetSchemaUsagesByID(ctx context.Context, schemaID int) ([]SchemaVersion, error)

// ------------------------------------------------------------------
// Plain Kafka requests, used by Connect API.
// The Console service was supposed to be a translation layer between the API (REST)
// and the Kafka package, but it's also used for virtualizing Console. Thus, even
// plain Kafka requests need to go through this package.
// ------------------------------------------------------------------

// CreateACLs proxies the request/response to CreateACLs via the Kafka API.
CreateACLs(ctx context.Context, createReq kmsg.CreateACLsRequest) (*kmsg.CreateACLsResponse, error)
}
5 changes: 1 addition & 4 deletions backend/pkg/kafka/create_acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ import (
)

// CreateACLs creates one or more ACL entries.
func (s *Service) CreateACLs(ctx context.Context, createACLReqs []kmsg.CreateACLsRequestCreation) (*kmsg.CreateACLsResponse, error) {
req := kmsg.NewCreateACLsRequest()
req.Creations = createACLReqs

func (s *Service) CreateACLs(ctx context.Context, req kmsg.CreateACLsRequest) (*kmsg.CreateACLsResponse, error) {
res, err := req.RequestWith(ctx, s.KafkaClient)
if err != nil {
return nil, fmt.Errorf("acl create request has failed: %w", err)
Expand Down
21 changes: 17 additions & 4 deletions proto/redpanda/api/dataplane/v1alpha1/acl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,26 @@ message ListACLsResponse {
}

message CreateACLRequest {
ACL.ResourceType resource_type = 1;
ACL.ResourceType resource_type = 1 [
(buf.validate.field).enum.defined_only = true,
(buf.validate.field).required = true
];

string resource_name = 2;
ACL.ResourcePatternType resource_pattern_type = 3;
ACL.ResourcePatternType resource_pattern_type = 3 [
(buf.validate.field).enum.defined_only = true,
(buf.validate.field).required = true
];
string principal = 4;
string host = 5;
ACL.Operation operation = 6;
ACL.PermissionType permission_type = 7;
ACL.Operation operation = 6 [
(buf.validate.field).enum.defined_only = true,
(buf.validate.field).required = true
];
ACL.PermissionType permission_type = 7 [
(buf.validate.field).enum.defined_only = true,
(buf.validate.field).required = true
];
}

message CreateACLResponse {}
Expand Down

0 comments on commit 68ad2c5

Please sign in to comment.