Skip to content

Commit

Permalink
feat(metrics): add metrics for aspect write and bytes (#8526)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Jul 31, 2023
1 parent b77b4e2 commit de1f23d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import io.ebean.PagedList;

import javax.annotation.Nonnull;
Expand All @@ -28,6 +29,8 @@
* worth looking into ways to move this responsibility inside {@link AspectDao} implementations.
*/
public interface AspectDao {
String ASPECT_WRITE_COUNT_METRIC_NAME = "aspectWriteCount";
String ASPECT_WRITE_BYTES_METRIC_NAME = "aspectWriteBytes";

@Nullable
EntityAspect getAspect(@Nonnull final String urn, @Nonnull final String aspectName, final long version);
Expand Down Expand Up @@ -116,4 +119,11 @@ ListResult<String> listAspectMetadata(

@Nonnull
<T> T runInTransactionWithRetry(@Nonnull final Supplier<T> block, final int maxTransactionRetry);

default void incrementWriteMetrics(String aspectName, long count, long bytes) {
MetricUtils.counter(this.getClass(),
String.join(MetricUtils.DELIMITER, List.of(ASPECT_WRITE_COUNT_METRIC_NAME, aspectName))).inc(count);
MetricUtils.counter(this.getClass(),
String.join(MetricUtils.DELIMITER, List.of(ASPECT_WRITE_BYTES_METRIC_NAME, aspectName))).inc(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,8 @@ public RollbackResult deleteAspect(String urn, String aspectName, @Nonnull Map<S
latest.setCreatedBy(survivingAspect.getCreatedBy());
latest.setCreatedFor(survivingAspect.getCreatedFor());
_aspectDao.saveAspect(latest, false);
// metrics
_aspectDao.incrementWriteMetrics(aspectName, 1, latest.getAspect().getBytes(StandardCharsets.UTF_8).length);
_aspectDao.deleteAspect(survivingAspect);
} else {
if (isKeyAspect) {
Expand Down Expand Up @@ -2014,20 +2016,27 @@ private UpdateAspectResult ingestAspectToLocalDBNoTransaction(

_aspectDao.saveAspect(latest, false);

// metrics
_aspectDao.incrementWriteMetrics(aspectName, 1, latest.getAspect().getBytes(StandardCharsets.UTF_8).length);

return new UpdateAspectResult(urn, oldValue, oldValue,
EntityUtils.parseSystemMetadata(latest.getSystemMetadata()), latestSystemMetadata,
MetadataAuditOperation.UPDATE, auditStamp, 0);
}

// 4. Save the newValue as the latest version
log.debug("Ingesting aspect with name {}, urn {}", aspectName, urn);
String newValueStr = EntityUtils.toJsonAspect(newValue);
long versionOfOld = _aspectDao.saveLatestAspect(urn.toString(), aspectName, latest == null ? null : EntityUtils.toJsonAspect(oldValue),
latest == null ? null : latest.getCreatedBy(), latest == null ? null : latest.getCreatedFor(),
latest == null ? null : latest.getCreatedOn(), latest == null ? null : latest.getSystemMetadata(),
EntityUtils.toJsonAspect(newValue), auditStamp.getActor().toString(),
newValueStr, auditStamp.getActor().toString(),
auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null,
new Timestamp(auditStamp.getTime()), EntityUtils.toJsonAspect(providedSystemMetadata), nextVersion);

// metrics
_aspectDao.incrementWriteMetrics(aspectName, 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);

return new UpdateAspectResult(urn, oldValue, newValue,
latest == null ? null : EntityUtils.parseSystemMetadata(latest.getSystemMetadata()), providedSystemMetadata,
MetadataAuditOperation.UPDATE, auditStamp, versionOfOld);
Expand Down Expand Up @@ -2072,10 +2081,14 @@ private RecordTemplate updateAspect(
newSystemMetadata.setLastObserved(System.currentTimeMillis());

log.debug("Updating aspect with name {}, urn {}", aspectName, urn);
_aspectDao.saveAspect(urn.toString(), aspectName, EntityUtils.toJsonAspect(value), auditStamp.getActor().toString(),
String aspectStr = EntityUtils.toJsonAspect(value);
_aspectDao.saveAspect(urn.toString(), aspectName, aspectStr, auditStamp.getActor().toString(),
auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null,
new Timestamp(auditStamp.getTime()), EntityUtils.toJsonAspect(newSystemMetadata), version, oldAspect == null);

// metrics
_aspectDao.incrementWriteMetrics(aspectName, 1, aspectStr.getBytes(StandardCharsets.UTF_8).length);

return new UpdateAspectResult(urn, oldValue, value, oldSystemMetadata, newSystemMetadata,
MetadataAuditOperation.UPDATE, auditStamp, version);
}, maxTransactionRetry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.ListResultMetadata;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -596,6 +597,9 @@ public void saveAspect(
);

saveAspect(aspect, insert);

// metrics
incrementWriteMetrics(aspectName, 1, aspectMetadata.getBytes(StandardCharsets.UTF_8).length);
}

@Override
Expand Down

0 comments on commit de1f23d

Please sign in to comment.