Skip to content

Commit

Permalink
AME-12299 integrated session notifications with a new DJ backed notif…
Browse files Browse the repository at this point in the history
…ication broker
  • Loading branch information
apforrest committed Oct 19, 2016
1 parent 3e9685c commit ba52b27
Show file tree
Hide file tree
Showing 23 changed files with 772 additions and 32 deletions.
4 changes: 4 additions & 0 deletions openam-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@
<groupId>org.forgerock.commons</groupId>
<artifactId>forgerock-bloomfilter-monitoring</artifactId>
</dependency>
<dependency>
<groupId>org.forgerock.openam</groupId>
<artifactId>openam-notifications</artifactId>
</dependency>

<!-- LDAP APIs -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@

package com.iplanet.dpro.session.service;

import static org.forgerock.openam.utils.Time.*;
import static org.forgerock.json.JsonValue.field;
import static org.forgerock.json.JsonValue.json;
import static org.forgerock.json.JsonValue.object;
import static org.forgerock.openam.utils.Time.currentTimeMillis;

import java.net.URL;
import java.util.Map;
Expand All @@ -39,6 +42,11 @@
import javax.inject.Named;
import javax.inject.Singleton;

import org.forgerock.json.JsonValue;
import org.forgerock.openam.notifications.NotificationBroker;
import org.forgerock.openam.notifications.Topic;
import org.forgerock.openam.notifications.NotificationsConfig;
import org.forgerock.openam.session.SessionEvent;
import org.forgerock.util.thread.listener.ShutdownListener;
import org.forgerock.util.thread.listener.ShutdownManager;

Expand Down Expand Up @@ -74,6 +82,8 @@ public class SessionNotificationSender {
private final SessionServerConfig serverConfig;
private final SessionInfoFactory sessionInfoFactory;
private final ThreadPool threadPool;
private final NotificationBroker broker;

/**
* The URL Vector for ALL session events : SESSION_CREATION, IDLE_TIMEOUT,
* MAX_TIMEOUT, LOGOUT, REACTIVATION, DESTROY.
Expand All @@ -85,12 +95,14 @@ public SessionNotificationSender(
final SessionServiceConfig serviceConfig,
final SessionServerConfig serverConfig,
final SessionInfoFactory sessionInfoFactory,
final ShutdownManager shutdownManager) {
final ShutdownManager shutdownManager,
final NotificationBroker broker) {

this.sessionDebug = sessionDebug;
this.serviceConfig = serviceConfig;
this.serverConfig = serverConfig;
this.sessionInfoFactory = sessionInfoFactory;
this.broker = broker;

threadPool = new ThreadPool(THREAD_POOL_NAME, serviceConfig.getNotificationThreadPoolSize(),
serviceConfig.getNotificationThreadPoolThreshold(), true, sessionDebug);
Expand Down Expand Up @@ -119,6 +131,16 @@ public int getNotificationQueueSize() {
*/
public void sendEvent(InternalSession session, int eventType) {
sessionDebug.message("Running sendEvent, type = " + eventType);

if (NotificationsConfig.INSTANCE.isAgentsEnabled()) {
SessionEvent sessionEvent = SessionEvent.valueOf(eventType);
publishSessionNotification(session.getSessionID(), sessionEvent);

for (SessionID sessionId : session.getRestrictedTokens()) {
publishSessionNotification(sessionId, sessionEvent);
}
}

try {
SessionNotificationSenderTask sns = new SessionNotificationSenderTask(session, eventType);
// First send local notification. sendToLocal will return
Expand All @@ -133,6 +155,13 @@ public void sendEvent(InternalSession session, int eventType) {
}
}

private void publishSessionNotification(SessionID sessionId, SessionEvent sessionEvent) {
JsonValue notification = json(object(
field("tokenId", sessionId.toString()),
field("eventType", sessionEvent)));
broker.publish(Topic.of("/agent/session"), notification);
}

/**
* Inner Session Notification Publisher Class Thread.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2016 ForgeRock AS.
*/

package org.forgerock.openam.notifications;

import static com.sun.identity.shared.Constants.NOTIFICATIONS_AGENTS_ENABLED;

import com.iplanet.am.util.SystemProperties;
import com.sun.identity.common.configuration.ConfigurationListener;

/**
* Provides notifications configuration.
*
* @since 14.0.0
*/
public enum NotificationsConfig {

INSTANCE;

private volatile boolean agentsEnabled;

NotificationsConfig() {
SystemProperties.observe(new ConfigurationListener() {

@Override
public void notifyChanges() {
agentsEnabled = SystemProperties.getAsBoolean(NOTIFICATIONS_AGENTS_ENABLED, true);
}

}, NOTIFICATIONS_AGENTS_ENABLED);

agentsEnabled = SystemProperties.getAsBoolean(NOTIFICATIONS_AGENTS_ENABLED, true);
}

/**
* Whether notifications for agents is enabled.
*
* @return whether notifications for agents is enabled
*/
public boolean isAgentsEnabled() {
return agentsEnabled;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2016 ForgeRock AS.
*/

package org.forgerock.openam.session;

/**
* Represents a session event.
* <p>
* Maps from {@link com.iplanet.dpro.session.SessionEvent} to a session event enum.
*
* @since 14.0.0
*/
public enum SessionEvent {

SESSION_CREATION(com.iplanet.dpro.session.SessionEvent.SESSION_CREATION),

/**
* Session idle time out event
*/
IDLE_TIMEOUT(com.iplanet.dpro.session.SessionEvent.IDLE_TIMEOUT),

/**
* Session maximum time out event
*/
MAX_TIMEOUT(com.iplanet.dpro.session.SessionEvent.MAX_TIMEOUT),

/**
* Session logout event
*/
LOGOUT(com.iplanet.dpro.session.SessionEvent.LOGOUT),

/**
* Session reactivation event
*/
REACTIVATION(com.iplanet.dpro.session.SessionEvent.REACTIVATION),

/**
* Session destroy event
*/
DESTROY(com.iplanet.dpro.session.SessionEvent.DESTROY),

/**
* Session Property changed
*/
PROPERTY_CHANGED(com.iplanet.dpro.session.SessionEvent.PROPERTY_CHANGED),

/**
* Session quota exhausted
*/
QUOTA_EXHAUSTED(com.iplanet.dpro.session.SessionEvent.QUOTA_EXHAUSTED),

/**
* Session property protected against change
*/
PROTECTED_PROPERTY(com.iplanet.dpro.session.SessionEvent.PROTECTED_PROPERTY);

private final int sessionEventId;

SessionEvent(int sessionEventId) {
this.sessionEventId = sessionEventId;
}

/**
* Given the session event Id will return the corresponding session event enum.
*
* @param sessionEventId the session event Id
* @return the corresponding session event enum
* @throws IllegalArgumentException if the session event Id is not valid
*/
public static SessionEvent valueOf(int sessionEventId) {
for (SessionEvent event : values()) {
if (event.sessionEventId == sessionEventId) {
return event;
}
}

throw new IllegalArgumentException("Unknown session event Id " + sessionEventId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package org.forgerock.openam.cts.utils;

import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.*;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.iplanet.dpro.session.DNOrIPAddressListTokenRestriction;
import com.iplanet.dpro.session.SessionID;
import com.iplanet.dpro.session.TokenRestriction;
Expand All @@ -38,9 +40,11 @@
import org.forgerock.openam.core.guice.DataLayerGuiceModule;
import org.forgerock.openam.cts.TokenTestUtils;
import org.forgerock.openam.cts.api.tokens.Token;
import org.forgerock.openam.notifications.NotificationBroker;
import org.forgerock.openam.shared.guice.SharedGuiceModule;
import org.forgerock.openam.tokens.TokenType;
import org.forgerock.openam.utils.IOUtils;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -52,7 +56,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@GuiceModules({CoreGuiceModule.class, SharedGuiceModule.class, DataLayerGuiceModule.class, JSONSerialisationTest.DummyAuditConfigModule.class})
@GuiceModules({CoreGuiceModule.class, SharedGuiceModule.class, DataLayerGuiceModule.class,
JSONSerialisationTest.DummyConfigModule.class})
public class JSONSerialisationTest extends GuiceTestCase {

private JSONSerialisation serialization;
Expand Down Expand Up @@ -183,12 +188,13 @@ private static void checkMapType(InternalSession is, String fieldName) throws Ex
assertThat(obj).isInstanceOf(ConcurrentHashMap.class);
}

public static class DummyAuditConfigModule extends AbstractModule {
public static class DummyConfigModule extends AbstractModule {
@Override
protected void configure() {
bind(AuditServiceConfigurationProvider.class).to(DummyAuditServiceConfigurationProvider.class);
bind(AuditServiceProvider.class).to(DummyAuditServiceProvider.class);
bind(AuditEventPublisher.class).to(AuditEventPublisherImpl.class);
bind(NotificationBroker.class).toInstance(mock(NotificationBroker.class));
}
}

Expand Down
4 changes: 4 additions & 0 deletions openam-notifications-integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
<groupId>org.forgerock.openam</groupId>
<artifactId>openam-core</artifactId>
</dependency>
<dependency>
<groupId>org.forgerock.openam</groupId>
<artifactId>openam-shared</artifactId>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@
import java.util.concurrent.ExecutorService;

import javax.inject.Inject;
import javax.inject.Named;

import org.forgerock.guice.core.GuiceModule;
import org.forgerock.openam.cts.CTSPersistentStore;
import org.forgerock.openam.notifications.NotificationBroker;
import org.forgerock.openam.notifications.brokers.SingleQueueNotificationBroker;
import org.forgerock.openam.notifications.brokers.InMemoryNotificationBroker;
import org.forgerock.openam.notifications.integration.brokers.CTSNotificationBroker;
import org.forgerock.util.thread.ExecutorServiceFactory;

import com.google.inject.Exposed;
import com.google.inject.PrivateModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import com.iplanet.am.util.SystemProperties;

/**
* Guice bindings for notifications.
Expand All @@ -40,9 +45,16 @@ public class NotificationsGuiceModule extends PrivateModule {

@Override
protected void configure() {
bind(NotificationBroker.class).to(SingleQueueNotificationBroker.class).in(Singleton.class);
bindConstant().annotatedWith(Names.named("queueTimeout")).to(500L);
bindConstant().annotatedWith(Names.named("queueSize")).to(10000);
bind(NotificationBroker.class)
.annotatedWith(Names.named("localBroker"))
.to(InMemoryNotificationBroker.class)
.in(Singleton.class);
bindConstant().annotatedWith(Names.named("queueTimeoutMilliseconds"))
.to(SystemProperties.getAsLong("org.forgerock.openam.notifications.local.queueTimeoutMilliseconds", 500L));
bindConstant().annotatedWith(Names.named("queueSize"))
.to(SystemProperties.getAsInt("org.forgerock.openam.notifications.local.queueSize", 10000));
bindConstant().annotatedWith(Names.named("tokenExpirySeconds"))
.to(SystemProperties.getAsLong("org.forgerock.openam.notifications.cts.tokenExpirySeconds", 600L));

expose(NotificationBroker.class);
}
Expand All @@ -53,4 +65,14 @@ ExecutorService executorService(ExecutorServiceFactory factory) {
return factory.createFixedThreadPool(1);
}

@Provides
@Exposed
@Inject
@Singleton
NotificationBroker notificationBroker(CTSPersistentStore store,
@Named("localBroker") NotificationBroker broker,
@Named("tokenExpirySeconds") long tokenExpirySeconds) {
return new CTSNotificationBroker(store, broker, tokenExpirySeconds);
}

}
Loading

0 comments on commit ba52b27

Please sign in to comment.