Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
RTSDK-6583: Update to EMAJ Config Guide for global parameter added for RTSDK-6411
RTSDK-6583: Upticking version to 3.6.6.1 for 2.0.6.G1 release
RTSDK-6583: Update to CHANGELOG for release 2.0.6.G1
RTSDK-6411: Memory leak upon repeated init and un-init of OMMConsumer due to a growing global pool
  • Loading branch information
ciprian-tarlev committed Sep 16, 2022
1 parent 7e4932e commit 5e35e79
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 43 deletions.
16 changes: 13 additions & 3 deletions Java/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,33 @@ There are three types of RTSDK releases that append a letter directly followed b
"E" releases (E-Loads) are emergency RTSDK releases that are uploaded to MyRefinitiv and Developer Community but not to GitHub. Also note that emergency releases may only be partial (i.e., Java or C++/C only).

----------------------------------------------------------------------------------------
CURRENT RELEASE HIGHLIGHTS - RTSDK Java 2.0.6.L1 aka EMA/ETA 3.6.6.L1 aka 3.6.6.0
CURRENT RELEASE HIGHLIGHTS - RTSDK Java 2.0.6.G1 aka EMA/ETA 3.6.6.G1 aka 3.6.6.1
----------------------------------------------------------------------------------------

This is a maintenance release with fixes.
This is a rapid release with a critical fix.

Customer Issues Resolved
----------------------------------------------------------------------------------------
[Case Number: 10825196] - [RTSDK-5901] - NullPointerException with emaj 3.6.1.2 processing directory domain message
- [Case Number: 11444081] - [RTSDK-6411] - Memory leak upon repeated init and un-init of OMMConsumer due to a growing global pool


----------------------------------------------------------------------------------------
FULL CHANGELOG
----------------------------------------------------------------------------------------

--------------------------------------------
RTSDK Java Release 2.0.6.G1 (Sep 16, 2022)
--------------------------------------------
This is a rapid release with a critical fix.

EMA Java 3.6.6.L1 Issues Resolved
---------------------------------
- [RTSDK-6411] - Memory leak upon repeated init and un-init of OMMConsumer due to a growing global pool [Case Number: 11444081]

--------------------------------------------
RTSDK Java Release 2.0.6.L1 (Jun 20, 2022)
--------------------------------------------
This is a maintenance release with fixes.

EMA Java 3.6.6.L1 Issues Resolved
---------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class ConfigManager
public static final int ChannelType = 104;
public static final int ChannelInitTimeout = 105;
public static final int ServiceDiscoveryRetryCount = 106;
public static final int JsonConverterPoolsSize = 107;

// Channel: Socket, HTTP, Encrypted, WebSocket
public static final int ChannelCompressionThreshold = 200;
Expand Down Expand Up @@ -561,6 +562,7 @@ class ConfigManager
GlobalConfigDict.add( "ReactorMsgEventPoolLimit",ReactorMsgEventPoolLimit );
GlobalConfigDict.add( "TunnelStreamMsgEventPoolLimit", TunnelStreamMsgEventPoolLimit);
GlobalConfigDict.add( "TunnelStreamStatusEventPoolLimit", TunnelStreamStatusEventPoolLimit );
GlobalConfigDict.add("JsonConverterPoolsSize", JsonConverterPoolsSize);

CONSUMER_GROUP = ConfigManager.acquire().new Branch();
CONSUMER_GROUP.add(ConfigManager.ConsumerGroup,ConfigManager.ConsumerTagDict);
Expand Down Expand Up @@ -788,7 +790,8 @@ class ConfigManager
"CloseChannelFromConverterFailure",
"OpenLimit",
"OpenWindow",
"LoadFactor"
"LoadFactor",
"JsonConverterPoolsSize"
};
public static String DoubleValues[] = {
"TokenReissueRatio"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ private EnumTypeTableImpl getEnumTypeTable(com.refinitiv.eta.codec.EnumTypeTable
return enumTypeTableImpl;
}

private void clearFlags()
void clearFlags()
{
loadedFieldDictionary = false;
loadedEnumTypeDef = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ public <T> T[] toArray(T[] a)
{
throw new UnsupportedOperationException("FieldList collection doesn't support this operation.");
}

@Override
public void returnToPool()
{
_dataDictionaryImpl.rsslDataDictionary(null);
_dataDictionaryImpl.clearFlags();
}

String toString(int indent)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
class GlobalConfig
{
final static int DEFAULT_EVENT_POOL_LIMIT = -1;
static final int JSON_CONVERTER_DEFAULT_POOLS_SIZE = 10;

int reactorMsgEventPoolLimit;
int reactorChannelEventPoolLimit;
int workerEventPoolLimit;
int tunnelStreamMsgEventPoolLimit;
int tunnelStreamStatusEventPoolLimit;
int jsonConverterPoolsSize;

GlobalConfig()
{
Expand All @@ -25,6 +27,7 @@ class GlobalConfig
workerEventPoolLimit = DEFAULT_EVENT_POOL_LIMIT;
tunnelStreamMsgEventPoolLimit = DEFAULT_EVENT_POOL_LIMIT;
tunnelStreamStatusEventPoolLimit = DEFAULT_EVENT_POOL_LIMIT;
jsonConverterPoolsSize = JSON_CONVERTER_DEFAULT_POOLS_SIZE;
}

void clear()
Expand All @@ -34,5 +37,6 @@ void clear()
workerEventPoolLimit = DEFAULT_EVENT_POOL_LIMIT;
tunnelStreamMsgEventPoolLimit = DEFAULT_EVENT_POOL_LIMIT;
tunnelStreamStatusEventPoolLimit = DEFAULT_EVENT_POOL_LIMIT;
jsonConverterPoolsSize = JSON_CONVERTER_DEFAULT_POOLS_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,53 @@ static class ImplementationType
long nextLongId();

void channelInformation(ChannelInformation ci);

default int getJsonConverterPoolsSize(ConfigElement configElement, BaseConfig baseConfig, StringBuilder stringBuilder,
Logger loggerClient) {
String jsonConverterPoolsSizeValue = configElement._valueStr;

try {
if (jsonConverterPoolsSizeValue == null || jsonConverterPoolsSizeValue.isEmpty())
return GlobalConfig.JSON_CONVERTER_DEFAULT_POOLS_SIZE;

long jsonConverterPoolsSize = Long.parseLong(jsonConverterPoolsSizeValue);

if (jsonConverterPoolsSize < 0) {
if (loggerClient.isWarnEnabled())
{
stringBuilder.append("JsonConverterPoolsSize value should be equal or greater than 0.")
.append(" It will be set to default value: 10.");
loggerClient.warn(formatLogMessage(baseConfig.instanceName, stringBuilder.toString(), Severity.WARNING));
}
return GlobalConfig.JSON_CONVERTER_DEFAULT_POOLS_SIZE;
}

if (jsonConverterPoolsSize > Integer.MAX_VALUE) {
if (loggerClient.isWarnEnabled())
{
stringBuilder.append("JsonConverterPoolsSize value should not be greater than ")
.append(Integer.MAX_VALUE)
.append(". It will be set to ")
.append(Integer.MAX_VALUE)
.append(".");
loggerClient.warn(formatLogMessage(baseConfig.instanceName, stringBuilder.toString(), Severity.WARNING));
}
return Integer.MAX_VALUE;
}

return (int) jsonConverterPoolsSize;
} catch (NumberFormatException exception) {
if (loggerClient.isWarnEnabled())
{
stringBuilder.append("invalid JsonConverterPoolsSize value format [")
.append(jsonConverterPoolsSizeValue)
.append("]; expected number. It will be set to default value - 10.");
loggerClient.warn(formatLogMessage(baseConfig.instanceName, stringBuilder.toString(), Severity.WARNING));
}
}

return GlobalConfig.JSON_CONVERTER_DEFAULT_POOLS_SIZE;
}
}

abstract class OmmBaseImpl<T> implements OmmCommonImpl, Runnable, TimeoutClient, ReactorServiceNameToIdCallback, ReactorJsonConversionEventCallback
Expand Down Expand Up @@ -941,6 +988,11 @@ void readConfiguration(EmaConfigImpl config)
{
_activeConfig.globalConfig.tunnelStreamStatusEventPoolLimit = ce.intValue();
}
if( (ce = globalConfigAttributes.getPrimitiveValue(ConfigManager.JsonConverterPoolsSize)) != null)
{
_activeConfig.globalConfig.jsonConverterPoolsSize =
getJsonConverterPoolsSize(ce, _activeConfig, strBuilder(), _loggerClient);
}
}

ProgrammaticConfigure pc = config.programmaticConfigure();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ void handleAdminDomains(EmaConfigImpl config) {
jsonConverterOptions.catchUnknownJsonKeys(_activeConfig.catchUnknownJsonKeys);
jsonConverterOptions.catchUnknownJsonFids(_activeConfig.catchUnknownJsonFids);
jsonConverterOptions.closeChannelFromFailure(_activeConfig.closeChannelFromFailure);
jsonConverterOptions.jsonConverterPoolsSize(_activeConfig.globalConfig.jsonConverterPoolsSize);

if (_rsslReactor.initJsonConverter(jsonConverterOptions, _rsslErrorInfo) != ReactorReturnCodes.SUCCESS) {
strBuilder().append("Failed to initialize OmmBaseImpl (RWF/JSON Converter).")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ void initialize(ActiveServerConfig activeConfig,EmaConfigServerImpl config)
jsonConverterOptions.catchUnknownJsonKeys(activeConfig.catchUnknownJsonKeys);
jsonConverterOptions.catchUnknownJsonFids(activeConfig.catchUnknownJsonFids);
jsonConverterOptions.closeChannelFromFailure(activeConfig.closeChannelFromFailure);
jsonConverterOptions.jsonConverterPoolsSize(activeConfig.globalConfig.jsonConverterPoolsSize);

if (_rsslReactor.initJsonConverter(jsonConverterOptions, _rsslErrorInfo) != ReactorReturnCodes.SUCCESS) {
strBuilder().append("Failed to initialize OmmServerBaseImpl (RWF/JSON Converter).")
Expand Down Expand Up @@ -722,6 +723,11 @@ void readConfiguration(EmaConfigServerImpl config)
{
_activeServerConfig.globalConfig.tunnelStreamStatusEventPoolLimit = ce.intValue();
}
if( (ce = globalConfigAttributes.getPrimitiveValue(ConfigManager.JsonConverterPoolsSize)) != null)
{
_activeServerConfig.globalConfig.jsonConverterPoolsSize =
getJsonConverterPoolsSize(ce, _activeServerConfig, strBuilder(), _loggerClient);
}
}

ProgrammaticConfigure pc = config.programmaticConfigure();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ GlobalConfig retrieveGlobalConfig() {
ElementEntry workerEventPoolLimit = getIntElementEntry(globalConfigEntry, "WorkerEventPoolLimit");
ElementEntry tunnelStreamMsgEventPoolLimit = getIntElementEntry(globalConfigEntry, "TunnelStreamMsgEventPoolLimit");
ElementEntry tunnelStreamStatusEventPoolLimit = getIntElementEntry(globalConfigEntry, "TunnelStreamStatusEventPoolLimit");
ElementEntry jsonConverterPoolsSize = getIntElementEntry(globalConfigEntry, "JsonConverterPoolsSize");

if (reactorMsgEventPoolLimit != null) {
config.reactorMsgEventPoolLimit = convertToInt(reactorMsgEventPoolLimit.intValue());
Expand All @@ -616,6 +617,9 @@ GlobalConfig retrieveGlobalConfig() {
if (tunnelStreamStatusEventPoolLimit != null) {
config.tunnelStreamStatusEventPoolLimit = convertToInt(tunnelStreamStatusEventPoolLimit.intValue());
}
if (jsonConverterPoolsSize != null) {
config.jsonConverterPoolsSize = getJsonConverterPoolsSize(jsonConverterPoolsSize.intValue());
}
return config;
}
void retrieveDictionaryConfig( String dictionaryName, ActiveConfig activeConfig )
Expand Down Expand Up @@ -3202,4 +3206,27 @@ void removeConfigFileService(DirectoryServiceStore dirServiceStore, DirectoryCac
i++;
}
}

private int getJsonConverterPoolsSize(long jsonConverterPoolsSize)
{
if(jsonConverterPoolsSize < 0)
{
_emaConfigErrList.append( "JsonConverterPoolsSize value should be equal or greater than 0.")
.append( " It will be set to default value: 10.")
.create(Severity.WARNING);
return GlobalConfig.JSON_CONVERTER_DEFAULT_POOLS_SIZE;
}

if(jsonConverterPoolsSize > Integer.MAX_VALUE)
{
_emaConfigErrList.append("JsonConverterPoolsSize value should not be greater than ")
.append(Integer.MAX_VALUE)
.append(". It will be set to ")
.append(Integer.MAX_VALUE)
.append(".")
.create(Severity.WARNING);
return Integer.MAX_VALUE;
}
return (int) jsonConverterPoolsSize;
}
}
Binary file modified Java/Ema/Docs/EMAJ_ConfigGuide.pdf
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class JsonFactory {
private static ObjectPool<ByteBuffer> byteBufferPool = new ObjectPool<>(true, () -> ByteBuffer.allocate(DEFAULT_BYTEBUFFER_SIZE));
private static ByteArrayPool byteArrayPool = new ByteArrayPool();

private static boolean isInitialized = false;
public static final int JSON_CONVERTER_DEFAULT_POOLS_SIZE = 10;

private JsonFactory() {
throw new AssertionError();
}
Expand Down Expand Up @@ -333,39 +336,44 @@ public static void releaseByteArray(byte[] array) {
}


public static void initPools(int numOfObjects) {

intPool.growPool(numOfObjects);
uintPool.growPool(numOfObjects);
elementListPool.growPool(numOfObjects);
elementEntryPool.growPool(numOfObjects);
bufferPool.growPool(numOfObjects);
fieldListPool.growPool(numOfObjects);
fieldEntryPool.growPool(numOfObjects);
vectorPool.growPool(numOfObjects);
vectorEntryPool.growPool(numOfObjects);
seriesPool.growPool(numOfObjects);
seriesEntryPool.growPool(numOfObjects);
filterListPool.growPool(numOfObjects);
filterEntryPool.growPool(numOfObjects);
realPool.growPool(numOfObjects);
doublePool.growPool(numOfObjects);
floatPool.growPool(numOfObjects);
statePool.growPool(numOfObjects);
qosPool.growPool(numOfObjects);
mapPool.growPool(numOfObjects);
mapEntryPool.growPool(numOfObjects);
enumPool.growPool(numOfObjects);
timePool.growPool(numOfObjects);
datePool.growPool(numOfObjects);
dateTimePool.growPool(numOfObjects);
arrayPool.growPool(numOfObjects);
arrayEntryPool.growPool(numOfObjects);
msgPool.growPool(numOfObjects);
fieldSetDefDbPool.growPool(numOfObjects);
elementSetDefDbPool.growPool(numOfObjects);
decodeIterPool.growPool(numOfObjects);
encodeIteratorPool.growPool(numOfObjects);
byteBufferPool.growPool(numOfObjects);
public static void initPools(int numOfObjects)
{
if(!isInitialized && numOfObjects > 0)
{
intPool.growPool(numOfObjects);
uintPool.growPool(numOfObjects);
elementListPool.growPool(numOfObjects);
elementEntryPool.growPool(numOfObjects);
bufferPool.growPool(numOfObjects);
fieldListPool.growPool(numOfObjects);
fieldEntryPool.growPool(numOfObjects);
vectorPool.growPool(numOfObjects);
vectorEntryPool.growPool(numOfObjects);
seriesPool.growPool(numOfObjects);
seriesEntryPool.growPool(numOfObjects);
filterListPool.growPool(numOfObjects);
filterEntryPool.growPool(numOfObjects);
realPool.growPool(numOfObjects);
doublePool.growPool(numOfObjects);
floatPool.growPool(numOfObjects);
statePool.growPool(numOfObjects);
qosPool.growPool(numOfObjects);
mapPool.growPool(numOfObjects);
mapEntryPool.growPool(numOfObjects);
enumPool.growPool(numOfObjects);
timePool.growPool(numOfObjects);
datePool.growPool(numOfObjects);
dateTimePool.growPool(numOfObjects);
arrayPool.growPool(numOfObjects);
arrayEntryPool.growPool(numOfObjects);
msgPool.growPool(numOfObjects);
fieldSetDefDbPool.growPool(numOfObjects);
elementSetDefDbPool.growPool(numOfObjects);
decodeIterPool.growPool(numOfObjects);
encodeIteratorPool.growPool(numOfObjects);
byteBufferPool.growPool(numOfObjects);

isInitialized = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5530,7 +5530,7 @@ public int initJsonConverter(ReactorJsonConverterOptions jsonConverterOptions, R
"Reactor.initJsonConverter", "The service ID must be in a range between 0 to 65535.");
}

JsonFactory.initPools(3);
JsonFactory.initPools(jsonConverterOptions.jsonConverterPoolsSize());
JsonConverterBuilder jsonConverterBuilder = ConverterFactory.createJsonConverterBuilder();

if(Objects.isNull(serviceNameIdConverterClient))
Expand Down
Loading

0 comments on commit 5e35e79

Please sign in to comment.