Skip to content

Commit

Permalink
[pinpoint-apm#10882] Add ServiceName + ApplicationName based ServerMap
Browse files Browse the repository at this point in the history
  • Loading branch information
intr3p1d committed Apr 11, 2024
1 parent 87c2dce commit debc29f
Show file tree
Hide file tree
Showing 36 changed files with 2,140 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.dao;

import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface MapStatisticsInboundDao extends CachedStatisticsDao {
// src -> dest
// inbound (rowKey dest <- columnName src)
// outbound (rowKey src -> columnName dest)
void update(
String srcServiceGroupName, String srcApplicationName, ServiceType srcServiceType,
String destServiceGroupName, String destApplicationName, ServiceType destServiceType,
String srcHost, int elapsed, boolean isError
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.dao;

import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface MapStatisticsOutboundDao extends CachedStatisticsDao {
// src -> dest
// inbound (rowKey dest <- columnName src)
// outbound (rowKey src -> columnName dest)
void update(
String destServiceGroupName, String destApplicationName, ServiceType destServiceType,
String srcServiceGroupName, String srcApplicationName, ServiceType srcServiceType,
String srcHost, int elapsed, boolean isError
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.dao;

import com.navercorp.pinpoint.common.trace.ServiceType;

/**
* @author intr3p1d
*/
public interface MapStatisticsSelfDao extends CachedStatisticsDao {
void received(String serviceGroup, String applicationName, ServiceType serviceType, int elapsed, boolean isError);

void updatePing(String serviceGroup, String applicationName, ServiceType serviceType, int elapsed, boolean isError);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.MapStatisticsInboundDao;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupRowKey;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.TimeSlot;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.ServiceType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.Objects;

/**
* @author intr3p1d
*/
@Repository
public class HbaseMapStatisticsInboundDao implements MapStatisticsInboundDao {

private final Logger logger = LogManager.getLogger(this.getClass());

private final AcceptedTimeService acceptedTimeService;

private final TimeSlot timeSlot;
private final IgnoreStatFilter ignoreStatFilter;
private final BulkWriter bulkWriter;
private final MapLinkConfiguration mapLinkConfiguration;

public HbaseMapStatisticsInboundDao(
MapLinkConfiguration mapLinkConfiguration,
IgnoreStatFilter ignoreStatFilter,
AcceptedTimeService acceptedTimeService,
TimeSlot timeSlot,
@Qualifier("inboundBulkWriter") BulkWriter bulkWriter
) {
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter");
this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");

this.bulkWriter = Objects.requireNonNull(bulkWriter, "inboundBulkWriter");
}


@Override
public void update(
String srcServiceGroupName, String srcApplicationName, ServiceType srcServiceType,
String destServiceGroupName, String destApplicationName, ServiceType destServiceType,
String srcHost, int elapsed, boolean isError
) {
Objects.requireNonNull(srcServiceGroupName, "srcServiceGroupName");
Objects.requireNonNull(destServiceGroupName, "destServiceGroupName");
Objects.requireNonNull(srcApplicationName, "srcApplicationName");
Objects.requireNonNull(destServiceGroupName, "destApplicationName");

if (logger.isDebugEnabled()) {
logger.debug("[Inbound] {} {}({})[{}] <- {} {}({})",
destServiceGroupName, destApplicationName, destServiceType, srcHost,
srcServiceGroupName, srcApplicationName, srcServiceType
);
}


// TODO dest, src parameter normalization
if (ignoreStatFilter.filter(srcServiceType, srcHost)) {
logger.debug("[Ignore-Inbound] {} {}({})[{}] <- {} {}({})",
destServiceGroupName, destApplicationName, destServiceType, srcHost,
srcServiceGroupName, srcApplicationName, srcServiceType
);
return;
}

final long acceptedTime = acceptedTimeService.getAcceptedTime();
final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);

// rowKey is dest in inbound
final RowKey destRowKey = new ServiceGroupRowKey(destServiceGroupName, destServiceType.getCode(), destApplicationName, rowTimeSlot);

// columnName is src in outbound
final short srcSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(srcServiceType, elapsed, isError);
HistogramSchema histogramSchema = srcServiceType.getHistogramSchema();

final ColumnName srcColumnName = new ServiceGroupColumnName(srcServiceGroupName, srcServiceType.getCode(), srcApplicationName, srcSlotNumber);
this.bulkWriter.increment(destRowKey, srcColumnName);

if (mapLinkConfiguration.isEnableAvg()) {
final ColumnName sumColumnName = new ServiceGroupColumnName(srcServiceGroupName, srcServiceType.getCode(), srcApplicationName, histogramSchema.getSumStatSlot().getSlotTime());
this.bulkWriter.increment(destRowKey, sumColumnName, elapsed);
}
if (mapLinkConfiguration.isEnableMax()) {
final ColumnName maxColumnName = new ServiceGroupColumnName(srcServiceGroupName, srcServiceType.getCode(), srcApplicationName, histogramSchema.getMaxStatSlot().getSlotTime());
this.bulkWriter.updateMax(destRowKey, maxColumnName, elapsed);
}

}

@Override
public void flushLink() {
this.bulkWriter.flushLink();
}

@Override
public void flushAvgMax() {
this.bulkWriter.flushAvgMax();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.MapStatisticsOutboundDao;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupRowKey;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.TimeSlot;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.ServiceType;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.Objects;

/**
* @author intr3p1d
*/
@Repository
public class HbaseMapStatisticsOutboundDao implements MapStatisticsOutboundDao {

private final Logger logger = LogManager.getLogger(this.getClass());

private final AcceptedTimeService acceptedTimeService;

private final TimeSlot timeSlot;

private final BulkWriter bulkWriter;
private final MapLinkConfiguration mapLinkConfiguration;

public HbaseMapStatisticsOutboundDao(
MapLinkConfiguration mapLinkConfiguration,
IgnoreStatFilter ignoreStatFilter,
AcceptedTimeService acceptedTimeService, TimeSlot timeSlot,
@Qualifier("outboundBulkWriter") BulkWriter bulkWriter
) {
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");

this.bulkWriter = Objects.requireNonNull(bulkWriter, "outboundBulkWriter");
}


@Override
public void update(
String destServiceGroupName, String destApplicationName, ServiceType destServiceType,
String srcServiceGroupName, String srcApplicationName, ServiceType srcServiceType,
String srcHost, int elapsed, boolean isError
) {
// outbound (rowKey src -> columnName dest)
Objects.requireNonNull(destServiceGroupName, "destServiceGroupName");
Objects.requireNonNull(srcServiceGroupName, "srcServiceGroupName");
Objects.requireNonNull(destApplicationName, "destApplicationName");
Objects.requireNonNull(srcServiceGroupName, "srcApplicationName");

if (logger.isDebugEnabled()) {
logger.debug("[Outbound] {} {}({})[{}] -> {} {}({})",
srcServiceGroupName, srcApplicationName, srcServiceType, srcHost,
destServiceGroupName, destApplicationName, destServiceType
);
}

// there may be no endpoint in case of httpclient
srcHost = StringUtils.defaultString(srcHost);

final long acceptedTime = acceptedTimeService.getAcceptedTime();
final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);

// rowKey is src in outbound
final RowKey srcRowKey = new ServiceGroupRowKey(srcServiceGroupName, srcServiceType.getCode(), srcApplicationName, rowTimeSlot);

// columnName is dest in outbound
final short destSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(destServiceType, elapsed, isError);
HistogramSchema histogramSchema = destServiceType.getHistogramSchema();

final ColumnName destColumnName = new ServiceGroupColumnName(destServiceGroupName, destServiceType.getCode(), destApplicationName, destSlotNumber);
this.bulkWriter.increment(srcRowKey, destColumnName);

if (mapLinkConfiguration.isEnableAvg()) {
final ColumnName sumColumnName = new ServiceGroupColumnName(destServiceGroupName, destServiceType.getCode(), destApplicationName, histogramSchema.getSumStatSlot().getSlotTime());
this.bulkWriter.increment(srcRowKey, sumColumnName, elapsed);
}
if (mapLinkConfiguration.isEnableMax()) {
final ColumnName maxColumnName = new ServiceGroupColumnName(destServiceGroupName, destServiceType.getCode(), destApplicationName, histogramSchema.getMaxStatSlot().getSlotTime());
this.bulkWriter.updateMax(srcRowKey, maxColumnName, elapsed);
}
}


@Override
public void flushLink() {
this.bulkWriter.flushLink();
}

@Override
public void flushAvgMax() {
this.bulkWriter.flushAvgMax();
}

}

0 comments on commit debc29f

Please sign in to comment.