Skip to content

Commit

Permalink
feature: wip on connector status improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Apr 18, 2023
1 parent 5e4e573 commit 31a88f2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 13 deletions.
12 changes: 5 additions & 7 deletions backend/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ github.com/cloudflare/circl v1.3.2 h1:VWp8dY3yH69fdM7lM6A1+NhhVoDu9vqK0jOgmkQHFW
github.com/cloudflare/circl v1.3.2/go.mod h1:+CauBF6R70Jqcyl8N2hC8pAXYbWkGIezuSbuGLtRhnw=
github.com/cloudhut/common v0.8.0 h1:hfsiuRyou9y54zES3o5HWg0yDPklHsSoMnH2dGb/RrA=
github.com/cloudhut/common v0.8.0/go.mod h1:XoriOltDOhfkw8dcjYUCGEHOCVgpOlQgPqriq9m050c=
github.com/cloudhut/connect-client v0.0.0-20220929214026-6c714f7b6651 h1:QdBHcaLwShl6xSZCe9KqMiLNc+UXVvfaX/5+60hO1GY=
github.com/cloudhut/connect-client v0.0.0-20220929214026-6c714f7b6651/go.mod h1:Kfy2sPCGDgWZGkjyu3ei9uacNA2CMBLU9/PBpCcyiGg=
github.com/cloudhut/connect-client v0.0.0-20230417124247-963e5bcdfee7 h1:+sK5g5HwcTFZTiUPAUU3pB4EB8jszudLxxzwDdc/nrs=
github.com/cloudhut/connect-client v0.0.0-20230417124247-963e5bcdfee7/go.mod h1:khC9xPwJVKZQ02jtn3n64gEgOKtlVrcWuSOvmj5hSO4=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down Expand Up @@ -632,10 +634,8 @@ golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY
golang.org/x/net v0.0.0-20220812174116-3211cb980234/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -735,9 +735,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
13 changes: 12 additions & 1 deletion backend/pkg/api/handle_kafka_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func (api *API) handleGetConnectors() http.HandlerFunc {
}

return func(w http.ResponseWriter, r *http.Request) {
fmt.Println("handleGetConnectors")

ctx, cancel := context.WithTimeout(r.Context(), api.ConnectSvc.Cfg.RequestTimeout)
defer cancel()

Expand Down Expand Up @@ -74,6 +76,8 @@ func (api *API) handleGetClusterConnectors() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
clusterName := rest.GetURLParam(r, "clusterName")

fmt.Println("handleGetClusterConnectors " + clusterName)

ctx, cancel := context.WithTimeout(r.Context(), api.ConnectSvc.Cfg.RequestTimeout)
defer cancel()

Expand Down Expand Up @@ -220,6 +224,13 @@ func (api *API) handlePutConnectorConfig() http.HandlerFunc {
rest.SendRESTError(w, r, api.Logger, restErr)
return
}

// restart the instance and all the tasks
restErr = api.ConnectSvc.RestartConnector(r.Context(), clusterName, connectorName, true)
if restErr != nil {
rest.SendRESTError(w, r, api.Logger, restErr)
return
}
rest.SendResponse(w, r, api.Logger, http.StatusOK, cInfo)
}
}
Expand Down Expand Up @@ -441,7 +452,7 @@ func (api *API) handleRestartConnector() http.HandlerFunc {
return
}

restErr = api.ConnectSvc.RestartConnector(ctx, clusterName, connector)
restErr = api.ConnectSvc.RestartConnector(ctx, clusterName, connector, true)
if restErr != nil {
rest.SendRESTError(w, r, api.Logger, restErr)
return
Expand Down
33 changes: 31 additions & 2 deletions backend/pkg/connect/get_connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,32 @@ import (
"github.com/redpanda-data/console/backend/pkg/config"
)

// connectorState is the connector instance state that comes directly from Kafka connector API
type connectorState = string

const (
connectorStateUnassigned connectorState = "UNASSIGNED"
connectorStateRunning connectorState = "RUNNING"
connectorStatePaused connectorState = "PAUSED"
connectorStateFAILED connectorState = "FAILED"
connectorStateFailed connectorState = "FAILED"
connectorStateRestarting connectorState = "RESTARTING"
)

// connectorStatus is our holistic unified connector status that takes into account not just the
// connector instance state, but also state of all the tasks within the connector
type connectorStatus = string

const (
// Connector is in "running" state, >0 tasks, all of them running state
connectorStatusHealthy connectorStatus = "HEALTHY"
// Connector is "error" state
connectorStatusUnhealthy connectorStatus = "UNHEALTHY"
// Connector is "running" state, 0 tasks OR at least one task in failed state
connectorStatusDegraded connectorStatus = "DEGRADED"
// Connector is in "paused" state, or at least one task is in paused state
connectorStatusPaused connectorStatus = "PAUSED"
// Connector is in "restarting" state or at least one task is in restarting state
connectorStatusRestarting connectorStatus = "RESTARTING"
)

// ClusterConnectors contains all available information about the deployed connectors
Expand All @@ -55,7 +74,8 @@ type ClusterConnectorInfo struct {
Config map[string]string `json:"config"`
Type string `json:"type"` // Source or Sink
Topic string `json:"topic"` // Kafka Topic name
State string `json:"state"` // Running, ..
State connectorState `json:"state"` // Running, ..
Status connectorStatus `json:"status"`
TotalTasks int `json:"totalTasks"`
RunningTasks int `json:"runningTasks"`
Trace string `json:"trace,omitempty"`
Expand All @@ -78,6 +98,8 @@ func (s *Service) GetAllClusterConnectors(ctx context.Context) ([]*ClusterConnec
return nil, ErrKafkaConnectNotConfigured
}

fmt.Println("GetAllClusterConnectors")

ch := make(chan *ClusterConnectors, len(s.ClientsByCluster))
for _, cluster := range s.ClientsByCluster {
go func(cfg config.ConnectCluster, c *con.Client) {
Expand Down Expand Up @@ -107,6 +129,9 @@ func (s *Service) GetAllClusterConnectors(ctx context.Context) ([]*ClusterConnec
totalConnectors := 0
runningConnectors := 0
for _, connector := range connectors {

fmt.Printf("connector: %+v\n", connector)

totalConnectors++
if connector.Status.Connector.State == connectorStateRunning {
runningConnectors++
Expand Down Expand Up @@ -136,6 +161,8 @@ func (s *Service) GetAllClusterConnectors(ctx context.Context) ([]*ClusterConnec
// GetClusterConnectors returns the GET /connectors response for a single connect cluster. A cluster can be referenced
// by it's name (as specified in the user config).
func (s *Service) GetClusterConnectors(ctx context.Context, clusterName string) (ClusterConnectors, *rest.Error) {
fmt.Println("GetClusterConnectors")

c, restErr := s.getConnectClusterByName(clusterName)
if restErr != nil {
return ClusterConnectors{}, restErr
Expand All @@ -149,6 +176,8 @@ func (s *Service) GetClusterConnectors(ctx context.Context, clusterName string)
errMsg = err.Error()
}

fmt.Printf("connectors: %+v\n", connectors)

return ClusterConnectors{
ClusterName: c.Cfg.Name,
ClusterAddress: c.Cfg.URL,
Expand Down
5 changes: 2 additions & 3 deletions backend/pkg/connect/restart_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ import (
)

// RestartConnector restarts the connector. Return 409 (Conflict) if rebalance is in process.
// No tasks are restarted as a result of a call to this endpoint. To restart tasks, see restart task.
func (s *Service) RestartConnector(ctx context.Context, clusterName string, connector string) *rest.Error {
func (s *Service) RestartConnector(ctx context.Context, clusterName string, connector string, restartTasks bool) *rest.Error {
c, restErr := s.getConnectClusterByName(clusterName)
if restErr != nil {
return restErr
}

err := c.Client.RestartConnector(ctx, connector, connect.RestartConnectorOptions{})
err := c.Client.RestartConnector(ctx, connector, connect.RestartConnectorOptions{IncludeTasks: restartTasks})
if err != nil {
return &rest.Error{
Err: err,
Expand Down

0 comments on commit 31a88f2

Please sign in to comment.