Skip to content

Commit

Permalink
[#noissue] Remove duplicate code in ServerMap
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed May 11, 2024
1 parent 8c8bced commit d52a33d
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import com.navercorp.pinpoint.common.hbase.ConnectionFactoryBean;
import com.navercorp.pinpoint.common.hbase.HbaseTemplate;
import com.navercorp.pinpoint.common.hbase.RowMapper;
import com.navercorp.pinpoint.common.hbase.TableFactory;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
import com.navercorp.pinpoint.common.hbase.async.AsyncConnectionFactoryBean;
import com.navercorp.pinpoint.common.hbase.async.AsyncTableCustomizer;
import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory;
Expand All @@ -28,6 +30,18 @@
import com.navercorp.pinpoint.common.hbase.util.ScanMetricReporter;
import com.navercorp.pinpoint.common.server.executor.ExecutorCustomizer;
import com.navercorp.pinpoint.common.server.executor.ExecutorProperties;
import com.navercorp.pinpoint.web.applicationmap.dao.MapResponseDao;
import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCalleeDao;
import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCallerDao;
import com.navercorp.pinpoint.web.applicationmap.dao.hbase.HbaseMapResponseTimeDao;
import com.navercorp.pinpoint.web.applicationmap.dao.hbase.HbaseMapStatisticsCalleeDao;
import com.navercorp.pinpoint.web.applicationmap.dao.hbase.HbaseMapStatisticsCallerDao;
import com.navercorp.pinpoint.web.applicationmap.dao.hbase.MapScanFactory;
import com.navercorp.pinpoint.web.applicationmap.dao.mapper.RowMapperFactory;
import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
import com.navercorp.pinpoint.web.vo.RangeFactory;
import com.navercorp.pinpoint.web.vo.ResponseTime;
import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
Expand All @@ -38,22 +52,19 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

@org.springframework.context.annotation.Configuration
@ComponentScan({
"com.navercorp.pinpoint.web.applicationmap.dao.hbase",
})
@Import({
MapMapperConfiguration.class
})
public class MapHbaseConfiguration {
private final Logger logger = LogManager.getLogger(MapHbaseConfiguration.class);

private final HbaseTemplateConfiguration config = new HbaseTemplateConfiguration();

public MapHbaseConfiguration() {
Expand All @@ -62,7 +73,7 @@ public MapHbaseConfiguration() {

@Bean
public FactoryBean<ExecutorService> mapHbaseThreadPool(@Qualifier("hbaseExecutorCustomizer") ExecutorCustomizer<ThreadPoolExecutorFactoryBean> executorCustomizer,
@Qualifier("hbaseClientExecutorProperties") ExecutorProperties properties) {
@Qualifier("hbaseClientExecutorProperties") ExecutorProperties properties) {
ThreadPoolExecutorFactoryBean factory = new ThreadPoolExecutorFactoryBean();
executorCustomizer.customize(factory, properties);
factory.setThreadNamePrefix("Map-" + factory.getThreadNamePrefix());
Expand All @@ -71,8 +82,8 @@ public FactoryBean<ExecutorService> mapHbaseThreadPool(@Qualifier("hbaseExecutor

@Bean
public FactoryBean<Connection> mapHbaseConnection(Configuration configuration,
User user,
@Qualifier("mapHbaseThreadPool") ExecutorService executorService) {
User user,
@Qualifier("mapHbaseThreadPool") ExecutorService executorService) {
return new ConnectionFactoryBean(configuration, user, executorService);
}

Expand Down Expand Up @@ -103,5 +114,45 @@ public HbaseTemplate mapHbaseTemplate(@Qualifier("hbaseConfiguration") Configura
return config.hbaseTemplate(configurable, tableFactory, asyncTableFactory, parallelScan, nativeAsync, reporter);
}

@Bean
public MapScanFactory mapScanFactory(RangeFactory rangeFactory) {
return new MapScanFactory(rangeFactory);
}

@Bean
public MapResponseDao hbaseMapResponseTimeDao(@Qualifier("mapHbaseTemplate")
HbaseTemplate hbaseTemplate,
TableNameProvider tableNameProvider,
@Qualifier("responseTimeMapper")
RowMapper<ResponseTime> responseTimeMapper,
MapScanFactory mapScanFactory,
@Qualifier("statisticsSelfRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return new HbaseMapResponseTimeDao(hbaseTemplate, tableNameProvider, responseTimeMapper, mapScanFactory, rowKeyDistributor);
}

@Bean
public MapStatisticsCalleeDao hbaseMapStatisticsCalleeDao(@Qualifier("mapHbaseTemplate")
HbaseTemplate hbaseTemplate,
TableNameProvider tableNameProvider,
@Qualifier("mapCalleeMapper")
RowMapperFactory<LinkDataMap> calleeMapper,
MapScanFactory mapScanFactory,
@Qualifier("statisticsCalleeRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return new HbaseMapStatisticsCalleeDao(hbaseTemplate, tableNameProvider, calleeMapper, mapScanFactory, rowKeyDistributor);
}

@Bean
public MapStatisticsCallerDao hbaseMapStatisticsCallerDao(@Qualifier("mapHbaseTemplate")
HbaseTemplate hbaseTemplate,
TableNameProvider tableNameProvider,
@Qualifier("mapCallerMapper")
RowMapperFactory<LinkDataMap> callerMapper,
MapScanFactory mapScanFactory,
@Qualifier("statisticsCallerRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return new HbaseMapStatisticsCallerDao(hbaseTemplate, tableNameProvider, callerMapper, mapScanFactory, rowKeyDistributor);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsCalleeMapper;
import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsCallerMapper;
import com.navercorp.pinpoint.web.applicationmap.dao.mapper.ResponseTimeMapper;
import com.navercorp.pinpoint.web.applicationmap.dao.mapper.RowMapperFactory;
import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
import com.navercorp.pinpoint.web.component.ApplicationFactory;
import com.navercorp.pinpoint.web.util.TimeWindowFunction;
import com.navercorp.pinpoint.web.vo.ResponseTime;
import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -20,33 +20,18 @@
public class MapMapperConfiguration {

@Bean
public RowMapper<LinkDataMap> mapStatisticsCallerMapper(ApplicationFactory applicationFactory,
@Qualifier("statisticsCallerRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return new MapStatisticsCallerMapper(applicationFactory, rowKeyDistributor, LinkFilter::skip, TimeWindowFunction.identity());
public RowMapperFactory<LinkDataMap> mapCallerMapper(ApplicationFactory applicationFactory,
@Qualifier("statisticsCallerRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return (windowFunction) -> new MapStatisticsCallerMapper(applicationFactory, rowKeyDistributor, LinkFilter::skip, windowFunction);
}

@Bean
public RowMapper<LinkDataMap> mapStatisticsCallerTimeAggregatedMapper(ApplicationFactory applicationFactory,
@Qualifier("statisticsCallerRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return new MapStatisticsCallerMapper(applicationFactory, rowKeyDistributor, LinkFilter::skip, TimeWindowFunction.ALL_IN_ONE);
}

@Bean
public RowMapper<LinkDataMap> mapStatisticsCalleeMapper(ServiceTypeRegistryService registry,
ApplicationFactory applicationFactory,
@Qualifier("statisticsCalleeRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return new MapStatisticsCalleeMapper(registry, applicationFactory, rowKeyDistributor, LinkFilter::skip, TimeWindowFunction.identity());
}

@Bean
public RowMapper<LinkDataMap> mapStatisticsCalleeTimeAggregatedMapper(ServiceTypeRegistryService registry,
ApplicationFactory applicationFactory,
@Qualifier("statisticsCalleeRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return new MapStatisticsCalleeMapper(registry, applicationFactory, rowKeyDistributor, LinkFilter::skip, TimeWindowFunction.ALL_IN_ONE);
public RowMapperFactory<LinkDataMap> mapCalleeMapper(ServiceTypeRegistryService registry,
ApplicationFactory applicationFactory,
@Qualifier("statisticsCalleeRowKeyDistributor")
RowKeyDistributorByHashPrefix rowKeyDistributor) {
return (windowFunction) -> new MapStatisticsCalleeMapper(registry, applicationFactory, rowKeyDistributor, LinkFilter::skip, windowFunction);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@
import com.navercorp.pinpoint.common.hbase.HbaseOperations;
import com.navercorp.pinpoint.common.hbase.RowMapper;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.web.applicationmap.dao.MapResponseDao;
import com.navercorp.pinpoint.web.vo.Application;
import com.navercorp.pinpoint.web.vo.RangeFactory;
import com.navercorp.pinpoint.web.vo.ResponseTime;
import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
Expand All @@ -51,27 +48,25 @@ public class HbaseMapResponseTimeDao implements MapResponseDao {

private static final HbaseColumnFamily.SelfStatMap DESCRIPTOR = HbaseColumnFamily.MAP_STATISTICS_SELF_VER2_COUNTER;

private int scanCacheSize = 40;

private final RowMapper<ResponseTime> responseTimeMapper;

private final HbaseOperations hbaseOperations;
private final TableNameProvider tableNameProvider;

private final RangeFactory rangeFactory;
private final MapScanFactory scanFactory;

private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;
private final RowKeyDistributorByHashPrefix rowKeyDistributor;

public HbaseMapResponseTimeDao(@Qualifier("mapHbaseTemplate") HbaseOperations hbaseOperations,
public HbaseMapResponseTimeDao(HbaseOperations hbaseOperations,
TableNameProvider tableNameProvider,
@Qualifier("responseTimeMapper") RowMapper<ResponseTime> responseTimeMapper,
RangeFactory rangeFactory,
@Qualifier("statisticsSelfRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix) {
RowMapper<ResponseTime> responseTimeMapper,
MapScanFactory scanFactory,
RowKeyDistributorByHashPrefix rowKeyDistributor) {
this.hbaseOperations = Objects.requireNonNull(hbaseOperations, "hbaseOperations");
this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
this.responseTimeMapper = Objects.requireNonNull(responseTimeMapper, "responseTimeMapper");
this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory");
this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix");
this.scanFactory = Objects.requireNonNull(scanFactory, "scanFactory");
this.rowKeyDistributor = Objects.requireNonNull(rowKeyDistributor, "rowKeyDistributor");
}


Expand All @@ -83,10 +78,10 @@ public List<ResponseTime> selectResponseTime(Application application, Range rang
logger.debug("selectResponseTime applicationName:{}, {}", application, range);
}

Scan scan = createScan(application, range, DESCRIPTOR.getName());
Scan scan = scanFactory.createScan("MapSelfScan", application, range, DESCRIPTOR.getName());

TableName mapStatisticsSelfTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable());
List<ResponseTime> responseTimeList = hbaseOperations.findParallel(mapStatisticsSelfTableName, scan, rowKeyDistributorByHashPrefix, responseTimeMapper, MAP_STATISTICS_SELF_VER2_NUM_PARTITIONS);
List<ResponseTime> responseTimeList = hbaseOperations.findParallel(mapStatisticsSelfTableName, scan, rowKeyDistributor, responseTimeMapper, MAP_STATISTICS_SELF_VER2_NUM_PARTITIONS);

if (responseTimeList.isEmpty()) {
return new ArrayList<>();
Expand All @@ -95,24 +90,4 @@ public List<ResponseTime> selectResponseTime(Application application, Range rang
return responseTimeList;
}

private Scan createScan(Application application, Range range, byte[] family) {
range = rangeFactory.createStatisticsRange(range);
if (logger.isDebugEnabled()) {
logger.debug("scan time:{} ", range.prettyToString());
}

// start key is replaced by end key because timestamp has been reversed
byte[] startKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getTo());
byte[] endKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getFrom());

final Scan scan = new Scan();
scan.setCaching(this.scanCacheSize);
scan.withStartRow(startKey);
scan.withStopRow(endKey);
scan.addFamily(family);
scan.setId("ApplicationSelfScan");

return scan;
}

}

0 comments on commit d52a33d

Please sign in to comment.