Skip to content

Commit

Permalink
[pinpoint-apm#10882] Add ServiceName + ApplicationName based ServerMap
Browse files Browse the repository at this point in the history
refactoring
  • Loading branch information
intr3p1d committed Jun 10, 2024
1 parent 9b6be48 commit 16dc9c7
Show file tree
Hide file tree
Showing 38 changed files with 2,153 additions and 22 deletions.
2 changes: 2 additions & 0 deletions agent-module/agent-testweb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
<module>resilience4j-plugin-testweb</module>
<module>closed-module-testweb</module>
<module>closed-module-testlib</module>
<module>spring-boot3-testweb</module>
<module>spring-boot3-webflux-plugin-testweb</module>
</modules>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
SchedulerConfiguration.class,
})
@ComponentScan({
"com.navercorp.pinpoint.collector.dao.hbase"
"com.navercorp.pinpoint.collector.dao.hbase",
"com.navercorp.pinpoint.collector.applicationmap.dao.hbase"
})
@PropertySource(name = "CollectorHbaseModule", value = {
"classpath:hbase-root.properties",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"com.navercorp.pinpoint.collector.mapper",
"com.navercorp.pinpoint.collector.util",
"com.navercorp.pinpoint.collector.service",
"com.navercorp.pinpoint.collector.applicationmap.service",
})
public class PinpointCollectorModule {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.dao;

import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface InboundDao extends CachedStatisticsDao {
// src -> dest
// inbound (rowKey dest <- columnName src)
// outbound (rowKey src -> columnName dest)
void update(
String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
String destServiceName, String destApplicationName, ServiceType destApplicationType,
String srcHost, int elapsed, boolean isError
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.dao;

import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface OutboundDao extends CachedStatisticsDao {
// src -> dest
// inbound (rowKey dest <- columnName src)
// outbound (rowKey src -> columnName dest)
void update(
String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
String destServiceName, String destApplicationName, ServiceType destApplicationType,
String srcHost, int elapsed, boolean isError
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.dao;

import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface SelfDao extends CachedStatisticsDao {
void received(String serviceName, String applicationName, ServiceType applicationType, int elapsed, boolean isError);
void updatePing(String serviceName, String applicationName, ServiceType applicationType, int elapsed, boolean isError);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.dao.hbase;

import com.navercorp.pinpoint.collector.applicationmap.dao.InboundDao;
import com.navercorp.pinpoint.collector.dao.hbase.IgnoreStatFilter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapColumnName;
import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapRowKey;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.TimeSlot;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.ServiceType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.Objects;

/**
* @author intr3p1d
*/
@Repository
public class HbaseInboundDao implements InboundDao {

private final Logger logger = LogManager.getLogger(this.getClass());

private final AcceptedTimeService acceptedTimeService;

private final TimeSlot timeSlot;
private final IgnoreStatFilter ignoreStatFilter;
private final BulkWriter bulkWriter;
private final MapLinkConfiguration mapLinkConfiguration;

public HbaseInboundDao(
MapLinkConfiguration mapLinkConfiguration,
IgnoreStatFilter ignoreStatFilter,
AcceptedTimeService acceptedTimeService,
TimeSlot timeSlot,
@Qualifier("inboundBulkWriter") BulkWriter bulkWriter
) {
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter");
this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");

this.bulkWriter = Objects.requireNonNull(bulkWriter, "inboundBulkWriter");
}


@Override
public void update(
String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
String destServiceName, String destApplicationName, ServiceType destApplicationType,
String srcHost, int elapsed, boolean isError
) {
Objects.requireNonNull(srcServiceName, "srcServiceName");
Objects.requireNonNull(destServiceName, "destServiceName");
Objects.requireNonNull(srcApplicationName, "srcApplicationName");
Objects.requireNonNull(destServiceName, "destApplicationName");

if (logger.isDebugEnabled()) {
logger.debug("[Inbound] {} {}({}) <- {} {}({})[{}]",
destServiceName, destApplicationName, destApplicationType,
srcServiceName, srcApplicationName, srcApplicationType, srcHost
);
}


// TODO dest, src parameter normalization
if (ignoreStatFilter.filter(srcApplicationType, srcHost)) {
logger.debug("[Ignore-Inbound] {} {}({}) <- {} {}({})[{}]",
destServiceName, destApplicationName, destApplicationType,
srcServiceName, srcApplicationName, srcApplicationType, srcHost
);
return;
}

final long acceptedTime = acceptedTimeService.getAcceptedTime();
final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);

// rowKey is dest in inbound
final RowKey destRowKey = new ApplicationMapRowKey(destServiceName, destApplicationType.getCode(), destApplicationName, rowTimeSlot);

// columnName is src in outbound
final short srcSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(srcApplicationType, elapsed, isError);
HistogramSchema histogramSchema = srcApplicationType.getHistogramSchema();

final ColumnName srcColumnName = new ApplicationMapColumnName(srcServiceName, srcApplicationType.getCode(), srcApplicationName, srcSlotNumber);
this.bulkWriter.increment(destRowKey, srcColumnName);

if (mapLinkConfiguration.isEnableAvg()) {
final ColumnName sumColumnName = new ApplicationMapColumnName(srcServiceName, srcApplicationType.getCode(), srcApplicationName, histogramSchema.getSumStatSlot().getSlotTime());
this.bulkWriter.increment(destRowKey, sumColumnName, elapsed);
}
if (mapLinkConfiguration.isEnableMax()) {
final ColumnName maxColumnName = new ApplicationMapColumnName(srcServiceName, srcApplicationType.getCode(), srcApplicationName, histogramSchema.getMaxStatSlot().getSlotTime());
this.bulkWriter.updateMax(destRowKey, maxColumnName, elapsed);
}

}

@Override
public void flushLink() {
this.bulkWriter.flushLink();
}

@Override
public void flushAvgMax() {
this.bulkWriter.flushAvgMax();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.dao.hbase;

import com.navercorp.pinpoint.collector.applicationmap.dao.OutboundDao;
import com.navercorp.pinpoint.collector.dao.hbase.IgnoreStatFilter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapColumnName;
import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapRowKey;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.TimeSlot;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.ServiceType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.Objects;

/**
* @author intr3p1d
*/
@Repository
public class HbaseOutboundDao implements OutboundDao {

private final Logger logger = LogManager.getLogger(this.getClass());

private final AcceptedTimeService acceptedTimeService;

private final TimeSlot timeSlot;

private final BulkWriter bulkWriter;
private final MapLinkConfiguration mapLinkConfiguration;

public HbaseOutboundDao(
MapLinkConfiguration mapLinkConfiguration,
IgnoreStatFilter ignoreStatFilter,
AcceptedTimeService acceptedTimeService, TimeSlot timeSlot,
@Qualifier("outboundBulkWriter") BulkWriter bulkWriter
) {
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");

this.bulkWriter = Objects.requireNonNull(bulkWriter, "outboundBulkWriter");
}


@Override
public void update(
String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
String destServiceName, String destApplicationName, ServiceType destApplicationType,
String srcHost, int elapsed, boolean isError
) {
// outbound (rowKey src -> columnName dest)
Objects.requireNonNull(destServiceName, "destServiceName");
Objects.requireNonNull(srcServiceName, "srcServiceName");
Objects.requireNonNull(destApplicationName, "destApplicationName");
Objects.requireNonNull(srcServiceName, "srcApplicationName");

if (logger.isDebugEnabled()) {
logger.debug("[Outbound] {} {}({})[{}] -> {} {}({})",
srcServiceName, srcApplicationName, srcApplicationType, srcHost,
destServiceName, destApplicationName, destApplicationType
);
}

final long acceptedTime = acceptedTimeService.getAcceptedTime();
final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);

// rowKey is src in outbound
final RowKey srcRowKey = new ApplicationMapRowKey(srcServiceName, srcApplicationType.getCode(), srcApplicationName, rowTimeSlot);

// columnName is dest in outbound
final short destSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(destApplicationType, elapsed, isError);
HistogramSchema histogramSchema = destApplicationType.getHistogramSchema();

final ColumnName destColumnName = new ApplicationMapColumnName(destServiceName, destApplicationType.getCode(), destApplicationName, destSlotNumber);
this.bulkWriter.increment(srcRowKey, destColumnName);

if (mapLinkConfiguration.isEnableAvg()) {
final ColumnName sumColumnName = new ApplicationMapColumnName(destServiceName, destApplicationType.getCode(), destApplicationName, histogramSchema.getSumStatSlot().getSlotTime());
this.bulkWriter.increment(srcRowKey, sumColumnName, elapsed);
}
if (mapLinkConfiguration.isEnableMax()) {
final ColumnName maxColumnName = new ApplicationMapColumnName(destServiceName, destApplicationType.getCode(), destApplicationName, histogramSchema.getMaxStatSlot().getSlotTime());
this.bulkWriter.updateMax(srcRowKey, maxColumnName, elapsed);
}
}


@Override
public void flushLink() {
this.bulkWriter.flushLink();
}

@Override
public void flushAvgMax() {
this.bulkWriter.flushAvgMax();
}

}
Loading

0 comments on commit 16dc9c7

Please sign in to comment.