Skip to content

Commit

Permalink
Use instance of LockService instantiated in JobScheduler through Guice (
Browse files Browse the repository at this point in the history
#677)

* WIP to show Geospatial plugin using LockService instance from JS

Signed-off-by: Craig Perkins <[email protected]>

* Use instance of LockService from Guice that is instantiated by Job Scheduler

Signed-off-by: Craig Perkins <[email protected]>

* Fix failing tests

Signed-off-by: Craig Perkins <[email protected]>

* Add to CHANGELOG

Signed-off-by: Craig Perkins <[email protected]>

* Address comments

Signed-off-by: Craig Perkins <[email protected]>

* Add checks to see if initialized

Signed-off-by: Craig Perkins <[email protected]>

* Remove constructor that accepts client

Signed-off-by: Craig Perkins <[email protected]>

* Switch to package-private

Signed-off-by: Craig Perkins <[email protected]>

* package-private

Signed-off-by: Craig Perkins <[email protected]>

* public

Signed-off-by: Craig Perkins <[email protected]>

---------

Signed-off-by: Craig Perkins <[email protected]>
(cherry picked from commit 847be33)
  • Loading branch information
cwperks authored and github-actions[bot] committed Jan 21, 2025
1 parent a452b88 commit 013554e
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on ho
### Documentation
### Maintenance
### Refactoring
- Use instance of LockService instantiated in JobScheduler through Guice ([#677](https://github.com/opensearch-project/geospatial/pull/677))
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.concurrent.atomic.AtomicReference;

import org.opensearch.OpenSearchException;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.jobscheduler.spi.LockModel;
Expand All @@ -30,17 +29,19 @@ public class Ip2GeoLockService {
public static final long LOCK_DURATION_IN_SECONDS = 300l;
public static final long RENEW_AFTER_IN_SECONDS = 120l;
private final ClusterService clusterService;
private final LockService lockService;
private LockService lockService;

/**
* Constructor
*
* @param clusterService the cluster service
* @param client the client
*/
public Ip2GeoLockService(final ClusterService clusterService, final Client client) {
public Ip2GeoLockService(final ClusterService clusterService) {
this.clusterService = clusterService;
this.lockService = new LockService(client, clusterService);
}

public void initialize(final LockService lockService) {
this.lockService = lockService;
}

/**
Expand All @@ -54,6 +55,9 @@ public Ip2GeoLockService(final ClusterService clusterService, final Client clien
* @param listener the listener
*/
public void acquireLock(final String datasourceName, final Long lockDurationSeconds, final ActionListener<LockModel> listener) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener);
}

Expand All @@ -65,6 +69,9 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco
* @return lock model
*/
public Optional<LockModel> acquireLock(final String datasourceName, final Long lockDurationSeconds) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, new ActionListener<>() {
Expand Down Expand Up @@ -95,6 +102,9 @@ public void onFailure(final Exception e) {
* @param lockModel the lock model
*/
public void releaseLock(final LockModel lockModel) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
lockService.release(
lockModel,
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
Expand All @@ -108,6 +118,9 @@ public void releaseLock(final LockModel lockModel) {
* @return renewed lock if renew succeed and null otherwise
*/
public LockModel renewLock(final LockModel lockModel) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.renewLock(lockModel, new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.Lifecycle;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -75,7 +79,9 @@
import org.opensearch.index.mapper.Mapper;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.ingest.Processor;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
Expand All @@ -96,11 +102,22 @@
* to interact with Cluster.
*/
@Log4j2
public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin {
public class GeospatialPlugin extends Plugin
implements
IngestPlugin,
ActionPlugin,
MapperPlugin,
SearchPlugin,
SystemIndexPlugin,
ClusterPlugin {
private Ip2GeoCachedDao ip2GeoCachedDao;
private DatasourceDao datasourceDao;
private GeoIpDataDao geoIpDataDao;
private URLDenyListChecker urlDenyListChecker;
private ClusterService clusterService;
private Ip2GeoLockService ip2GeoLockService;
private Ip2GeoExecutor ip2GeoExecutor;
private DatasourceUpdateService datasourceUpdateService;

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
Expand Down Expand Up @@ -129,7 +146,10 @@ public void onIndexModule(IndexModule indexModule) {

@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
return List.of(Ip2GeoListener.class);
final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(2);
services.add(Ip2GeoListener.class);
services.add(GuiceHolder.class);
return services;
}

@Override
Expand Down Expand Up @@ -158,20 +178,10 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
clusterService,
datasourceDao,
geoIpDataDao,
urlDenyListChecker
);
Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
Ip2GeoLockService ip2GeoLockService = new Ip2GeoLockService(clusterService, client);
/**
* We don't need to return datasource runner because it is used only by job scheduler and job scheduler
* does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance.
*/
DatasourceRunner.getJobRunnerInstance()
.initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceDao, ip2GeoLockService);
this.clusterService = clusterService;
this.datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceDao, geoIpDataDao, urlDenyListChecker);
this.ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
this.ip2GeoLockService = new Ip2GeoLockService(clusterService);

return List.of(
UploadStats.getInstance(),
Expand Down Expand Up @@ -265,4 +275,48 @@ public List<AggregationSpec> getAggregations() {

return List.of(geoHexGridSpec);
}

@Override
public void onNodeStarted(DiscoveryNode localNode) {
LockService lockService = GuiceHolder.getLockService();
ip2GeoLockService.initialize(lockService);

DatasourceRunner.getJobRunnerInstance()
.initialize(this.clusterService, this.datasourceUpdateService, this.ip2GeoExecutor, this.datasourceDao, this.ip2GeoLockService);
}

public static class GuiceHolder implements LifecycleComponent {

private static LockService lockService;

@Inject
public GuiceHolder(final LockService lockService) {
GuiceHolder.lockService = lockService;
}

static LockService getLockService() {
return lockService;
}

@Override
public void close() {}

@Override
public Lifecycle.State lifecycleState() {
return null;
}

@Override
public void addLifecycleListener(LifecycleListener listener) {}

@Override
public void removeLifecycleListener(LifecycleListener listener) {}

@Override
public void start() {}

@Override
public void stop() {}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.geospatial.plugin.GeospatialPlugin;
import org.opensearch.node.MockNode;
import org.opensearch.node.Node;
import org.opensearch.plugins.Plugin;
Expand All @@ -49,7 +48,7 @@ private List<Class<? extends Plugin>> basePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(getTestTransportPlugin());
plugins.add(MockHttpTransport.TestPlugin.class);
plugins.add(GeospatialPlugin.class);
plugins.add(TestGeospatialPlugin.class);
return plugins;
}

Expand Down
29 changes: 29 additions & 0 deletions src/test/java/org/opensearch/geospatial/TestGeospatialPlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener;
import org.opensearch.geospatial.plugin.GeospatialPlugin;

/**
* This class is needed for ClusterSettingsHelper.createMockNode to instantiate a test instance of the
* GeospatialPlugin without the JobSchedulerPlugin installed. Without overriding this class, the
* GeospatialPlugin would try to Inject JobScheduler's LockService in the GuiceHolder which will
* fail because JobScheduler is not installed
*/
public class TestGeospatialPlugin extends GeospatialPlugin {
@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1);
services.add(Ip2GeoListener.class);
return services;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;

public class Ip2GeoLockServiceTests extends Ip2GeoTestCase {
private Ip2GeoLockService ip2GeoLockService;
private Ip2GeoLockService noOpsLockService;

@Before
public void init() {
ip2GeoLockService = new Ip2GeoLockService(clusterService, verifyingClient);
noOpsLockService = new Ip2GeoLockService(clusterService, client);
ip2GeoLockService = new Ip2GeoLockService(clusterService);
noOpsLockService = new Ip2GeoLockService(clusterService);
// TODO Remove direct instantiation and offer a TestLockService class to plugins
ip2GeoLockService.initialize(new LockService(verifyingClient, clusterService));
noOpsLockService.initialize(new LockService(client, clusterService));
}

public void testAcquireLock_whenValidInput_thenSucceed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testCreateComponents() {
}

public void testGetGuiceServiceClasses() {
Collection<Class<? extends LifecycleComponent>> classes = List.of(Ip2GeoListener.class);
Collection<Class<? extends LifecycleComponent>> classes = List.of(Ip2GeoListener.class, GeospatialPlugin.GuiceHolder.class);
assertEquals(classes, plugin.getGuiceServiceClasses());
}

Expand Down

0 comments on commit 013554e

Please sign in to comment.