Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KC: Stop Connectors and Reset Connector Offsets #573

Merged
merged 41 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
fd5fedf
allow development behind proxy
Sep 26, 2024
70517a0
Add dropdown options to pause and stop connectors
Sep 30, 2024
6ce7078
format kafkbat-ui openapi definition
Sep 30, 2024
503a2a1
Add a dropdown option to reset the offsets of stopped connectors
Sep 30, 2024
b819013
Disable the "Remove Connector" button if the action is not allowed fo…
Oct 1, 2024
26e075d
Disable reset offsets button when connector is not stopped
Oct 2, 2024
d9d269b
Add frontend tests of connector reset
Oct 4, 2024
971c2c9
Add frontend tests for the reset connector offsets button
Oct 4, 2024
2c8f054
Merge branch 'main' into feature/reset-connector-offsets
Dugong42 Oct 4, 2024
c9d8aad
Fix code formatting and typos
Oct 4, 2024
a36bc20
Fix the confirmation message
Oct 4, 2024
b21d04e
fix missing resetConnectorOffsets mock in frontend unit tests
Oct 4, 2024
0c3495d
Revert "format kafkbat-ui openapi definition"
Oct 7, 2024
193daa0
Revert formatting changes in "Add a dropdown option to reset the offs…
Oct 7, 2024
05df53e
Revert formatting changes in "Fix code formatting and typos"
Oct 7, 2024
7b289c3
revert formatting changes to the controller
Oct 7, 2024
0d695cd
fix frontend tests
Oct 7, 2024
76ae0f1
fix prettier linter checks
Oct 7, 2024
d95b56f
Improve refresh behaviour on connector actions
Oct 7, 2024
f7345da
Merge branch 'main' into feature/reset-connector-offsets
Dugong42 Oct 8, 2024
cffa239
refacto using static imports for resetConnectorOffsets actions
Oct 29, 2024
951b8c3
Merge remote-tracking branch 'origin/main' into feature/reset-connect…
Oct 30, 2024
e8e4347
tests: reset stopped connector
Nov 5, 2024
67599c1
kafka connect API get offsets
Nov 5, 2024
2aebcf6
FIX: Type casting errors
Dugong42 Nov 5, 2024
e52d6cc
Merge branch 'fix/type-casting-errors' into feature/reset-connector-o…
Dugong42 Nov 5, 2024
f7f4591
Test bad request when resetting running connector
Dugong42 Nov 5, 2024
8c64835
Throw controlled BadRequest and NotFound errors for resetConnectorOff…
Dugong42 Nov 6, 2024
cf7cbdd
Use Confluent 7.7 and fix tests for resetConnectorOffsets
Dugong42 Nov 6, 2024
8f5915c
typo in test
Dugong42 Nov 6, 2024
62598e4
retry on kafka connect test setup failure
Dugong42 Nov 6, 2024
793c0a5
Handle error 500 by repeating the kafka connect test setUp
Dugong42 Nov 7, 2024
4a499e4
Optimistic creation of test connectors. Check existence and ignore cr…
Dugong42 Nov 8, 2024
035f2b3
cleanup
Nov 8, 2024
2b23258
Merge branch 'main' into feature/reset-connector-offsets
Dugong42 Nov 18, 2024
cb06d48
Merge commit 'a8811d1be3a0db2b02afa744c404ac961572b0d1' into feature/…
Dugong42 Jan 3, 2025
0d8ba54
adapt to connect retry mechanism
Dugong42 Jan 3, 2025
e802bf1
rollback test workaround leftovers
Dugong42 Jan 3, 2025
0e1bdb9
rollback forced typing leftovers
Dugong42 Jan 3, 2025
fbb6ddd
fix indent
Jan 3, 2025
513e3f9
Merge branch 'main' into feature/reset-connector-offsets
Haarolean Jan 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .mvn/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-Djava.net.useSystemProxies=true
1 change: 1 addition & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@
</goals>
<configuration>
<arguments>build</arguments>
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,16 @@ public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorNam
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> stopConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.stopConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
throws WebClientResponseException {
Expand All @@ -261,6 +271,18 @@ public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connec
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}

@Override
public Mono<Void> resetConnectorOffsets(String connectorName)
throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> resetConnectorOffsetsWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName));
}

@Override
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
return withRetryOnRebalance(super.resumeConnector(connectorName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;

import io.kafbat.ui.api.KafkaConnectApi;
import io.kafbat.ui.model.ConnectDTO;
Expand Down Expand Up @@ -285,4 +287,23 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
default -> defaultComparator;
};
}

@Override
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, String connectName,
Dugong42 marked this conversation as resolved.
Show resolved Hide resolved
String connectorName,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, VIEW, RESET_OFFSETS)
.operationName("resetConnectorOffsets")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.kafbat.ui.exception;

public class ConnectorOffsetsResetException extends CustomBaseException {

public ConnectorOffsetsResetException(String message) {
super(message);
}

@Override
public ErrorCode getErrorCode() {
return ErrorCode.CONNECTOR_OFFSETS_RESET_ERROR;
}
}
1 change: 1 addition & 0 deletions api/src/main/java/io/kafbat/ui/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum ErrorCode {
TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST),
FILE_UPLOAD_EXCEPTION(4019, HttpStatus.INTERNAL_SERVER_ERROR),
CEL_ERROR(4020, HttpStatus.BAD_REQUEST),
CONNECTOR_OFFSETS_RESET_ERROR(4021, HttpStatus.BAD_REQUEST),
;

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public enum ConnectAction implements PermissibleAction {
EDIT(VIEW),
CREATE(VIEW),
RESTART(VIEW),
DELETE(VIEW)
DELETE(VIEW),
RESET_OFFSETS(VIEW)

;

Expand All @@ -20,7 +21,7 @@ public enum ConnectAction implements PermissibleAction {
this.dependantActions = dependantActions;
}

public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART);
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART, RESET_OFFSETS);

@Nullable
public static ConnectAction fromString(String name) {
Expand Down
18 changes: 18 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTopics;
import io.kafbat.ui.connect.model.TaskStatus;
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
import io.kafbat.ui.exception.NotFoundException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.mapper.ClusterMapper;
Expand Down Expand Up @@ -213,6 +214,7 @@ public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE -> client.pauseConnector(connectorName);
case STOP -> client.stopConnector(connectorName);
case RESUME -> client.resumeConnector(connectorName);
});
}
Expand Down Expand Up @@ -272,4 +274,20 @@ private ReactiveFailover<KafkaConnectClientApi> api(KafkaCluster cluster, String
}
return client;
}

public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName,
String connectorName) {
return api(cluster, connectName)
.mono(client -> client.resetConnectorOffsets(connectorName))
.onErrorResume(WebClientResponseException.NotFound.class,
e -> {
throw new NotFoundException("Connector %s not found in %s".formatted(connectorName, connectName));
})
.onErrorResume(WebClientResponseException.BadRequest.class,
e -> {
throw new ConnectorOffsetsResetException(
"Failed to reset offsets of connector %s of %s. Make sure it is STOPPED first."
.formatted(connectorName, connectName));
});
}
}
60 changes: 57 additions & 3 deletions api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.test.web.reactive.server.ExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;

@Slf4j
Expand All @@ -45,6 +47,7 @@ public class KafkaConnectServiceTests extends AbstractIntegrationTest {

@BeforeEach
public void setUp() {

webTestClient.post()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.bodyValue(new NewConnectorDTO()
Expand All @@ -54,11 +57,10 @@ public void setUp() {
"tasks.max", "1",
"topics", "output-topic",
"file", "/tmp/test",
"test.password", "test-credentials"
))
)
"test.password", "test-credentials")))
.exchange()
.expectStatus().isOk();

}

@AfterEach
Expand Down Expand Up @@ -418,4 +420,56 @@ public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() {
.expectStatus()
.isBadRequest();
}

@Test
public void shouldResetConnectorWhenInStoppedState() {

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(ConnectorDTO.class)
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));

webTestClient.post()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/STOP",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk();

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(ConnectorDTO.class)
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.STOPPED));

webTestClient.delete()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk();

}

@Test
public void shouldReturn400WhenResettingConnectorInRunningState() {

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(ConnectorDTO.class)
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));

webTestClient.delete()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets", LOCAL,
connectName, connectorName)
.exchange()
.expectStatus().isBadRequest();

}
}
1 change: 1 addition & 0 deletions contract/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
</goals>
<configuration>
<arguments>gen:sources</arguments>
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
</configuration>
</execution>
</executions>
Expand Down
39 changes: 37 additions & 2 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ paths:
post:
tags:
- Kafka Connect
summary: update connector state (restart, pause or resume)
summary: update connector state (restart, pause, stop or resume)
operationId: updateConnectorState
parameters:
- name: clusterName
Expand Down Expand Up @@ -1722,6 +1722,31 @@ paths:
200:
description: OK

/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets:
delete:
tags:
- Kafka Connect
summary: reset the offsets for the specified connector
operationId: resetConnectorOffsets
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: connectName
in: path
required: true
schema:
type: string
- name: connectorName
in: path
required: true
schema:
type: string
responses:
200:
description: OK

/api/clusters/{clusterName}/ksql/v2:
post:
Expand Down Expand Up @@ -3567,6 +3592,7 @@ components:
- RESTART_FAILED_TASKS
- PAUSE
- RESUME
- STOP

TaskAction:
type: string
Expand Down Expand Up @@ -3953,7 +3979,16 @@ components:

KafkaAcl:
type: object
required: [resourceType, resourceName, namePatternType, principal, host, operation, permission]
required:
[
resourceType,
resourceName,
namePatternType,
principal,
host,
operation,
permission,
]
properties:
resourceType:
$ref: '#/components/schemas/KafkaAclResourceType'
Expand Down
Loading
Loading