Skip to content

Commit

Permalink
[RHCLOUD-28513] avoid npe on connector returns (RedHatInsights#2199)
Browse files Browse the repository at this point in the history
* [RHCLOUD-28513] avoid npe on connector returns if original endpoint has been deleted

* Update from code review: history must be updated even if endpoint has gone
  • Loading branch information
g-duval authored Sep 27, 2023
1 parent 7a6f441 commit 32a9e20
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,16 @@ public void processAsync(String payload) {
try {
Log.infof("Processing return from camel: %s", payload);
Map<String, Object> decodedPayload = decodeItem(payload);
reinjectIfNeeded(decodedPayload);

String historyId = (String) decodedPayload.get("historyId");
final Endpoint endpoint = notificationHistoryRepository.getEndpointForHistoryId(historyId);

reinjectIfNeeded(endpoint, decodedPayload);
boolean updated = camelHistoryFillerHelper.updateHistoryItem(decodedPayload);
if (!updated) {
Log.warnf("Camel notification history update failed because no record was found with [id=%s]", decodedPayload.get("historyId"));
}
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(new JsonObject(payload));
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(endpoint, new JsonObject(payload));
} catch (Exception e) {
messagesErrorCounter.increment();
Log.error("| Failure to update the history", e);
Expand All @@ -93,16 +97,14 @@ public void processAsync(String payload) {
}
}

private void reinjectIfNeeded(Map<String, Object> payloadMap) {
private void reinjectIfNeeded(Endpoint endpoint, Map<String, Object> payloadMap) {
if (!featureFlipper.isEnableReInject() || (payloadMap.containsKey("successful") && ((Boolean) payloadMap.get("successful")))) {
return;
}

String historyId = (String) payloadMap.get("historyId");
Log.infof("Notification with id %s was not successful, resubmitting for further processing", historyId);

Endpoint ep = notificationHistoryRepository.getEndpointForHistoryId(historyId);

Event event = new Event();
Payload.PayloadBuilder payloadBuilder = new Payload.PayloadBuilder();
payloadMap.forEach(payloadBuilder::withAdditionalProperty);
Expand All @@ -113,17 +115,17 @@ private void reinjectIfNeeded(Map<String, Object> payloadMap) {
// Save the original id, as we may need it in the future.
Context.ContextBuilder contextBuilder = new Context.ContextBuilder();
contextBuilder.withAdditionalProperty("original-id", historyId);
if (ep != null) { // TODO For the current tests. EP should not be null in real life
contextBuilder.withAdditionalProperty("failed-integration", ep.getName());
if (endpoint != null) { // TODO For the current tests. EP should not be null in real life
contextBuilder.withAdditionalProperty("failed-integration", endpoint.getName());
}

Action action = new Action.ActionBuilder()
.withId(UUID.randomUUID())
.withBundle("console")
.withApplication("integrations")
.withEventType(INTEGRATION_FAILED_EVENT_TYPE)
.withAccountId(ep != null ? ep.getAccountId() : "")
.withOrgId(ep != null && ep.getOrgId() != null ? ep.getOrgId() : "")
.withAccountId(endpoint != null ? endpoint.getAccountId() : "")
.withOrgId(endpoint != null && endpoint.getOrgId() != null ? endpoint.getOrgId() : "")
.withContext(contextBuilder.build())
.withTimestamp(LocalDateTime.now(ZoneOffset.UTC))
.withEvents(Collections.singletonList(event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.UUID;

@ApplicationScoped
public class EndpointErrorFromConnectorHelper {
Expand Down Expand Up @@ -48,25 +47,24 @@ void postConstruct() {
disabledWebhooksClientErrorCount = registry.counter(DISABLED_WEBHOOKS_COUNTER, ERROR_TYPE_TAG_KEY, CLIENT_TAG_VALUE);
}

public void manageEndpointDisablingIfNeeded(JsonObject payload) {
public void manageEndpointDisablingIfNeeded(Endpoint endpoint, JsonObject payload) {
String strHistoryId = payload.getString("id");
if (endpoint == null) {
Log.debugf("Unable to update endpoint data from history %s, because it no longer exists", strHistoryId);
return;
}

JsonObject data = new JsonObject(payload.getString("data"));
if (strHistoryId != null) {
UUID historyId = UUID.fromString(strHistoryId);
boolean shouldIncrementServerError = data.getBoolean(INCREMENT_ENDPOINT_SERVER_ERRORS, false);
boolean shouldDisableEndpointClientError = data.getBoolean(DISABLE_ENDPOINT_CLIENT_ERRORS, false);

if (data.getBoolean("successful", false)) {
final Endpoint endpoint = notificationHistoryRepository.getEndpointForHistoryId(historyId.toString());
if (endpoint != null) {
boolean reset = endpointRepository.resetEndpointServerErrors(endpoint.getId());
if (reset) {
Log.infof("The server errors counter of endpoint %s was just reset", endpoint.getId());
}
boolean reset = endpointRepository.resetEndpointServerErrors(endpoint.getId());
if (reset) {
Log.infof("The server errors counter of endpoint %s was just reset", endpoint.getId());
}
} else if (shouldIncrementServerError || shouldDisableEndpointClientError) {
final Endpoint endpoint = notificationHistoryRepository.getEndpointForHistoryId(historyId.toString());

if (shouldDisableEndpointClientError) {
/*
* The target endpoint returned a 4xx status. That kind of error requires an update of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.redhat.cloud.notifications.Json;
import com.redhat.cloud.notifications.MicrometerAssertionHelper;
import com.redhat.cloud.notifications.TestLifecycleManager;
import com.redhat.cloud.notifications.db.repositories.EndpointRepository;
import com.redhat.cloud.notifications.db.repositories.NotificationHistoryRepository;
import com.redhat.cloud.notifications.models.Endpoint;
import com.redhat.cloud.notifications.models.NotificationHistory;
import com.redhat.cloud.notifications.models.NotificationStatus;
import io.quarkus.test.InjectMock;
Expand All @@ -17,9 +19,11 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static com.redhat.cloud.notifications.events.ConnectorReceiver.FROMCAMEL_CHANNEL;
import static com.redhat.cloud.notifications.events.ConnectorReceiver.MESSAGES_ERROR_COUNTER_NAME;
Expand Down Expand Up @@ -47,12 +51,20 @@ public class ConnectorReceiverTest {
@InjectSpy
CamelHistoryFillerHelper camelHistoryFillerHelper;

@InjectSpy
EndpointRepository endpointRepository;

final String expectedHistoryId = UUID.randomUUID().toString();

@BeforeEach
void beforeEach() {
micrometerAssertionHelper.saveCounterValuesBeforeTest(
MESSAGES_PROCESSED_COUNTER_NAME,
MESSAGES_ERROR_COUNTER_NAME
);
final Endpoint endpoint = new Endpoint();
endpoint.setId(UUID.fromString(expectedHistoryId));
Mockito.when(notificationHistoryRepository.getEndpointForHistoryId(Mockito.eq(expectedHistoryId))).thenReturn(endpoint);
}

@AfterEach
Expand Down Expand Up @@ -101,8 +113,17 @@ void testPayloadWithDurationFitsLong2() {
testPayload(true, 2147483600000L, null, NotificationStatus.SUCCESS);
}

@Test
void testValidPayloadWithDeletedEndpoint() {
testPayload(UUID.randomUUID().toString(), true, 67549274, null, NotificationStatus.SUCCESS);
}

private void testPayload(boolean isSuccessful, long expectedDuration, String expectedOutcome, NotificationStatus expectedNotificationStatus) {
String expectedHistoryId = "e3c90a94-751b-4ce1-b345-b85d825795a4";
testPayload(expectedHistoryId, isSuccessful, expectedDuration, expectedOutcome, expectedNotificationStatus);
}

private void testPayload(String historyId, boolean isSuccessful, long expectedDuration, String expectedOutcome, NotificationStatus expectedNotificationStatus) {

String expectedDetailsType = "com.redhat.console.notification.toCamel.tower";
String expectedDetailsTarget = "1.2.3.4";

Expand All @@ -123,7 +144,7 @@ private void testPayload(boolean isSuccessful, long expectedDuration, String exp
"source", "demo-log",
"type", "com.redhat.cloud.notifications.history",
"time", "2021-12-14T10:08:23.217Z",
"id", expectedHistoryId,
"id", historyId,
"content-type", "application/json",
"data", Json.encode(dataMap)
));
Expand All @@ -143,11 +164,17 @@ private void testPayload(boolean isSuccessful, long expectedDuration, String exp

assertEquals(expectedNotificationStatus, nhUpdate.getValue().getStatus());

assertEquals(expectedHistoryId, decodedPayload.getValue().get("historyId"));
assertEquals(historyId, decodedPayload.getValue().get("historyId"));
assertEquals(expectedDuration, ((Number) decodedPayload.getValue().get("duration")).longValue());
assertEquals(expectedOutcome, decodedPayload.getValue().get("outcome"));
Map<String, Object> details = (Map<String, Object>) decodedPayload.getValue().get("details");
assertEquals(expectedDetailsType, details.get("type"));
assertEquals(expectedDetailsTarget, details.get("target"));

if (!expectedHistoryId.equals(historyId)) {
verifyNoInteractions(endpointRepository);
} else if (isSuccessful) {
verify(endpointRepository, times(1)).resetEndpointServerErrors(UUID.fromString(expectedHistoryId));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.redhat.cloud.notifications.Json;
import com.redhat.cloud.notifications.MicrometerAssertionHelper;
import com.redhat.cloud.notifications.db.repositories.EndpointRepository;
import com.redhat.cloud.notifications.db.repositories.NotificationHistoryRepository;
import com.redhat.cloud.notifications.models.Endpoint;
import com.redhat.cloud.notifications.models.EndpointType;
import io.quarkus.test.InjectMock;
Expand Down Expand Up @@ -32,9 +31,6 @@
@QuarkusTest
class EndpointErrorFromConnectorHelperTest {

@InjectMock
NotificationHistoryRepository notificationHistoryRepository;

@Inject
MicrometerAssertionHelper micrometerAssertionHelper;

Expand All @@ -58,68 +54,54 @@ void afterEach() {
micrometerAssertionHelper.clearSavedValues();
}

@Test
void testEndpointNotFound() {
JsonObject payload = buildTestPayload(true, 200, null, null);
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(payload);
verify(notificationHistoryRepository, times(1)).getEndpointForHistoryId(payload.getString("id"));
verifyNoInteractions(endpointRepository);
verifyNoInteractions(integrationDisabledNotifier);
assertMetrics(0, 0);
}

@Test
void testResetEndpointServerErrorCount() {
final UUID endpointUuid = mockEndpointFromNotificationHistorySearch();
final Endpoint endpoint = mockEndpointFromNotificationHistorySearch();

JsonObject payload = buildTestPayload(true, 200, null, null);
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(payload);
verify(notificationHistoryRepository, times(1)).getEndpointForHistoryId(payload.getString("id"));
verify(endpointRepository, times(1)).resetEndpointServerErrors(endpointUuid);
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(endpoint, payload);
verify(endpointRepository, times(1)).resetEndpointServerErrors(endpoint.getId());
verifyNoInteractions(integrationDisabledNotifier);
assertMetrics(0, 0);
}

@Test
void testIncreaseServerErrorCount() {
final UUID endpointUuid = mockEndpointFromNotificationHistorySearch();
final Endpoint endpoint = mockEndpointFromNotificationHistorySearch();

JsonObject payload = buildTestPayload(false, 503, true, null);
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(payload);
verify(notificationHistoryRepository, times(1)).getEndpointForHistoryId(payload.getString("id"));
verify(endpointRepository, times(1)).incrementEndpointServerErrors(eq(endpointUuid), anyInt());
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(endpoint, payload);
verify(endpointRepository, times(1)).incrementEndpointServerErrors(eq(endpoint.getId()), anyInt());
verifyNoInteractions(integrationDisabledNotifier);
assertMetrics(0, 0);
}

@Test
void testIncreaseAndDisableServerErrorCount() {
final UUID endpointUuid = mockEndpointFromNotificationHistorySearch();
Mockito.when(endpointRepository.incrementEndpointServerErrors(eq(endpointUuid), anyInt())).thenReturn(true);
final Endpoint endpoint = mockEndpointFromNotificationHistorySearch();
Mockito.when(endpointRepository.incrementEndpointServerErrors(eq(endpoint.getId()), anyInt())).thenReturn(true);

JsonObject payload = buildTestPayload(false, 503, true, null);
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(payload);
verify(notificationHistoryRepository, times(1)).getEndpointForHistoryId(payload.getString("id"));
verify(endpointRepository, times(1)).incrementEndpointServerErrors(eq(endpointUuid), anyInt());
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(endpoint, payload);
verify(endpointRepository, times(1)).incrementEndpointServerErrors(eq(endpoint.getId()), anyInt());
verify(integrationDisabledNotifier, times(1)).tooManyServerErrors(any(), anyInt());
assertMetrics(1, 0);
}

@Test
void testDisableEndpointBecauseOfClientError() {
final UUID endpointUuid = mockEndpointFromNotificationHistorySearch();
Mockito.when(endpointRepository.disableEndpoint(endpointUuid)).thenReturn(true);
final Endpoint endpoint = mockEndpointFromNotificationHistorySearch();
Mockito.when(endpointRepository.disableEndpoint(endpoint.getId())).thenReturn(true);

JsonObject payload = buildTestPayload(false, 408, null, true);
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(payload);
verify(notificationHistoryRepository, times(1)).getEndpointForHistoryId(payload.getString("id"));
verify(endpointRepository, times(1)).disableEndpoint(endpointUuid);
endpointErrorFromConnectorHelper.manageEndpointDisablingIfNeeded(endpoint, payload);
verify(endpointRepository, times(1)).disableEndpoint(endpoint.getId());
verify(integrationDisabledNotifier, times(1)).clientError(any(), eq(408));
assertMetrics(0, 1);
}

@NotNull
private UUID mockEndpointFromNotificationHistorySearch() {
private Endpoint mockEndpointFromNotificationHistorySearch() {
// Create an Endpoint which will be simulated to be fetched from the database.
final String orgId = "test-org-id";
final UUID endpointUuid = UUID.randomUUID();
Expand All @@ -129,8 +111,7 @@ private UUID mockEndpointFromNotificationHistorySearch() {
endpointFixture.setOrgId(orgId);
endpointFixture.setType(EndpointType.WEBHOOK);

Mockito.when(notificationHistoryRepository.getEndpointForHistoryId(anyString())).thenReturn(endpointFixture);
return endpointUuid;
return endpointFixture;
}


Expand Down

0 comments on commit 32a9e20

Please sign in to comment.