Skip to content

Commit

Permalink
Update threat intel job mapping to new version (opensearch-project#1272)
Browse files Browse the repository at this point in the history
* update mapping

Signed-off-by: Joanne Wang <[email protected]>

* fix integ tests

Signed-off-by: Joanne Wang <[email protected]>

* remove debug in case of null and return listener on failure for lock

Signed-off-by: Joanne Wang <[email protected]>

* remove stash context and change to local.root

Signed-off-by: Joanne Wang <[email protected]>

* rename releaseLock

Signed-off-by: Joanne Wang <[email protected]>

* check for concurrent modification exception

Signed-off-by: Joanne Wang <[email protected]>

* remove exception check

Signed-off-by: Joanne Wang <[email protected]>

---------

Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon authored Sep 4, 2024
1 parent f89c3ed commit 66f1155
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,13 @@ public void onFailure(final Exception e) {
*
* @param lockModel the lock model
*/
public void releaseLock(final LockModel lockModel) {
log.debug("Releasing lock with id [{}]", lockModel.getLockId());
public void releaseLock(final LockModel lockModel, final ActionListener<Boolean> listener) {
lockService.release(
lockModel,
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
);
}

/**
* Wrapper method of LockService#release
*
* @param lockModel the lock model
*/
public void releaseLockEventDriven(final LockModel lockModel, final ActionListener<Boolean> listener) {
log.debug("Releasing lock with id [{}]", lockModel.getLockId());
lockService.release(
lockModel,
ActionListener.wrap(listener::onResponse, exception -> log.error("Failed to release the lock", exception))
ActionListener.wrap(listener::onResponse, exception -> {
log.error("Failed to release the lock", exception);
listener.onFailure(exception);
})
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected Runnable updateJobRunner(final ScheduledJobParameter jobParameter) {
ActionListener.wrap(lock -> {
updateJobParameter(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock)),
ActionListener.wrap(
r -> lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> lockService.releaseLock(lock, ActionListener.wrap(
response -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
},
Expand All @@ -125,7 +125,7 @@ protected Runnable updateJobRunner(final ScheduledJobParameter jobParameter) {
)),
e -> {
log.error("Failed to update job parameter " + jobParameter.getName(), e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
lockService.releaseLock(lock, ActionListener.wrap(
response -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected Runnable retrieveLockAndUpdateConfig(final SATIFSourceConfig saTifSour
updateSourceConfigAndIOCs(saTifSourceConfig, lockService.getRenewLockRunnable(new AtomicReference<>(lock)),
ActionListener.wrap(
r -> {
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
lockService.releaseLock(lock, ActionListener.wrap(
response -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
},
Expand All @@ -119,7 +119,7 @@ protected Runnable retrieveLockAndUpdateConfig(final SATIFSourceConfig saTifSour
},
e -> {
log.error("Failed to update threat intel source config " + saTifSourceConfig.getName(), e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
lockService.releaseLock(lock, ActionListener.wrap(
response -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@
import org.opensearch.securityanalytics.commons.model.IOCType;
import org.opensearch.securityanalytics.threatIntel.action.monitor.SearchThreatIntelMonitorAction;
import org.opensearch.securityanalytics.threatIntel.action.monitor.request.SearchThreatIntelMonitorRequest;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
import org.opensearch.securityanalytics.threatIntel.common.TIFLockService;
import org.opensearch.securityanalytics.threatIntel.model.DefaultIocStoreConfig;
import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -80,6 +80,7 @@
import static org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig.SOURCE_CONFIG_FIELD;
import static org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig.STATE_FIELD;
import static org.opensearch.securityanalytics.transport.TransportIndexDetectorAction.PLUGIN_OWNER_FIELD;
import static org.opensearch.securityanalytics.util.IndexUtils.shouldUpdateIndex;

/**
* CRUD for threat intel feeds source config object
Expand All @@ -93,7 +94,6 @@ public class SATIFSourceConfigService {
private final NamedXContentRegistry xContentRegistry;
private final TIFLockService lockService;


public SATIFSourceConfigService(final Client client,
final ClusterService clusterService,
ThreadPool threadPool,
Expand Down Expand Up @@ -139,7 +139,7 @@ public void indexTIFSourceConfig(SATIFSourceConfig saTifSourceConfig,
}
}, exception -> {
log.error("Failed to create threat intel source config index", exception);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
lockService.releaseLock(lock, ActionListener.wrap(
r -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
actionListener.onFailure(exception);
Expand Down Expand Up @@ -197,33 +197,75 @@ private String getIndexMapping() {
/**
* Index name: .opensearch-sap--job
* Mapping: /mappings/threat_intel_job_mapping.json
* Updates the job index mapping if currently on a previous version
*
* @param stepListener setup listener
*/
public void createJobIndexIfNotExists(final StepListener<Void> stepListener) {
// check if job index exists
if (clusterService.state().metadata().hasIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) == true) {
stepListener.onResponse(null);
return;
}
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING);
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, ActionListener.wrap(
r -> {
log.debug("[{}] index created", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
log.info("Index [{}] already exists", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
checkAndUpdateJobIndexMapping(stepListener);
} else {
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING);
client.admin().indices().create(createIndexRequest, ActionListener.wrap(
r -> {
log.debug("[{}] index created", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
return;
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
log.info("Index [{}] already exists", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
return;
}
log.error("Failed to create [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME, e);
stepListener.onFailure(e);
}
log.error("Failed to create [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME, e);
stepListener.onFailure(e);
}
)));
));
}
}

private void checkAndUpdateJobIndexMapping(StepListener<Void> stepListener) {
try {
// Check if job index contains old mapping, if so update index mapping (current version = 2)
if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) {
log.info("Old schema version found for [{}] index, updating the index mapping", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
IndexUtils.updateIndexMapping(
SecurityAnalyticsPlugin.JOB_INDEX_NAME,
getIndexMapping(), clusterService.state(), client.admin().indices(),
ActionListener.wrap(
r -> {
log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
}, e -> {
// Check if version is updated despite failure
try {
// Check if job index still contains older mapping
if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) {
log.error("Job index still contains older mapping, failed to update job index mapping", e);
stepListener.onFailure(e);
} else {
// If job index contains newest mapping, then return success
log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
}
} catch (IOException exception) {
log.error("Failed to check if job index contains older mapping. Failed to update job index mapping", e);
stepListener.onFailure(e);
}
}
),
false
);
} else {
// If job index contains newest mapping, then do nothing
stepListener.onResponse(null);
}
} catch (IOException e) {
log.error("Failed to check and update job index mapping", e);
stepListener.onFailure(e);
}
}

// Get TIF source config
public void getTIFSourceConfig(
Expand All @@ -234,7 +276,7 @@ public void getTIFSourceConfig(
client.get(getRequest, ActionListener.wrap(
getResponse -> {
if (!getResponse.isExists()) {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(),"Threat intel source config [%s] not found.", tifSourceConfigId), RestStatus.NOT_FOUND)));
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.ROOT, "Threat intel source config [%s] not found.", tifSourceConfigId), RestStatus.NOT_FOUND)));
return;
}
SATIFSourceConfig saTifSourceConfig = null;
Expand All @@ -246,7 +288,7 @@ public void getTIFSourceConfig(
saTifSourceConfig = SATIFSourceConfig.docParse(xcp, getResponse.getId(), getResponse.getVersion());
}
if (saTifSourceConfig == null) {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(),"No threat intel source config exists [%s]", tifSourceConfigId), RestStatus.BAD_REQUEST)));
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.ROOT, "No threat intel source config exists [%s]", tifSourceConfigId), RestStatus.BAD_REQUEST)));
} else {
log.debug("Threat intel source config with id [{}] fetched", getResponse.getId());
actionListener.onResponse(saTifSourceConfig);
Expand Down Expand Up @@ -366,9 +408,9 @@ public void deleteTIFSourceConfig(
log.info("Deleted threat intel source config [{}] successfully", saTifSourceConfig.getId());
actionListener.onResponse(deleteResponse);
} else if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "Threat intel source config with id [{%s}] not found", saTifSourceConfig.getId()), RestStatus.NOT_FOUND)));
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.ROOT, "Threat intel source config with id [{%s}] not found", saTifSourceConfig.getId()), RestStatus.NOT_FOUND)));
} else {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "Failed to delete threat intel source config [{%s}]", saTifSourceConfig.getId()), deleteResponse.status())));
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.ROOT, "Failed to delete threat intel source config [{%s}]", saTifSourceConfig.getId()), deleteResponse.status())));
}
}, e -> {
log.error("Failed to delete threat intel source config with id [{}]", saTifSourceConfig.getId());
Expand Down Expand Up @@ -407,7 +449,7 @@ public void deleteJobSchedulerLockIfJobDisabled(
log.info("Threat intel job scheduler lock with id [{}] not found", id);
actionListener.onResponse(deleteResponse);
} else {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "Failed to delete threat intel job scheduler lock with id [{%s}]", id), deleteResponse.status())));
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.ROOT, "Failed to delete threat intel job scheduler lock with id [{%s}]", id), deleteResponse.status())));
}
}, e -> {
log.error("Failed to delete threat intel job scheduler lock with id [{}]", id);
Expand Down Expand Up @@ -453,7 +495,7 @@ private void deleteIocIndex(Set<String> indicesToDelete, Boolean backgroundJob,
if (!response.isAcknowledged()) {
log.error("Could not delete one or more IOC indices: " + index);
if (backgroundJob == false) {
listener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "Could not delete one or more IOC indices: " + index), RestStatus.BAD_REQUEST)));
listener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.ROOT, "Could not delete one or more IOC indices: " + index), RestStatus.BAD_REQUEST)));
}
} else {
log.debug("Successfully deleted one or more IOC indices:" + index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.opensearch.securityanalytics.threatIntel.action.SAIndexTIFSourceConfigAction;
import org.opensearch.securityanalytics.threatIntel.action.SAIndexTIFSourceConfigRequest;
import org.opensearch.securityanalytics.threatIntel.action.SAIndexTIFSourceConfigResponse;
import org.opensearch.securityanalytics.threatIntel.common.SourceConfigType;
import org.opensearch.securityanalytics.threatIntel.common.TIFLockService;
import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfigDto;
import org.opensearch.securityanalytics.threatIntel.model.UrlDownloadSource;
import org.opensearch.securityanalytics.threatIntel.service.SATIFSourceConfigManagementService;
import org.opensearch.securityanalytics.transport.SecureTransportAction;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
Expand Down Expand Up @@ -106,7 +104,7 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
user,
ActionListener.wrap(
saTifSourceConfigDtoResponse -> {
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
lockService.releaseLock(lock, ActionListener.wrap(
r -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
listener.onResponse(new SAIndexTIFSourceConfigResponse(
Expand All @@ -129,7 +127,7 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
}, e -> {
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
lockService.releaseLock(lock, ActionListener.wrap(
r -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
listener.onFailure(e);
Expand All @@ -146,7 +144,7 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
} catch (Exception e) {
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
lockService.releaseLock(lock, ActionListener.wrap(
r -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
listener.onFailure(e);
Expand Down
Loading

0 comments on commit 66f1155

Please sign in to comment.