diff --git a/metric-exporter/pom.xml b/metric-exporter/pom.xml
index 658b28b54..988ffd813 100644
--- a/metric-exporter/pom.xml
+++ b/metric-exporter/pom.xml
@@ -41,6 +41,14 @@
com.google.guava
guava
+
+ io.prometheus
+ simpleclient_dropwizard
+
+
+ io.prometheus
+ simpleclient
+
diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/DropwizardExports.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/DropwizardExports.java
new file mode 100644
index 000000000..bbbc0ee48
--- /dev/null
+++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/DropwizardExports.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import io.prometheus.client.dropwizard.samplebuilder.DefaultSampleBuilder;
+import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.rocketmq.connect.metrics.stats.Stat;
+
+/**
+ * Collect Dropwizard metrics from a MetricRegistry.
+ */
+public class DropwizardExports extends io.prometheus.client.Collector implements io.prometheus.client.Collector.Describable {
+ private static final Logger LOGGER = Logger.getLogger(DropwizardExports.class.getName());
+ private MetricRegistry registry;
+ private MetricFilter metricFilter;
+ private SampleBuilder sampleBuilder;
+
+ /**
+ * Creates a new DropwizardExports with a {@link DefaultSampleBuilder} and {@link MetricFilter#ALL}.
+ *
+ * @param registry a metric registry to export in prometheus.
+ */
+ public DropwizardExports(MetricRegistry registry) {
+ this.registry = registry;
+ this.metricFilter = MetricFilter.ALL;
+ this.sampleBuilder = new DefaultSampleBuilder();
+ }
+
+ /**
+ * Creates a new DropwizardExports with a {@link DefaultSampleBuilder} and custom {@link MetricFilter}.
+ *
+ * @param registry a metric registry to export in prometheus.
+ * @param metricFilter a custom metric filter.
+ */
+ public DropwizardExports(MetricRegistry registry, MetricFilter metricFilter) {
+ this.registry = registry;
+ this.metricFilter = metricFilter;
+ this.sampleBuilder = new DefaultSampleBuilder();
+ }
+
+ /**
+ * @param registry a metric registry to export in prometheus.
+ * @param sampleBuilder sampleBuilder to use to create prometheus samples.
+ */
+ public DropwizardExports(MetricRegistry registry, SampleBuilder sampleBuilder) {
+ this.registry = registry;
+ this.metricFilter = MetricFilter.ALL;
+ this.sampleBuilder = sampleBuilder;
+ }
+
+ /**
+ * @param registry a metric registry to export in prometheus.
+ * @param metricFilter a custom metric filter.
+ * @param sampleBuilder sampleBuilder to use to create prometheus samples.
+ */
+ public DropwizardExports(MetricRegistry registry, MetricFilter metricFilter, SampleBuilder sampleBuilder) {
+ this.registry = registry;
+ this.metricFilter = metricFilter;
+ this.sampleBuilder = sampleBuilder;
+ }
+
+ private static String getHelpMessage(String metricName, Metric metric) {
+ return String.format("Generated from Dropwizard metric import (metric=%s, type=%s)", metricName, metric.getClass().getName());
+ }
+
+ /**
+ * Export counter as Prometheus Gauge.
+ */
+ MetricFamilySamples fromCounter(String dropwizardName, Counter counter) {
+ MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), new Long(counter.getCount()).doubleValue());
+ return new MetricFamilySamples(sample.name, Type.GAUGE, getHelpMessage(dropwizardName, counter), Arrays.asList(sample));
+ }
+
+ /**
+ * Export gauge as a prometheus gauge.
+ */
+ MetricFamilySamples fromGauge(String dropwizardName, Gauge gauge) {
+ Object obj = gauge.getValue();
+ double value;
+ if (obj instanceof Number) {
+ value = ((Number) obj).doubleValue();
+ } else if (obj instanceof Boolean) {
+ value = ((Boolean) obj) ? 1 : 0;
+ } else {
+ LOGGER.log(Level.FINE, String.format("Invalid type for Gauge %s: %s", sanitizeMetricName(dropwizardName), obj == null ? "null" : obj.getClass().getName()));
+ return null;
+ }
+ MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), value);
+ return new MetricFamilySamples(sample.name, Type.GAUGE, getHelpMessage(dropwizardName, gauge), Arrays.asList(sample));
+ }
+
+ /**
+ * Export a histogram snapshot as a prometheus SUMMARY.
+ *
+ * @param dropwizardName metric name.
+ * @param snapshot the histogram snapshot.
+ * @param count the total sample count for this snapshot.
+ * @param factor a factor to apply to histogram values.
+ */
+ MetricFamilySamples fromSnapshotAndCount(String dropwizardName, Snapshot snapshot, long count, double factor,
+ String helpMessage) {
+ MetricName metricName = MetricUtils.stringToMetricName(dropwizardName);
+ Stat.HistogramType histogramType = Stat.HistogramType.valueOf(metricName.getType());
+ List samples = new ArrayList<>();
+ switch (histogramType) {
+ case Avg:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), snapshot.getMean() * factor));
+ break;
+ case Min:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), snapshot.getMin() * factor));
+ break;
+ case Max:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), snapshot.getMax() * factor));
+ break;
+ case Percentile_75th:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.75"), snapshot.get75thPercentile() * factor));
+ break;
+ case Percentile_95th:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.95"), snapshot.get95thPercentile() * factor));
+ break;
+ case Percentile_98th:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.98"), snapshot.get98thPercentile() * factor));
+ break;
+ case Percentile_99th:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.99"), snapshot.get99thPercentile() * factor));
+ break;
+ case Percentile_999th:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.999"), snapshot.get999thPercentile() * factor));
+ break;
+ default:
+ samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.5"), snapshot.getMedian() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.5"), snapshot.getMedian() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.75"), snapshot.get75thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.95"), snapshot.get95thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.98"), snapshot.get98thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.99"), snapshot.get99thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.999"), snapshot.get999thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "_count", new ArrayList(), new ArrayList(), count));
+
+ }
+ return new MetricFamilySamples(samples.get(0).name, Type.SUMMARY, helpMessage, samples);
+ }
+
+ /**
+ * Convert histogram snapshot.
+ */
+ MetricFamilySamples fromHistogram(String dropwizardName, Histogram histogram) {
+ return fromSnapshotAndCount(dropwizardName, histogram.getSnapshot(), histogram.getCount(), 1.0, getHelpMessage(dropwizardName, histogram));
+ }
+
+ /**
+ * Export Dropwizard Timer as a histogram. Use TIME_UNIT as time unit.
+ */
+ MetricFamilySamples fromTimer(String dropwizardName, Timer timer) {
+ return fromSnapshotAndCount(dropwizardName, timer.getSnapshot(), timer.getCount(), 1.0D / TimeUnit.SECONDS.toNanos(1L), getHelpMessage(dropwizardName, timer));
+ }
+
+ /**
+ * Export a Meter as as prometheus COUNTER.
+ */
+ MetricFamilySamples fromMeter(String dropwizardName, Meter meter) {
+ MetricName metricName = MetricUtils.stringToMetricName(dropwizardName);
+ final MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), MetricUtils.getMeterValue(metricName, meter));
+ return new MetricFamilySamples(sample.name, Type.COUNTER, getHelpMessage(dropwizardName, meter), Arrays.asList(sample));
+ }
+
+ @Override
+ public List collect() {
+ Map mfSamplesMap = new HashMap();
+
+ for (SortedMap.Entry entry : registry.getGauges(metricFilter).entrySet()) {
+ addToMap(mfSamplesMap, fromGauge(entry.getKey(), entry.getValue()));
+ }
+ for (SortedMap.Entry entry : registry.getCounters(metricFilter).entrySet()) {
+ addToMap(mfSamplesMap, fromCounter(entry.getKey(), entry.getValue()));
+ }
+ for (SortedMap.Entry entry : registry.getHistograms(metricFilter).entrySet()) {
+ addToMap(mfSamplesMap, fromHistogram(entry.getKey(), entry.getValue()));
+ }
+ for (SortedMap.Entry entry : registry.getTimers(metricFilter).entrySet()) {
+ addToMap(mfSamplesMap, fromTimer(entry.getKey(), entry.getValue()));
+ }
+ for (SortedMap.Entry entry : registry.getMeters(metricFilter).entrySet()) {
+ addToMap(mfSamplesMap, fromMeter(entry.getKey(), entry.getValue()));
+ }
+ return new ArrayList(mfSamplesMap.values());
+ }
+
+ private void addToMap(Map mfSamplesMap, MetricFamilySamples newMfSamples) {
+ if (newMfSamples != null) {
+ MetricFamilySamples currentMfSamples = mfSamplesMap.get(newMfSamples.name);
+ if (currentMfSamples == null) {
+ mfSamplesMap.put(newMfSamples.name, newMfSamples);
+ } else {
+ Set samples = new HashSet(currentMfSamples.samples);
+ samples.addAll(newMfSamples.samples);
+ List list = new ArrayList<>(samples);
+ mfSamplesMap.put(newMfSamples.name, new MetricFamilySamples(newMfSamples.name, currentMfSamples.type, currentMfSamples.help, list));
+ }
+ }
+ }
+
+ @Override
+ public List describe() {
+ return new ArrayList();
+ }
+}
\ No newline at end of file
diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/PrometheusSampleBuilder.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/PrometheusSampleBuilder.java
new file mode 100644
index 000000000..54f17b391
--- /dev/null
+++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/PrometheusSampleBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.metrics;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+
+public class PrometheusSampleBuilder implements SampleBuilder {
+ private static final List SOURCE_TASK_LABEL_NAMES = Arrays.asList("metric_group", "data_type", "connector", "task");
+
+ @Override
+ public Collector.MetricFamilySamples.Sample createSample(String dropwizardName, String nameSuffix,
+ List additionalLabelNames, List additionalLabelValues, double value) {
+ String suffix = nameSuffix == null ? "" : nameSuffix;
+ List labelValues = sanitizeLabelValues(dropwizardName);
+ return new Collector.MetricFamilySamples.Sample(sanitizeMetricName(dropwizardName + suffix), SOURCE_TASK_LABEL_NAMES, labelValues, value);
+ }
+
+ public String sanitizeMetricName(String dropwizardName) {
+ return dropwizardName.split(":")[1].split(",")[1].replaceAll("-", "_");
+ }
+
+ public List sanitizeLabelValues(String dropwizardName) {
+ String[] var = dropwizardName.split(":");
+ String[] split = var[1].split(",");
+
+ String metricGroup = split[0];
+ String metricName = split[1].replaceAll("-", "_");
+ String dateType = split[2];
+ String var3 = split[3];
+ String connectorName = StringUtils.EMPTY;
+ if (!StringUtils.equals(var3, "")) {
+ connectorName = var3.substring(var3.indexOf("=") + 1);
+ }
+ String var4 = split[4];
+ String taskid = StringUtils.EMPTY;
+ if (!StringUtils.equals(var4, "")) {
+ taskid = var4.substring(var4.indexOf("=") + 1);
+ }
+ return Arrays.asList(metricGroup, dateType, connectorName, taskid);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 298f5d2a8..e49f6dc3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,6 +66,8 @@
1.8
1.8
3.12.0
+ 0.6.0
+
@@ -202,6 +204,16 @@
commons-lang3
${commons-lang3.version}
+
+ io.prometheus
+ simpleclient
+ ${prometheus.version}
+
+
+ io.prometheus
+ simpleclient_dropwizard
+ ${prometheus.version}
+
diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index 95f43c3a7..1413646e7 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -264,7 +264,10 @@
metrics-core
4.2.0
-
+
+ io.prometheus
+ simpleclient
+
org.apache.rocketmq
metric-exporter
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
index dd5866073..fd0d1d17c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
@@ -58,6 +58,8 @@ public class WorkerConfig {
*/
private int httpPort = 8082;
+ private int exporterPort = 5557;
+
/**
* plugin paths config;
* Multiple use ',' split
@@ -591,6 +593,14 @@ public void setStateManagementService(String stateManagementService) {
this.stateManagementService = stateManagementService;
}
+ public int getExporterPort() {
+ return exporterPort;
+ }
+
+ public void setExporterPort(int exporterPort) {
+ this.exporterPort = exporterPort;
+ }
+
@Override
public String toString() {
return "WorkerConfig{" +
@@ -638,6 +648,7 @@ public String toString() {
", configManagementService='" + configManagementService + '\'' +
", positionManagementService='" + positionManagementService + '\'' +
", stateManagementService='" + stateManagementService + '\'' +
+ ", exporterPort=" + exporterPort +
'}';
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 496e02767..de6e5fe75 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -27,6 +27,7 @@
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.connector.api.errors.ConnectException;
+import io.prometheus.client.CollectorRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -50,6 +51,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.connect.metrics.DropwizardExports;
+import org.apache.rocketmq.connect.metrics.PrometheusSampleBuilder;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
@@ -148,6 +151,7 @@ public Worker(WorkerConfig workerConfig,
this.executor = Executors.newCachedThreadPool();
this.connectMetrics = new ConnectMetrics(workerConfig);
this.stateManagementService = stateManagementService;
+ CollectorRegistry.defaultRegistry.register(new DropwizardExports(connectMetrics.registry(), new PrometheusSampleBuilder()));
}
public void start() {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/PrometheusMetricsServlet.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/PrometheusMetricsServlet.java
new file mode 100644
index 000000000..0cf915b8a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/PrometheusMetricsServlet.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.rest;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public class PrometheusMetricsServlet extends HttpServlet {
+ private CollectorRegistry registry;
+
+ public PrometheusMetricsServlet() {
+ this(CollectorRegistry.defaultRegistry);
+ }
+
+ public PrometheusMetricsServlet(CollectorRegistry registry) {
+ this.registry = registry;
+ }
+
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ resp.setStatus(200);
+ resp.setContentType("text/plain; version=0.0.4; charset=utf-8");
+ StringWriter writer = new StringWriter();
+
+ this.writeEscapedHelp(writer, registry);
+ resp.getOutputStream().print(writer.toString());
+ }
+
+ public void writeEscapedHelp(StringWriter writer, CollectorRegistry registry) throws IOException {
+ Enumeration metricFamilySamplesEnumeration = registry.metricFamilySamples();
+ List list = new ArrayList<>();
+ while (metricFamilySamplesEnumeration.hasMoreElements()) {
+ Collector.MetricFamilySamples metricFamilySamples = metricFamilySamplesEnumeration.nextElement();
+ list.add(metricFamilySamples);
+ }
+ writeEscapedHelp(writer, list);
+ }
+
+ public void writeEscapedHelp(StringWriter writer, List mfs) throws IOException {
+ if (Objects.nonNull(mfs) && mfs.size() != 0) {
+ for (Collector.MetricFamilySamples metricFamilySamples : mfs) {
+ for (Iterator var3 = metricFamilySamples.samples.iterator(); var3.hasNext(); writer.write(10)) {
+ Collector.MetricFamilySamples.Sample sample = (Collector.MetricFamilySamples.Sample) var3.next();
+ writer.write(sample.name);
+ if (sample.labelNames.size() > 0) {
+ writer.write(123);
+
+ for (int i = 0; i < sample.labelNames.size(); ++i) {
+ writer.write((String) sample.labelNames.get(i));
+ writer.write("=\"");
+ writeEscapedLabelValue(writer, (String) sample.labelValues.get(i));
+ writer.write("\",");
+ }
+
+ writer.write(125);
+ }
+
+ writer.write(32);
+ writer.write(Collector.doubleToGoString(sample.value));
+ if (sample.timestampMs != null) {
+ writer.write(32);
+ writer.write(sample.timestampMs.toString());
+ }
+ }
+ }
+ }
+
+ }
+
+ private static void writeEscapedLabelValue(Writer writer, String s) throws IOException {
+ for (int i = 0; i < s.length(); ++i) {
+ char c = s.charAt(i);
+ switch (c) {
+ case '\n':
+ writer.append("\\n");
+ break;
+ case '"':
+ writer.append("\\\"");
+ break;
+ case '\\':
+ writer.append("\\\\");
+ break;
+ default:
+ writer.append(c);
+ }
+ }
+
+ }
+
+ private Set parse(HttpServletRequest req) {
+ String[] includedParam = req.getParameterValues("name[]");
+ return (Set) (includedParam == null ? Collections.emptySet() : new HashSet(Arrays.asList(includedParam)));
+ }
+
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ this.doGet(req, resp);
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index b4f6c378f..733379b54 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -31,6 +31,11 @@
import org.apache.rocketmq.connect.runtime.rest.entities.HttpResponse;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +65,21 @@ public RestHandler(AbstractConnectController connectController) {
this.connectController = connectController;
pluginsResource = new ConnectorPluginsResource(connectController);
+ Javalin embeddedApp = Javalin.create(config -> {
+ config.server(() -> {
+ Server server = new Server(connectController.getConnectConfig().getExporterPort());
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ context.addServlet(new ServletHolder(new PrometheusMetricsServlet()), "/metrics");
+ ContextHandlerCollection handlers = new ContextHandlerCollection();
+ handlers.setHandlers(new Handler[]{context});
+ server.setHandler(handlers);
+ return server;
+ });
+ });
+
+ embeddedApp.start(connectController.getConnectConfig().getExporterPort());
+
Javalin app = Javalin.create();
app = app.start(connectController.getConnectConfig().getHttpPort());