Skip to content

Commit

Permalink
Add metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
jackluo923 committed Nov 17, 2024
1 parent 174cca3 commit 7a08a7b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REALTIME_ROWS_FILTERED("rows", false),
INVALID_REALTIME_ROWS_DROPPED("rows", false),
INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
REALTIME_CLP_TOO_MANY_ENCODED_VARS("rows", false),
REALTIME_CLP_UNENCODABLE("rows", false),
REALTIME_CLP_ENCODED_NON_STRINGS("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
REALTIME_MERGED_TEXT_IDX_TRUNCATED_DOCUMENT_SIZE("bytes", false),
REALTIME_OFFSET_COMMITS("commits", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
Expand All @@ -41,6 +42,7 @@ public class CLPLogMessageDecoder implements StreamMessageDecoder<byte[]> {
public static final String ERROR_SAMPLING_PERIOD_CONFIG_KEY = "errorSamplingPeriod";
private static final Logger LOGGER = LoggerFactory.getLogger(CLPLogMessageDecoder.class);
private static final int DEFAULT_ERROR_SAMPLING_PERIOD = 10000;
private final ServerMetrics _serverMetrics = ServerMetrics.get();

private RecordExtractor<Map<String, Object>> _recordExtractor;
// Period at which errors should be sampled for printing:
Expand All @@ -67,7 +69,11 @@ public void init(Map<String, String> props, Set<String> fieldsToRead, String top
_recordExtractor = PluginManager.get().createInstance(recordExtractorClass);
RecordExtractorConfig config = PluginManager.get().createInstance(recordExtractorConfigClass);
config.init(props);
_recordExtractor.init(fieldsToRead, config);
if (_recordExtractor instanceof CLPLogRecordExtractor) {
((CLPLogRecordExtractor) _recordExtractor).init(fieldsToRead, config, topicName, _serverMetrics);
} else {
_recordExtractor.init(fieldsToRead, config);
}

// Parse error sampling period
if (null != errorSamplingPeriodString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.inputformat.clplog;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
import com.yscope.clp.compressorfrontend.EncodedMessage;
import com.yscope.clp.compressorfrontend.MessageEncoder;
Expand All @@ -27,10 +28,14 @@
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,6 +85,19 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
private String[] _unencodableFieldErrorDictionaryVars = null;
private Long[] _unencodableFieldErrorEncodedVars = null;

private String _topicName;
private ServerMetrics _serverMetrics;
PinotMeter _realtimeClpTooManyEncodedVarsMeter = null;
PinotMeter _realtimeClpUnencodableMeter = null;
PinotMeter _realtimeClpEncodedNonStringsMeter = null;

public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig, String topicName,
ServerMetrics serverMetrics) {
init(fields, recordExtractorConfig);
_topicName = topicName;
_serverMetrics = serverMetrics;
}

@Override
public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
_config = (CLPLogRecordExtractorConfig) recordExtractorConfig;
Expand Down Expand Up @@ -161,28 +179,43 @@ private void encodeFieldWithClp(String key, Object value, GenericRow to) {
Object[] encodedVars = null;
if (null != value) {
boolean fieldIsUnencodable = false;
if (!(value instanceof String)) {
LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
value.getClass().getSimpleName(), key, value);
fieldIsUnencodable = true;

// Get value as string
String valueAsString = null;
if (value instanceof String) {
valueAsString = (String) value;
} else {
String valueAsString = (String) value;
try {
valueAsString = JsonUtils.objectToString(value);
_realtimeClpEncodedNonStringsMeter =
_serverMetrics.addMeteredTableValue(_topicName, ServerMeter.REALTIME_CLP_ENCODED_NON_STRINGS, 1,
_realtimeClpEncodedNonStringsMeter);
} catch (JsonProcessingException ex) {
LOGGER.error("Can't convert value of type {} to String (to encode with CLP). name: '{}', value: '{}'",
value.getClass().getSimpleName(), key, value);
fieldIsUnencodable = true;
}
}

// Encode value with CLP
if (null != valueAsString) {
try {
_clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
logtype = _clpEncodedMessage.getLogTypeAsString();
encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();

if (null != encodedVars && encodedVars.length > MAX_VARIABLES_PER_CELL) {
LOGGER.error("Can't encode field with CLP. name: '{}', error: Too many encoded variables", key);
fieldIsUnencodable = true;
}
if (null != dictVars && dictVars.length > MAX_VARIABLES_PER_CELL) {
LOGGER.error("Can't encode field with CLP. name: '{}', error: Too many dictionary variables", key);
if ((null != dictVars && dictVars.length > MAX_VARIABLES_PER_CELL) || (null != encodedVars
&& encodedVars.length > MAX_VARIABLES_PER_CELL)) {
_realtimeClpTooManyEncodedVarsMeter =
_serverMetrics.addMeteredTableValue(_topicName, ServerMeter.REALTIME_CLP_TOO_MANY_ENCODED_VARS, 1,
_realtimeClpTooManyEncodedVarsMeter);
fieldIsUnencodable = true;
}
} catch (IOException e) {
LOGGER.error("Can't encode field with CLP. name: '{}', error: {}", key, e.getMessage());
_realtimeClpUnencodableMeter =
_serverMetrics.addMeteredTableValue(_topicName, ServerMeter.REALTIME_CLP_UNENCODABLE, 1,
_realtimeClpUnencodableMeter);
fieldIsUnencodable = true;
}
}
Expand All @@ -191,7 +224,6 @@ private void encodeFieldWithClp(String key, Object value, GenericRow to) {
String unencodableFieldSuffix = _config.getUnencodableFieldSuffix();
if (null != unencodableFieldSuffix) {
String unencodableFieldKey = key + unencodableFieldSuffix;
LOGGER.info("Storing field '{}' that can't be encoded with CLP in {}", key, unencodableFieldKey);
to.putValue(unencodableFieldKey, value);
}

Expand All @@ -204,6 +236,8 @@ private void encodeFieldWithClp(String key, Object value, GenericRow to) {
dictVars = null;
encodedVars = null;
}
} else {
to.putValue(key, valueAsString);
}
}

Expand Down

0 comments on commit 7a08a7b

Please sign in to comment.