Skip to content

Commit

Permalink
[pinpoint-apm#10882] Add server map module based on redis-timeseries
Browse files Browse the repository at this point in the history
  • Loading branch information
intr3p1d committed Jan 20, 2025
1 parent c8a7016 commit 711c711
Show file tree
Hide file tree
Showing 27 changed files with 1,267 additions and 43 deletions.
6 changes: 6 additions & 0 deletions collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@
<artifactId>jakarta.annotation-api</artifactId>
</dependency>


<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-redis-timeseries</artifactId>
<version>3.1.0-SNAPSHOT</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.navercorp.pinpoint.collector;


import com.navercorp.pinpoint.collector.applicationmap.ApplicationMapModule;
import com.navercorp.pinpoint.collector.config.ClusterModule;
import com.navercorp.pinpoint.collector.config.CollectorCommonConfiguration;
import com.navercorp.pinpoint.collector.config.CollectorConfiguration;
Expand Down Expand Up @@ -35,6 +36,8 @@
GrpcSslModule.class,

RealtimeCollectorModule.class,

ApplicationMapModule.class,
})
@ComponentScan(basePackages = {
"com.navercorp.pinpoint.collector.handler",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
* @author intr3p1d
*/
@Configuration(proxyBeanMethods = false)
@ComponentScan(basePackages = {
"com.navercorp.pinpoint.collector.applicationmap",
"com.navercorp.pinpoint.collector.applicationmap.dao",
"com.navercorp.pinpoint.collector.applicationmap.redis",
"com.navercorp.pinpoint.collector.applicationmap.service",
})
public class ApplicationMapModule {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.dao;

import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface InboundDao extends CachedStatisticsDao {
// src -> dest
// inbound (rowKey dest <- columnName src)
// outbound (rowKey src -> columnName dest)
void update(
String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
String destServiceName, String destApplicationName, ServiceType destApplicationType,
String srcHost, int elapsed, boolean isError
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.dao;

import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface OutboundDao extends CachedStatisticsDao {
// src -> dest
// inbound (rowKey dest <- columnName src)
// outbound (rowKey src -> columnName dest)
void update(
String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
String destServiceName, String destApplicationName, ServiceType destApplicationType,
String srcHost, int elapsed, boolean isError
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.dao;


import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface SelfDao extends CachedStatisticsDao {
void received(String serviceName, String applicationName, ServiceType applicationType, int elapsed, boolean isError);
void updatePing(String serviceName, String applicationName, ServiceType applicationType, int elapsed, boolean isError);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.applicationmap.redis;

import com.navercorp.pinpoint.collector.applicationmap.dao.InboundDao;
import com.navercorp.pinpoint.collector.applicationmap.redis.schema.ApplicationMapTable;
import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesKey;
import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesValue;
import com.navercorp.pinpoint.collector.dao.hbase.IgnoreStatFilter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
import com.navercorp.pinpoint.collector.applicationmap.redis.statistics.RedisBulkWriter;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.TimeSlot;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.ServiceType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.Objects;

/**
* @author intr3p1d
*/
@Repository
public class RedisInboundDao implements InboundDao {

private final Logger logger = LogManager.getLogger(this.getClass());

private final AcceptedTimeService acceptedTimeService;
private final IgnoreStatFilter ignoreStatFilter;
private final RedisBulkWriter bulkWriter;
private final MapLinkConfiguration mapLinkConfiguration;

public RedisInboundDao(
MapLinkConfiguration mapLinkConfiguration,
AcceptedTimeService acceptedTimeService,
IgnoreStatFilter ignoreStatFilter,
@Qualifier("inboundBulkWriter") RedisBulkWriter bulkWriter
) {
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter");
this.bulkWriter = Objects.requireNonNull(bulkWriter, "inboundBulkWriter");
}


@Override
public void update(
String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
String destServiceName, String destApplicationName, ServiceType destApplicationType,
String srcHost, int elapsed, boolean isError
) {
Objects.requireNonNull(srcServiceName, "srcServiceName");
Objects.requireNonNull(destServiceName, "destServiceName");
Objects.requireNonNull(srcApplicationName, "srcApplicationName");
Objects.requireNonNull(destServiceName, "destApplicationName");

if (logger.isDebugEnabled()) {
logger.debug("[Inbound] {} {}({}) <- {} {}({})[{}]",
destServiceName, destApplicationName, destApplicationType,
srcServiceName, srcApplicationName, srcApplicationType, srcHost
);
}

if (ignoreStatFilter.filter(srcApplicationType, srcHost)) {
logger.debug("[Ignore-Inbound] {} {}({}) <- {} {}({})[{}]",
destServiceName, destApplicationName, destApplicationType,
srcServiceName, srcApplicationName, srcApplicationType, srcHost
);
return;
}

final short srcSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(srcApplicationType, elapsed, isError);
HistogramSchema histogramSchema = srcApplicationType.getHistogramSchema();
final long acceptedTime = acceptedTimeService.getAcceptedTime();

// for inbound, main is destination
// and sub is source
final TimeSeriesKey applicationTypeKey = new TimeSeriesKey(
ApplicationMapTable.Inbound, "tenantId",
destServiceName, destApplicationName,
srcServiceName, srcApplicationName, srcSlotNumber
);
TimeSeriesValue addOne = new TimeSeriesValue(acceptedTime);
this.bulkWriter.increment(applicationTypeKey, addOne);

if (mapLinkConfiguration.isEnableAvg()) {
final TimeSeriesKey sumStatKey = new TimeSeriesKey(
ApplicationMapTable.Inbound, "tenantId",
destServiceName, destApplicationName,
srcServiceName, srcApplicationName,
histogramSchema.getSumStatSlot().getSlotTime()
);
final TimeSeriesValue sumValue = new TimeSeriesValue(acceptedTime);
this.bulkWriter.increment(sumStatKey, sumValue, elapsed);
}
if (mapLinkConfiguration.isEnableMax()) {
final TimeSeriesKey maxStatKey = new TimeSeriesKey(
ApplicationMapTable.Inbound, "tenantId",
destServiceName, destApplicationName,
srcServiceName, srcApplicationName,
histogramSchema.getMaxStatSlot().getSlotTime()
);
final TimeSeriesValue maxValue = new TimeSeriesValue(acceptedTime);
this.bulkWriter.updateMax(maxStatKey, maxValue, elapsed);
}

}

@Override
public void flushLink() {
this.bulkWriter.flushLink();
}

@Override
public void flushAvgMax() {
this.bulkWriter.flushAvgMax();
}

}
Loading

0 comments on commit 711c711

Please sign in to comment.