Skip to content

Commit

Permalink
[ISSUE apache#494] support prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
Slideee committed May 6, 2023
1 parent 517bcbb commit 59de3e9
Show file tree
Hide file tree
Showing 9 changed files with 479 additions and 1 deletion.
8 changes: 8 additions & 0 deletions metric-exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_dropwizard</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.metrics;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import io.prometheus.client.dropwizard.samplebuilder.DefaultSampleBuilder;
import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.rocketmq.connect.metrics.stats.Stat;

/**
* Collect Dropwizard metrics from a MetricRegistry.
*/
public class DropwizardExports extends io.prometheus.client.Collector implements io.prometheus.client.Collector.Describable {
private static final Logger LOGGER = Logger.getLogger(DropwizardExports.class.getName());
private MetricRegistry registry;
private MetricFilter metricFilter;
private SampleBuilder sampleBuilder;

/**
* Creates a new DropwizardExports with a {@link DefaultSampleBuilder} and {@link MetricFilter#ALL}.
*
* @param registry a metric registry to export in prometheus.
*/
public DropwizardExports(MetricRegistry registry) {
this.registry = registry;
this.metricFilter = MetricFilter.ALL;
this.sampleBuilder = new DefaultSampleBuilder();
}

/**
* Creates a new DropwizardExports with a {@link DefaultSampleBuilder} and custom {@link MetricFilter}.
*
* @param registry a metric registry to export in prometheus.
* @param metricFilter a custom metric filter.
*/
public DropwizardExports(MetricRegistry registry, MetricFilter metricFilter) {
this.registry = registry;
this.metricFilter = metricFilter;
this.sampleBuilder = new DefaultSampleBuilder();
}

/**
* @param registry a metric registry to export in prometheus.
* @param sampleBuilder sampleBuilder to use to create prometheus samples.
*/
public DropwizardExports(MetricRegistry registry, SampleBuilder sampleBuilder) {
this.registry = registry;
this.metricFilter = MetricFilter.ALL;
this.sampleBuilder = sampleBuilder;
}

/**
* @param registry a metric registry to export in prometheus.
* @param metricFilter a custom metric filter.
* @param sampleBuilder sampleBuilder to use to create prometheus samples.
*/
public DropwizardExports(MetricRegistry registry, MetricFilter metricFilter, SampleBuilder sampleBuilder) {
this.registry = registry;
this.metricFilter = metricFilter;
this.sampleBuilder = sampleBuilder;
}

private static String getHelpMessage(String metricName, Metric metric) {
return String.format("Generated from Dropwizard metric import (metric=%s, type=%s)", metricName, metric.getClass().getName());
}

/**
* Export counter as Prometheus <a href="https://prometheus.io/docs/concepts/metric_types/#gauge">Gauge</a>.
*/
MetricFamilySamples fromCounter(String dropwizardName, Counter counter) {
MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList<String>(), new ArrayList<String>(), new Long(counter.getCount()).doubleValue());
return new MetricFamilySamples(sample.name, Type.GAUGE, getHelpMessage(dropwizardName, counter), Arrays.asList(sample));
}

/**
* Export gauge as a prometheus gauge.
*/
MetricFamilySamples fromGauge(String dropwizardName, Gauge gauge) {
Object obj = gauge.getValue();
double value;
if (obj instanceof Number) {
value = ((Number) obj).doubleValue();
} else if (obj instanceof Boolean) {
value = ((Boolean) obj) ? 1 : 0;
} else {
LOGGER.log(Level.FINE, String.format("Invalid type for Gauge %s: %s", sanitizeMetricName(dropwizardName), obj == null ? "null" : obj.getClass().getName()));
return null;
}
MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList<String>(), new ArrayList<String>(), value);
return new MetricFamilySamples(sample.name, Type.GAUGE, getHelpMessage(dropwizardName, gauge), Arrays.asList(sample));
}

/**
* Export a histogram snapshot as a prometheus SUMMARY.
*
* @param dropwizardName metric name.
* @param snapshot the histogram snapshot.
* @param count the total sample count for this snapshot.
* @param factor a factor to apply to histogram values.
*/
MetricFamilySamples fromSnapshotAndCount(String dropwizardName, Snapshot snapshot, long count, double factor,
String helpMessage) {
MetricName metricName = MetricUtils.stringToMetricName(dropwizardName);
Stat.HistogramType histogramType = Stat.HistogramType.valueOf(metricName.getType());
List<MetricFamilySamples.Sample> samples = new ArrayList<>();
switch (histogramType) {
case Avg:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList<String>(), new ArrayList<String>(), snapshot.getMean() * factor));
break;
case Min:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList<String>(), new ArrayList<String>(), snapshot.getMin() * factor));
break;
case Max:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList<String>(), new ArrayList<String>(), snapshot.getMax() * factor));
break;
case Percentile_75th:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.75"), snapshot.get75thPercentile() * factor));
break;
case Percentile_95th:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.95"), snapshot.get95thPercentile() * factor));
break;
case Percentile_98th:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.98"), snapshot.get98thPercentile() * factor));
break;
case Percentile_99th:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.99"), snapshot.get99thPercentile() * factor));
break;
case Percentile_999th:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.999"), snapshot.get999thPercentile() * factor));
break;
default:
samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.5"), snapshot.getMedian() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.5"), snapshot.getMedian() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.75"), snapshot.get75thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.95"), snapshot.get95thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.98"), snapshot.get98thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.99"), snapshot.get99thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.999"), snapshot.get999thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "_count", new ArrayList<String>(), new ArrayList<String>(), count));

}
return new MetricFamilySamples(samples.get(0).name, Type.SUMMARY, helpMessage, samples);
}

/**
* Convert histogram snapshot.
*/
MetricFamilySamples fromHistogram(String dropwizardName, Histogram histogram) {
return fromSnapshotAndCount(dropwizardName, histogram.getSnapshot(), histogram.getCount(), 1.0, getHelpMessage(dropwizardName, histogram));
}

/**
* Export Dropwizard Timer as a histogram. Use TIME_UNIT as time unit.
*/
MetricFamilySamples fromTimer(String dropwizardName, Timer timer) {
return fromSnapshotAndCount(dropwizardName, timer.getSnapshot(), timer.getCount(), 1.0D / TimeUnit.SECONDS.toNanos(1L), getHelpMessage(dropwizardName, timer));
}

/**
* Export a Meter as as prometheus COUNTER.
*/
MetricFamilySamples fromMeter(String dropwizardName, Meter meter) {
MetricName metricName = MetricUtils.stringToMetricName(dropwizardName);
final MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList<String>(), new ArrayList<String>(), MetricUtils.getMeterValue(metricName, meter));
return new MetricFamilySamples(sample.name, Type.COUNTER, getHelpMessage(dropwizardName, meter), Arrays.asList(sample));
}

@Override
public List<MetricFamilySamples> collect() {
Map<String, MetricFamilySamples> mfSamplesMap = new HashMap<String, MetricFamilySamples>();

for (SortedMap.Entry<String, Gauge> entry : registry.getGauges(metricFilter).entrySet()) {
addToMap(mfSamplesMap, fromGauge(entry.getKey(), entry.getValue()));
}
for (SortedMap.Entry<String, Counter> entry : registry.getCounters(metricFilter).entrySet()) {
addToMap(mfSamplesMap, fromCounter(entry.getKey(), entry.getValue()));
}
for (SortedMap.Entry<String, Histogram> entry : registry.getHistograms(metricFilter).entrySet()) {
addToMap(mfSamplesMap, fromHistogram(entry.getKey(), entry.getValue()));
}
for (SortedMap.Entry<String, Timer> entry : registry.getTimers(metricFilter).entrySet()) {
addToMap(mfSamplesMap, fromTimer(entry.getKey(), entry.getValue()));
}
for (SortedMap.Entry<String, Meter> entry : registry.getMeters(metricFilter).entrySet()) {
addToMap(mfSamplesMap, fromMeter(entry.getKey(), entry.getValue()));
}
return new ArrayList<MetricFamilySamples>(mfSamplesMap.values());
}

private void addToMap(Map<String, MetricFamilySamples> mfSamplesMap, MetricFamilySamples newMfSamples) {
if (newMfSamples != null) {
MetricFamilySamples currentMfSamples = mfSamplesMap.get(newMfSamples.name);
if (currentMfSamples == null) {
mfSamplesMap.put(newMfSamples.name, newMfSamples);
} else {
Set<MetricFamilySamples.Sample> samples = new HashSet<MetricFamilySamples.Sample>(currentMfSamples.samples);
samples.addAll(newMfSamples.samples);
List<MetricFamilySamples.Sample> list = new ArrayList<>(samples);
mfSamplesMap.put(newMfSamples.name, new MetricFamilySamples(newMfSamples.name, currentMfSamples.type, currentMfSamples.help, list));
}
}
}

@Override
public List<MetricFamilySamples> describe() {
return new ArrayList<MetricFamilySamples>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.metrics;

import io.prometheus.client.Collector;
import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;

public class PrometheusSampleBuilder implements SampleBuilder {
private static final List<String> SOURCE_TASK_LABEL_NAMES = Arrays.asList("metric_group", "data_type", "connector", "task");

@Override
public Collector.MetricFamilySamples.Sample createSample(String dropwizardName, String nameSuffix,
List<String> additionalLabelNames, List<String> additionalLabelValues, double value) {
String suffix = nameSuffix == null ? "" : nameSuffix;
List<String> labelValues = sanitizeLabelValues(dropwizardName);
return new Collector.MetricFamilySamples.Sample(sanitizeMetricName(dropwizardName + suffix), SOURCE_TASK_LABEL_NAMES, labelValues, value);
}

public String sanitizeMetricName(String dropwizardName) {
return dropwizardName.split(":")[1].split(",")[1].replaceAll("-", "_");
}

public List<String> sanitizeLabelValues(String dropwizardName) {
String[] var = dropwizardName.split(":");
String[] split = var[1].split(",");

String metricGroup = split[0];
String metricName = split[1].replaceAll("-", "_");
String dateType = split[2];
String var3 = split[3];
String connectorName = StringUtils.EMPTY;
if (!StringUtils.equals(var3, "")) {
connectorName = var3.substring(var3.indexOf("=") + 1);
}
String var4 = split[4];
String taskid = StringUtils.EMPTY;
if (!StringUtils.equals(var4, "")) {
taskid = var4.substring(var4.indexOf("=") + 1);
}
return Arrays.asList(metricGroup, dateType, connectorName, taskid);
}
}
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<commons-lang3.version>3.12.0</commons-lang3.version>
<prometheus.version>0.6.0</prometheus.version>

</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -202,6 +204,16 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_dropwizard</artifactId>
<version>${prometheus.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
5 changes: 4 additions & 1 deletion rocketmq-connect-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@
<artifactId>metrics-core</artifactId>
<version>4.2.0</version>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>metric-exporter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class WorkerConfig {
*/
private int httpPort = 8082;

private int exporterPort = 5557;

/**
* plugin paths config;
* Multiple use ',' split
Expand Down Expand Up @@ -591,6 +593,14 @@ public void setStateManagementService(String stateManagementService) {
this.stateManagementService = stateManagementService;
}

public int getExporterPort() {
return exporterPort;
}

public void setExporterPort(int exporterPort) {
this.exporterPort = exporterPort;
}

@Override
public String toString() {
return "WorkerConfig{" +
Expand Down Expand Up @@ -638,6 +648,7 @@ public String toString() {
", configManagementService='" + configManagementService + '\'' +
", positionManagementService='" + positionManagementService + '\'' +
", stateManagementService='" + stateManagementService + '\'' +
", exporterPort=" + exporterPort +
'}';
}
}
Loading

0 comments on commit 59de3e9

Please sign in to comment.