Skip to content

Commit

Permalink
AME-12693 Name ScheduledExecutorService threads
Browse files Browse the repository at this point in the history
  • Loading branch information
craigmcdonnell committed Nov 24, 2016
1 parent 8106332 commit bd3e03e
Show file tree
Hide file tree
Showing 28 changed files with 388 additions and 187 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2015-2016 ForgeRock AS.
*/
package org.forgerock.openam.audit.context;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.forgerock.util.thread.ExecutorServiceFactory;

/**
* Responsible for filtering the API of {@ExecutorServiceFactory} to exclude any
* methods that do not offer a means of setting thread names and to ensure that
* new methods added to {@ExecutorServiceFactory} are always called via
* {@link AuditRequestContextPropagatingExecutorServiceFactory}.
*
* @see ExecutorServiceFactory
* @see AuditRequestContextPropagatingExecutorServiceFactory
*
* @since 14.0.0
*/
public interface AMExecutorServiceFactory {

/**
* Generates a ScheduledExecutorService which has been pre-registered with the
* ShutdownManager.
*
* @see java.util.concurrent.Executors#newScheduledThreadPool(int)
*
* @param poolSize The size of the ScheduledExecutorService thread pool.
* @param threadNamePrefix The thread name prefix to use when generating new threads.
* @return A non null ScheduledExecutorService
*/
ScheduledExecutorService createScheduledService(int poolSize, String threadNamePrefix);

/**
* Creates a fixed size Thread Pool ExecutorService which has been pre-registered with
* the {@link org.forgerock.util.thread.listener.ShutdownManager}.
*
* @param pool The size of the pool to create.
* @param factory The {@link java.util.concurrent.ThreadFactory} used to generate new threads.
* @return Non null.
*/
ExecutorService createFixedThreadPool(int pool, ThreadFactory factory);

/**
* Create a fixed size Thread Pool ExecutorService using the provided name as the prefix
* of the thread names.
*
* @see #createFixedThreadPool(int, java.util.concurrent.ThreadFactory)
*
* @param pool Size of the fixed pool.
* @param threadNamePrefix The thread name prefix to use when generating new threads.
* @return Non null.
*/
ExecutorService createFixedThreadPool(int pool, String threadNamePrefix);

/**
* Generates a Cached Thread Pool ExecutorService which has been pre-registered with the
* ShutdownManager. The provided ThreadFactory is used by the service when creating Threads.
*
* @see java.util.concurrent.Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
*
* @param factory The ThreadFactory that will be used when generating threads. May not be null.
* @return A non null ExecutorService.
*/
ExecutorService createCachedThreadPool(ThreadFactory factory);

/**
* Generates a Cached Thread Pool ExecutorService using the provided name as a prefix
* of the thread names.
*
* @see #createCachedThreadPool(java.util.concurrent.ThreadFactory)
*
* @param threadNamePrefix The thread name prefix to use when generating new threads.
* @return Non null.
*/
ExecutorService createCachedThreadPool(String threadNamePrefix);

/**
* Generates a ThreadPoolExecutor with the provided values, and registers that executor as listening for
* shutdown messages.
*
* @param coreSize the number of threads to keep in the pool, even if they are idle
* @param maxSize Max number of threads in the pool
* @param idleTimeout When the number of threads is greater than core, maximum time that excess idle
* threads will wait before terminating
* @param timeoutTimeunit The time unit for the idleTimeout argument
* @param runnables Queue of threads to be run
* @param threadNamePrefix The thread name prefix to use when generating new threads.
* @return a configured ExecutorService, registered to listen to shutdown messages.
*/
ExecutorService createThreadPool(int coreSize, int maxSize, long idleTimeout,
TimeUnit timeoutTimeunit, BlockingQueue<Runnable> runnables, String threadNamePrefix);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,64 @@

package org.forgerock.openam.audit.context;

import org.forgerock.util.thread.ExecutorServiceFactory;
import org.forgerock.util.thread.listener.ShutdownManager;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.forgerock.util.thread.ExecutorServiceFactory;
import org.forgerock.util.thread.listener.ShutdownManager;

/**
* ExecutorServiceFactory decorator that ensures all ExecutorServices propagate {@link AuditRequestContext} from
* task publishing thread to task consuming thread.
* {@link ExecutorServiceFactory} decorator that ensures all ExecutorServices propagate
* {@link AuditRequestContext} from task publishing thread to task consuming thread.
*
* @since 13.0.0
*/
public class AuditRequestContextPropagatingExecutorServiceFactory extends ExecutorServiceFactory {
public class AuditRequestContextPropagatingExecutorServiceFactory implements AMExecutorServiceFactory {

private final AMExecutorServiceFactory delegate;

/**
* Constructs a new {@code AuditRequestContextPropagatingExecutorServiceFactory}.
*
* @param shutdownManager The {@code ShutdownManager}.
*/
public AuditRequestContextPropagatingExecutorServiceFactory(ShutdownManager shutdownManager) {
super(shutdownManager);
delegate = new ExtendedExecutorServiceFactory(shutdownManager);
}

@Override
public ScheduledExecutorService createScheduledService(int poolSize) {
return decorate(super.createScheduledService(poolSize));
public ScheduledExecutorService createScheduledService(int poolSize, String threadNamePrefix) {
return decorate(delegate.createScheduledService(poolSize, threadNamePrefix));
}

@Override
public ExecutorService createFixedThreadPool(int pool, ThreadFactory factory) {
return decorate(super.createFixedThreadPool(pool, factory));
return decorate(delegate.createFixedThreadPool(pool, factory));
}

@Override
public ExecutorService createFixedThreadPool(int pool, String threadName) {
return decorate(super.createFixedThreadPool(pool, threadName));
}

@Override
public ExecutorService createFixedThreadPool(int pool) {
return decorate(super.createFixedThreadPool(pool));
return decorate(delegate.createFixedThreadPool(pool, threadName));
}

@Override
public ExecutorService createCachedThreadPool(ThreadFactory factory) {
return decorate(super.createCachedThreadPool(factory));
return decorate(delegate.createCachedThreadPool(factory));
}

@Override
public ExecutorService createCachedThreadPool(String threadName) {
return decorate(super.createCachedThreadPool(threadName));
}

@Override
public ExecutorService createCachedThreadPool() {
return decorate(super.createCachedThreadPool());
return decorate(delegate.createCachedThreadPool(threadName));
}

@Override
public ExecutorService createThreadPool(int coreSize, int maxSize, long idleTimeout, TimeUnit timeoutTimeunit,
BlockingQueue<Runnable> runnables) {
return decorate(super.createThreadPool(coreSize, maxSize, idleTimeout, timeoutTimeunit, runnables));
public ExecutorService createThreadPool(int coreSize, int maxSize, long idleTimeout,
TimeUnit timeoutTimeunit, BlockingQueue<Runnable> runnables, String threadNamePrefix) {
return decorate(delegate.createThreadPool(
coreSize, maxSize, idleTimeout, timeoutTimeunit, runnables, threadNamePrefix));
}

private ExecutorService decorate(ExecutorService delegate) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2015-2016 ForgeRock AS.
*/
package org.forgerock.openam.audit.context;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.forgerock.util.Reject;
import org.forgerock.util.thread.ExecutorServiceFactory;
import org.forgerock.util.thread.listener.ShutdownListener;
import org.forgerock.util.thread.listener.ShutdownManager;

/**
* Subclass of {@link ExecutorServiceFactory} which allows {@link ScheduledExecutorService} threads to be named.
* <p>
* This is a temporary class which is only needed as Commons has reached feature freeze. When changes to Commons
* are permitted, the single public method in this class should be rolled up into {@link ExecutorServiceFactory}
* (COMMONS-137) and this subclass should be deleted (AME-12776).
*/
class ExtendedExecutorServiceFactory extends ExecutorServiceFactory implements AMExecutorServiceFactory {

private final ShutdownManager shutdownManager;

/**
* Create an instance of the factory.
*
* @param shutdownManager Required to ensure each ExecutorService will be shutdown.
*/
ExtendedExecutorServiceFactory(ShutdownManager shutdownManager) {
super(shutdownManager);
this.shutdownManager = shutdownManager;
}

@Override
public ScheduledExecutorService createScheduledService(int poolSize, String threadNamePrefix) {
final ScheduledExecutorService service =
Executors.newScheduledThreadPool(poolSize, new NamedThreadFactory(threadNamePrefix));
registerShutdown(service);
return service;
}

@Override
public ExecutorService createThreadPool(int coreSize, int maxSize, long idleTimeout,
TimeUnit timeoutTimeunit, BlockingQueue<Runnable> runnables, String threadNamePrefix) {
Reject.ifTrue(coreSize < 0);
Reject.ifTrue(maxSize < coreSize || maxSize <= 0);
Reject.ifTrue(idleTimeout < 0);

ExecutorService service = new ThreadPoolExecutor(coreSize, maxSize, idleTimeout, timeoutTimeunit,
runnables, new NamedThreadFactory(threadNamePrefix));
registerShutdown(service);
return service;
}

/**
* Registers a listener to trigger shutdown of the ExecutorService.
* @param service Non null ExecutorService to register.
*/
private void registerShutdown(final ExecutorService service) {
shutdownManager.addShutdownListener(
new ShutdownListener() {
public void shutdown() {
service.shutdownNow();
}
});
}

/**
* Used to generate threads with a provided name. Each new thread will
* have its generated number appended to the end of it, in the form -X, where
* X is incremented once for each thread created.
*/
private class NamedThreadFactory implements ThreadFactory {

private final AtomicInteger count = new AtomicInteger(0);
private final String name;

public NamedThreadFactory(String name) {
this.name = name;
}

public Thread newThread(Runnable r) {
return new Thread(r, name + "-" + count.getAndIncrement());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.sun.identity.sm.ServiceManager;
import org.forgerock.json.JsonValue;
import org.forgerock.json.JsonValueException;
import org.forgerock.openam.audit.context.AMExecutorServiceFactory;
import org.forgerock.openam.utils.IOUtils;
import org.forgerock.openam.utils.JsonValueBuilder;
import org.forgerock.openam.utils.file.ZipUtils;
Expand Down Expand Up @@ -103,8 +104,8 @@ public class DefaultDebugRecorder implements DebugRecorder {
* Initialize the RecordDebugController.
*/
@Inject
public DefaultDebugRecorder(ExecutorServiceFactory executorServiceFactory) {
scheduledExecutorService = executorServiceFactory.createScheduledService(2);
public DefaultDebugRecorder(AMExecutorServiceFactory executorServiceFactory) {
scheduledExecutorService = executorServiceFactory.createScheduledService(2, "DefaultDebugRecorder");
recordReport = new RecordReport();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,34 @@
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2015 ForgeRock AS.
* Copyright 2015-2016 ForgeRock AS.
*/
package org.forgerock.openam.core.rest.record;


import com.sun.identity.shared.configuration.SystemPropertiesManager;
import com.sun.identity.shared.debug.Debug;
import com.sun.identity.shared.debug.DebugConstants;
import com.sun.identity.shared.debug.DebugLevel;
import com.sun.identity.shared.debug.file.impl.InvalidDebugConfigurationException;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.forgerock.json.JsonValue;
import org.forgerock.openam.audit.context.AMExecutorServiceFactory;
import org.forgerock.openam.audit.context.AuditRequestContextPropagatingExecutorServiceFactory;
import org.forgerock.openam.utils.IOUtils;
import org.forgerock.openam.utils.JsonValueBuilder;
import org.forgerock.util.thread.ExecutorServiceFactory;
import org.forgerock.util.thread.listener.ShutdownManager;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import com.sun.identity.shared.configuration.SystemPropertiesManager;
import com.sun.identity.shared.debug.Debug;
import com.sun.identity.shared.debug.DebugConstants;
import com.sun.identity.shared.debug.DebugLevel;
import com.sun.identity.shared.debug.file.impl.InvalidDebugConfigurationException;

public class DefaultDebugRecorderTest extends DebugTestTemplate {
protected static final String DEBUG_CONFIG_FOR_TEST = "/record/debugconfig.properties";
Expand All @@ -48,8 +50,8 @@ public void setUp() throws Exception {
super.setUp();
initializeProvider(DEBUG_CONFIG_FOR_TEST);
SystemPropertiesManager.initializeProperties(DebugConstants.CONFIG_DEBUG_LEVEL, DebugLevel.MESSAGE.getName());
recordDebugController = new DefaultDebugRecorderForTest(new ExecutorServiceFactory(Mockito.mock
(org.forgerock.util.thread.listener.ShutdownManager.class)));
recordDebugController = new DefaultDebugRecorderForTest(
new AuditRequestContextPropagatingExecutorServiceFactory(Mockito.mock(ShutdownManager.class)));
}

@Test
Expand Down Expand Up @@ -343,7 +345,7 @@ public static class DefaultDebugRecorderForTest extends DefaultDebugRecorder {
*
* @param executorServiceFactory
*/
public DefaultDebugRecorderForTest(ExecutorServiceFactory executorServiceFactory) {
public DefaultDebugRecorderForTest(AMExecutorServiceFactory executorServiceFactory) {
super(executorServiceFactory);
}

Expand Down
Loading

0 comments on commit bd3e03e

Please sign in to comment.