Skip to content

Commit

Permalink
CASSANDRA shared session to cluster (OpenIdentityPlatform#707)
Browse files Browse the repository at this point in the history
  • Loading branch information
vharseko authored Jan 17, 2024
1 parent 4d8bc81 commit 645500f
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.forgerock.util.promise.PromiseImpl;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.sun.identity.shared.debug.Debug;

/**
Expand All @@ -60,7 +58,7 @@ public class ConnectionFactoryProvider implements org.forgerock.openam.sm.datala
// Injected
private final TimeoutConfig timeoutConfig;
private final ConnectionConfigFactory configFactory;
private final DataLayerConfiguration dataLayerConfiguration;
// private final DataLayerConfiguration dataLayerConfiguration;

private final Debug debug;
private final ConnectionType connectionType;
Expand All @@ -85,7 +83,7 @@ public ConnectionFactoryProvider(ConnectionType connectionType, ConnectionConfig
this.timeoutConfig = timeoutConfig;
this.debug = debug;
this.connectionType = connectionType;
this.dataLayerConfiguration=dataLayerConfiguration;
// this.dataLayerConfiguration=dataLayerConfiguration;
}

static ConnectionFactory connectionFactory=null;
Expand All @@ -104,15 +102,14 @@ public org.forgerock.openam.sm.datalayer.api.ConnectionFactory<CqlSession> creat

debug("Creating Embedded Factory:\nURL: {0}\nMax Connections: {1}\nHeartbeat: {2}\nOperation Timeout: {3}", config.getLDAPURLs(), config.getMaxConnections(), config.getLdapHeartbeat(), timeout);

final String keyspace=dataLayerConfiguration.getKeySpace();
// final String keyspace=dataLayerConfiguration.getKeySpace();

// final String username=config.getBindDN();
// final String password=new String(config.getBindPassword());

CqlSessionBuilder builder=CqlSession.builder()
.withApplicationName("OpenAM CTS: "+keyspace)
.withConfigLoader(DriverConfigLoader.fromDefaults(Repo.class.getClassLoader()))
.withKeyspace(keyspace);
// CqlSessionBuilder builder=CqlSession.builder()
// .withApplicationName("OpenAM CTS: "+keyspace)
// .withConfigLoader(DriverConfigLoader.fromDefaults(Repo.class.getClassLoader()));
// if (StringUtils.isNotBlank(username)&&StringUtils.isNotBlank(password)) {
// builder=builder.withAuthCredentials(username, password);
// }
Expand All @@ -125,7 +122,7 @@ public org.forgerock.openam.sm.datalayer.api.ConnectionFactory<CqlSession> creat
// }
// }
// }
connectionFactory=new ConnectionFactory(builder);
connectionFactory=new ConnectionFactory();
}
}
}
Expand All @@ -138,10 +135,8 @@ private void debug(String format, Object... args) {
}

public static class ConnectionFactory implements org.forgerock.openam.sm.datalayer.api.ConnectionFactory<CqlSession> {
final CqlSessionBuilder builder;

public ConnectionFactory(CqlSessionBuilder cluster) {
this.builder = cluster;

public ConnectionFactory() {
}

@Override
Expand All @@ -161,7 +156,7 @@ public CqlSession create() throws DataLayerException {
if (session==null) {
synchronized (this.getClass()) {
if (session==null) {
session=builder.build();
session=Cluster.getSession();
}
}
}
Expand All @@ -171,7 +166,7 @@ public CqlSession create() throws DataLayerException {
@Override
public void close() {
if (session!=null && !session.isClosed()) {
session.close();
session=null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,21 @@ public TokenStorageAdapter(DataLayerConfiguration dataLayerConfiguration,Connect

PreparedStatement get_statement_read() throws DataLayerException {
if (static_statement_read==null) {
static_statement_read=getSession().prepare("select * from "+cfg.getTableName()+" where "+CoreTokenField.TOKEN_ID.toString()+"=:coreTokenId limit 1");
static_statement_read=getSession().prepare("select * from \""+cfg.getKeySpace()+"\".\""+cfg.getTableName()+"\" where "+CoreTokenField.TOKEN_ID.toString()+"=:coreTokenId limit 1");
}
return static_statement_read;
}

PreparedStatement get_statement_delete() throws DataLayerException {
if (static_statement_delete==null) {
static_statement_delete=getSession().prepare("delete from "+cfg.getTableName()+" where "+CoreTokenField.TOKEN_ID.toString()+"=:coreTokenId");
static_statement_delete=getSession().prepare("delete from \""+cfg.getKeySpace()+"\".\""+cfg.getTableName()+"\" where "+CoreTokenField.TOKEN_ID.toString()+"=:coreTokenId");
}
return static_statement_delete;
}

PreparedStatement get_statement_update() throws DataLayerException {
if (static_statement_update==null) {
static_statement_update=getSession().prepare("update "+cfg.getTableName()+" using ttl :ttl set coreTokenDate01=:coreTokenDate01,coreTokenDate02=:coreTokenDate02,coreTokenDate03=:coreTokenDate03,coreTokenDate04=:coreTokenDate04,coreTokenDate05=:coreTokenDate05,coreTokenExpirationDate=:coreTokenExpirationDate,coreTokenInteger01=:coreTokenInteger01,coreTokenInteger02=:coreTokenInteger02,coreTokenInteger03=:coreTokenInteger03,coreTokenInteger04=:coreTokenInteger04,coreTokenInteger05=:coreTokenInteger05,coreTokenInteger06=:coreTokenInteger06,coreTokenInteger07=:coreTokenInteger07,coreTokenInteger08=:coreTokenInteger08,coreTokenInteger09=:coreTokenInteger09,coreTokenInteger10=:coreTokenInteger10,coreTokenObject=:coreTokenObject,coreTokenString01=:coreTokenString01,coreTokenString02=:coreTokenString02,coreTokenString03=:coreTokenString03,coreTokenString04=:coreTokenString04,coreTokenString05=:coreTokenString05,coreTokenString06=:coreTokenString06,coreTokenString07=:coreTokenString07,coreTokenString08=:coreTokenString08,coreTokenString09=:coreTokenString09,coreTokenString10=:coreTokenString10,coreTokenString11=:coreTokenString11,coreTokenString12=:coreTokenString12,coreTokenString13=:coreTokenString13,coreTokenString14=:coreTokenString14,coreTokenString15=:coreTokenString15,coreTokenMultiString01=:coreTokenMultiString01,coreTokenMultiString02=:coreTokenMultiString02,coreTokenMultiString03=:coreTokenMultiString03,coreTokenType=:coreTokenType,coreTokenUserId=:coreTokenUserId,etag=:etag,createTimestamp=:createTimestamp where coreTokenId=:coreTokenId");
static_statement_update=getSession().prepare("update \""+cfg.getKeySpace()+"\".\""+cfg.getTableName()+"\" using ttl :ttl set coreTokenDate01=:coreTokenDate01,coreTokenDate02=:coreTokenDate02,coreTokenDate03=:coreTokenDate03,coreTokenDate04=:coreTokenDate04,coreTokenDate05=:coreTokenDate05,coreTokenExpirationDate=:coreTokenExpirationDate,coreTokenInteger01=:coreTokenInteger01,coreTokenInteger02=:coreTokenInteger02,coreTokenInteger03=:coreTokenInteger03,coreTokenInteger04=:coreTokenInteger04,coreTokenInteger05=:coreTokenInteger05,coreTokenInteger06=:coreTokenInteger06,coreTokenInteger07=:coreTokenInteger07,coreTokenInteger08=:coreTokenInteger08,coreTokenInteger09=:coreTokenInteger09,coreTokenInteger10=:coreTokenInteger10,coreTokenObject=:coreTokenObject,coreTokenString01=:coreTokenString01,coreTokenString02=:coreTokenString02,coreTokenString03=:coreTokenString03,coreTokenString04=:coreTokenString04,coreTokenString05=:coreTokenString05,coreTokenString06=:coreTokenString06,coreTokenString07=:coreTokenString07,coreTokenString08=:coreTokenString08,coreTokenString09=:coreTokenString09,coreTokenString10=:coreTokenString10,coreTokenString11=:coreTokenString11,coreTokenString12=:coreTokenString12,coreTokenString13=:coreTokenString13,coreTokenString14=:coreTokenString14,coreTokenString15=:coreTokenString15,coreTokenMultiString01=:coreTokenMultiString01,coreTokenMultiString02=:coreTokenMultiString02,coreTokenMultiString03=:coreTokenMultiString03,coreTokenType=:coreTokenType,coreTokenUserId=:coreTokenUserId,etag=:etag,createTimestamp=:createTimestamp where coreTokenId=:coreTokenId");
}
return static_statement_update;
}
Expand Down Expand Up @@ -225,7 +225,7 @@ public Collection<Token> query(TokenFilter query) throws DataLayerException {
final Collection<Token> res = new ArrayList<Token>();
try {
final Filter filter=query.getQuery().accept(new org.openidentityplatform.openam.cassandra.QueryFilterVisitor(),null);
Select select=selectFrom(filter.getTable()).all();
Select select=selectFrom(cfg.getKeySpace(),filter.getTable()).all();
for(Relation clause : filter.clauses) {
select=select.where(clause);
}
Expand Down Expand Up @@ -284,7 +284,7 @@ public Collection<PartialToken> partialQuery(TokenFilter query) throws DataLayer
requestedAttributes.add(tokenField.toString());
requestedAttributes.add("coreTokenId");
final Filter filter=query.getQuery().accept(new org.openidentityplatform.openam.cassandra.QueryFilterVisitor(),null);
Select select=selectFrom(filter.getTable()).columns(requestedAttributes.toArray(new String[0]));
Select select=selectFrom(cfg.getKeySpace(),filter.getTable()).columns(requestedAttributes.toArray(new String[0]));
for(Relation clause : filter.clauses) {
select=select.where(clause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.openidentityplatform.openam.cassandra;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;

public class Cluster {

static CqlSession session=CqlSession.builder()
.withApplicationName("OpenAM")
.withConfigLoader(DriverConfigLoader.fromDefaults(Repo.class.getClassLoader()))
.build();

public static CqlSession getSession() {
return session;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ public ResultSet execute(){
onSuccess(result.getExecutionInfo());
return result;
}catch(Throwable e){
onFailure(e);
try {
onFailure(e);
}catch (Throwable e2) {
logger.error("",e2);
}
throw e;
}

Expand All @@ -82,31 +86,25 @@ public void onSuccess(ExecutionInfo result) {
final QueryTrace trace=result.getQueryTrace();
logger.trace("{}μs {} {}",trace.getDurationMicros(),trace.getParameters(),trace.getCoordinatorAddress());
}else if (logger.isTraceEnabled()){
logger.trace("{} {} ms {}: {}: {}"
logger.trace("{} {} ms: {}: {}"
,statement.getExecutionProfileName()
,System.currentTimeMillis()-start
,getKeyspace()
,debugQuery(statement)
,statement.getConsistencyLevel()
);
}
}

public void onFailure(Throwable t) {
logger.warn("{} {} ms {}: {} {}: {}"
logger.warn("{} {} ms: {} {}: {}"
,statement.getExecutionProfileName()
,System.currentTimeMillis()-start
,getKeyspace()
,debugQuery(statement)
,statement.getConsistencyLevel()
,t.getMessage()
);
}

String getKeyspace() {
return statement.getKeyspace()==null?session.getKeyspace().get().toString():statement.getKeyspace().toString();
}

static ArrayList<String> debugQuery(Statement<?> statement) {
final ArrayList<String> debugs=new ArrayList<String>();
if (statement instanceof DefaultBoundStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class Repo extends IdRepo {
final static String created = "_created";
final static String updated = "_updated";

String keyspace="test";
@Override
public void initialize(Map<String, Set<String>> configParams) throws IdRepoException {
super.initialize(configParams);
Expand Down Expand Up @@ -151,16 +152,15 @@ public void initialize(Map<String, Set<String>> configParams) throws IdRepoExcep
disableCaseSensitive.addAll(Arrays.asList(new String[]{"uid","mail","iplanet-am-user-alias-list"}));
}

final String keyspace=CollectionHelper.getMapAttr(configParams, "sun-idrepo-ldapv3-config-organization_name","test");
keyspace=CollectionHelper.getMapAttr(configParams, "sun-idrepo-ldapv3-config-organization_name","test");
final String[] servers=((Map<String, Set<String>>)configParams).get("sun-idrepo-ldapv3-config-ldap-server").toArray(new String[0]);
final String username=CollectionHelper.getMapAttr(configParams, "sun-idrepo-ldapv3-config-authid",null);
final String password=CollectionHelper.getMapAttr(configParams, "sun-idrepo-ldapv3-config-authpw",null);

logger.info("create session {}/{}",username,servers);
CqlSessionBuilder builder=CqlSession.builder()
.withApplicationName("OpenAM datastore: "+keyspace)
.withConfigLoader(DriverConfigLoader.fromDefaults(Repo.class.getClassLoader()))
.withKeyspace(keyspace);
.withConfigLoader(DriverConfigLoader.fromDefaults(Repo.class.getClassLoader()));
if (StringUtils.isNotBlank(username)&&StringUtils.isNotBlank(password)) {
builder=builder.withAuthCredentials(username, password);
}
Expand All @@ -173,19 +173,19 @@ public void initialize(Map<String, Set<String>> configParams) throws IdRepoExcep
}
}
}
session=builder.build();
statement_select_by_type=session.prepare("select uid,field,value,change from values where type=:type limit 64000 allow filtering");
statement_select_by_uid=session.prepare("select uid,field,value,change from values where type=:type and uid=:uid");
statement_select_by_fields=session.prepare("select uid,field,value,change from values where type=:type and uid=:uid and field in :fields limit 64000");
statement_select_created_updated_by_uid=session.prepare("select max(change) as updated, min(change) as created from values where type=:type and uid=:uid");
statement_delete_by_uid=session.prepare("delete from values where type=:type and uid=:uid");
statement_delete_by_fields=session.prepare("delete from values where type=:type and uid=:uid and field in :fields");
statement_delete_by_field_value=session.prepare("delete from values where type=:type and uid=:uid and field=:field and value=:value");
statement_add_value=session.prepare("insert into values (type,uid,field,value,change) values (:type,:uid,:field,:value,toTimestamp(now()))");
statement_add_value_ttl=session.prepare("insert into values (type,uid,field,value,change) values (:type,:uid,:field,:value,toTimestamp(now())) using ttl :ttl");
session=Cluster.getSession();
statement_select_by_type=session.prepare("select uid,field,value,change from \""+keyspace+"\".\"values\" where type=:type limit 64000 allow filtering");
statement_select_by_uid=session.prepare("select uid,field,value,change from \""+keyspace+"\".\"values\" where type=:type and uid=:uid");
statement_select_by_fields=session.prepare("select uid,field,value,change from \""+keyspace+"\".\"values\" where type=:type and uid=:uid and field in :fields limit 64000");
statement_select_created_updated_by_uid=session.prepare("select max(change) as updated, min(change) as created from \""+keyspace+"\".\"values\" where type=:type and uid=:uid");
statement_delete_by_uid=session.prepare("delete from \""+keyspace+"\".\"values\" where type=:type and uid=:uid");
statement_delete_by_fields=session.prepare("delete from \""+keyspace+"\".\"values\" where type=:type and uid=:uid and field in :fields");
statement_delete_by_field_value=session.prepare("delete from \""+keyspace+"\".\"values\" where type=:type and uid=:uid and field=:field and value=:value");
statement_add_value=session.prepare("insert into \""+keyspace+"\".\"values\" (type,uid,field,value,change) values (:type,:uid,:field,:value,toTimestamp(now()))");
statement_add_value_ttl=session.prepare("insert into \""+keyspace+"\".\"values\" (type,uid,field,value,change) values (:type,:uid,:field,:value,toTimestamp(now())) using ttl :ttl");

statement_add_value_exist=session.prepare("insert into values (type,uid,field,value,change) values (:type,:uid,:field,:value,toTimestamp(now())) IF NOT EXISTS");
statement_add_value_ttl_exist=session.prepare("insert into values (type,uid,field,value,change) values (:type,:uid,:field,:value,toTimestamp(now())) IF NOT EXISTS using ttl :ttl");
statement_add_value_exist=session.prepare("insert into \""+keyspace+"\".\"values\" (type,uid,field,value,change) values (:type,:uid,:field,:value,toTimestamp(now())) IF NOT EXISTS");
statement_add_value_ttl_exist=session.prepare("insert into \""+keyspace+"\".\"values\" (type,uid,field,value,change) values (:type,:uid,:field,:value,toTimestamp(now())) IF NOT EXISTS using ttl :ttl");
}catch(Exception e){
logger.error("error",e);
throw new RuntimeException(e);
Expand All @@ -206,9 +206,6 @@ public void initialize(Map<String, Set<String>> configParams) throws IdRepoExcep
@Override
public void shutdown() {
super.shutdown();
if (session!=null && !session.isClosed()) {
session.close();
}
session=null;
}

Expand All @@ -235,6 +232,7 @@ public boolean isActive(SSOToken token, IdType type, String name) throws IdRepoE
if (value!=null && !value.isEmpty() && value.contains(activeValue)) { //activeAttr is set and activeAttr==activeValue
return true;
}else if ((value==null || value.isEmpty()) && isExists(token, type, name)){ //activeAttr is not set and isExists==true
setActiveStatus(token, type, name, true); //restore activeAttr
return true;
}
return false;
Expand Down Expand Up @@ -379,6 +377,9 @@ public void setAttributes(SSOToken token, IdType type, String name, Map<String,
}
}
attributes.remove("uid");
if (!attributes.containsKey(activeAttr)) {
attributes.put(activeAttr, new HashSet<String>(Arrays.asList(new String[]{activeValue})));
}
}
Map<String, Set<String>> oldValuesMap=null;
if (!isAdd) { //get old values for rewrite
Expand Down Expand Up @@ -508,7 +509,7 @@ final PreparedStatement getIndexByValue(String field) throws ExecutionException
return indexByValue.get(table,new Callable<PreparedStatement>() {
@Override
public PreparedStatement call() throws Exception {
return session.prepare("select uid,field,value from "+table+" where type=:type and field=:field and value in :values limit 64000");
return session.prepare("select uid,field,value from \""+keyspace+"\"."+table+" where type=:type and field=:field and value in :values limit 64000");
}
});
}
Expand All @@ -522,7 +523,7 @@ final PreparedStatement getIndexByValueAndUID(String field) throws ExecutionExce
return indexByValueAndUID.get(table,new Callable<PreparedStatement>() {
@Override
public PreparedStatement call() throws Exception {
return session.prepare("select uid,field,value from "+table+" where type=:type and field=:field and value in :values and uid=:uid limit 64000");
return session.prepare("select uid,field,value from \""+keyspace+"\"."+table+" where type=:type and field=:field and value in :values and uid=:uid limit 64000");
}
});
}
Expand Down Expand Up @@ -621,7 +622,7 @@ public RepoSearchResults search(SSOToken token, IdType type, String pattern, int
values.add(row.getString("value"));
}
}catch(UncheckedExecutionException e) {
logger.debug("unknown index {}: {} {}",session.getKeyspace().get(),filterEntry.getKey(),e.getCause().toString());
logger.debug("unknown index {}: {} {}",keyspace,filterEntry.getKey(),e.getCause().toString());
}
//join result
if (result.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ public void test_isAdd_false() throws SSOException, IdRepoException {
assertTrue(repo.isExists(null, IdType.USER, "9170000000"));
assertTrue(repo.isActive(null, IdType.USER, "9170000000"));

repo.removeAttributes(null,IdType.USER, "9170000000",new HashSet<String>(Arrays.asList(new String[] {"inetuserstatus"})));
repo.removeAttributes(null,IdType.USER, "9170000000",new HashSet<String>(Arrays.asList(new String[] {"uid"})));
assertFalse(repo.isExists(null, IdType.USER, "9170000000"));
assertFalse(repo.isActive(null, IdType.USER, "9170000000"));
Expand Down

0 comments on commit 645500f

Please sign in to comment.