From 517bcbb6e18db146bd3e8d282075065fd3f4c57a Mon Sep 17 00:00:00 2001 From: zhoubo <877036922@qq.com> Date: Fri, 21 Apr 2023 17:54:00 +0800 Subject: [PATCH] add replicator checkpoint doc (#486) modify metrics.conf default config --- connectors/rocketmq-replicator/README.md | 104 ++++++++++++++---- .../replicator/ReplicatorCheckpointTask.java | 2 +- .../replicator/ReplicatorSourceTask.java | 2 +- distribution/conf/metrics.conf | 3 +- 4 files changed, 89 insertions(+), 22 deletions(-) diff --git a/connectors/rocketmq-replicator/README.md b/connectors/rocketmq-replicator/README.md index 7d1bec67c..21d64945a 100644 --- a/connectors/rocketmq-replicator/README.md +++ b/connectors/rocketmq-replicator/README.md @@ -74,6 +74,68 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector }' ```` +同步位点 + +```` +curl -X POST -H "Content-Type: application/json" http://${runtime-port}:${runtime-ip}/connectors/${replicator-name} -d '{ + "connector.class": "org.apache.rocketmq.replicator.ReplicatorCheckpointConnector", + "src.cluster": "${srcDefaultCluster}", + "src.endpoint": "${namesrvEndpoint}", + "dest.acl.enable": "false", + "src.secret.key": "${sk}", + "dest.access.key": "${ak}", + "src.topictags": "test1", + "sync.gids":"test_gid1", + "src.acl.enable": "false", + "errors.tolerance": "all", + "dest.secret.key": "${sk}", + "dest.endpoint": "${namesrvEndpoint}", + "src.access.key": "${ak}", + "dest.cluster": "${targetDefaultCluster}", + "source.cluster": "${sourceDefaultCluster}", + "dest.region": "${regionA}", + "src.region": "${regionB}", + "dest.cloud": "${cloud1}", + "source.cloud": "${cloud2}", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +}' +```` +例如 +```` +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/test_checkpoint_replicator -d '{ + "connector.class": "org.apache.rocketmq.replicator.ReplicatorCheckpointConnector", + "src.endpoint": "127.0.0.1:9876", + "src.cluster": "DefaultCluster", + "src.region": "regionA", + "src.cloud": "cloud1", + "dest.acl.enable": "false", + "src.topictags": "TopicTest", + "src.acl.enable": "false", + "dest.endpoint": "127.0.0.2:9876", + "dest.region": "regionB", + "dest.cluster": "DefaultCluster", + "errors.tolerance": "all", + "sync.gids":"test_gid1", + "dest.cloud": "dest-cloud", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +}' +```` +同步位点任务创建成功以后,可以再目的 RocketMQ 集群 replicator_checkpoint Topic中消费到位点信息 + +位点信息格式 + +```` +Struct{consumerGroup=test_gid1,topic=TopicTest,upstreamLastTimestamp=1681962375040,downstreamLastTimestamp=1681439588399,metadata=1682068439879} +```` + +parameter | type| description | +---|-------------|----------------------------------------| +consumerGroup | String | consumerGroup | +topic | String | topic | +upstreamLastTimestamp | long | 源 RocketMQ topic consumerGourp 的消费位点 | +downstreamLastTimestamp | long | 目的 RocketMQ topic consumerGourp 的消费位点 | ## rocketmq-replicator停止 ```` @@ -82,24 +144,28 @@ curl http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name} ## rocketmq-replicator参数说明 -parameter | type | must | description | sample value ----|---|------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------| -src.endpoint | String | Yes | namesrv address of source rocketmq cluster | 127.0.0.1:9876 | -src.topictags | String | Yes | source cluster topic and tag,${topic},{tag} | test1,* | -dest.topic | String | Yes | target cluster topic | test2 | -dest.endpoint | String | Yes | namesrv address of target rocketmq cluster | 127.0.0.1:9876 | -max.task | String | No | maximum number of tasks | 2 | -dest.acl.enable | String | No | acl switch,enumeration value : true/false | false | -dest.access.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | accesskey | -dest.secret.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | secretkey | -src.acl.enable | String | No | acl switch,enumeration value : true/false | true | -src.access.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | accesskey | -src.secret.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | secretkey | +parameter | type | must | description | sample value +---|---|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------| +src.endpoint | String | Yes | namesrv address of source rocketmq cluster | 127.0.0.1:9876 | +src.topictags | String | Yes | source cluster topic and tag,${topic},{tag} | test1,* | +dest.topic | String | Yes | target cluster topic | test2 | +dest.endpoint | String | Yes | namesrv address of target rocketmq cluster | 127.0.0.1:9876 | +max.task | String | No | maximum number of tasks | 2 | +dest.acl.enable | String | No | acl switch,enumeration value : true/false | false | +dest.access.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | accesskey | +dest.secret.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | secretkey | +src.acl.enable | String | No | acl switch,enumeration value : true/false | true | +src.access.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | accesskey | +src.secret.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | secretkey | errors.tolerance | String | No | error tolerance ,enumeration value : all . all means to tolerate all errors, the synchronization message failure will be skipped and error log will be printed. If there is no error tolerance configured, all errors will not be tolerated by default, a synchronization failure occurs, and the task will stop after multiple retries | all | -src.cluster | String | No | source cluster | DefaultCluster | -dest.cluster | String | No | target cluster | DefaultCluster | -src.region | String | No | source region | regionA | -dest.region | String | No | source region | regionB | -src.cloud | String | No | source cloud | cloud1 | -dest.cloud | String | No | source cloud | cloud2 | +src.cluster | String | No | source cluster | DefaultCluster | +dest.cluster | String | No | target cluster | DefaultCluster | +src.region | String | No | source region | regionA | +dest.region | String | No | source region | regionB | +src.cloud | String | No | source cloud | cloud1 | +dest.cloud | String | No | source cloud | cloud2 | +sync.gids | String | No | consumeGroup | consumerGroup1 | +key.converter | String | No | key.converter | org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter | +value.converter | String | No | value converter | org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter | + diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java index 6c9946d2a..5a44ec614 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java @@ -258,7 +258,7 @@ private void fillConnectorConfig(KeyValue config) { connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER)); connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID)); connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT)); - connectorConfig.setSrcTopicTags(config.getString(connectorConfig.getSrcTopicTags())); + connectorConfig.setSrcTopicTags(config.getString(connectorConfig.SRC_TOPICTAGS)); connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD)); connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION)); connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER)); diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java index af9352a83..f20cba48f 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java @@ -386,7 +386,7 @@ public void accept(MessageQueue messageQueue, OffsetWrapper offsetWrapper) { } public synchronized boolean putPulledQueueOffset(MessageQueue mq, long currentOffset, int needAck, MessageExt msg) { - log.info("putPulledQueueOffset " + mq + ", currentOffset : " + currentOffset + ", ackCount : " + needAck); + log.trace("putPulledQueueOffset " + mq + ", currentOffset : " + currentOffset + ", ackCount : " + needAck); TreeMap offsets = queue2Offsets.get(mq); if (offsets == null) { TreeMap newOffsets = new TreeMap<>(); diff --git a/distribution/conf/metrics.conf b/distribution/conf/metrics.conf index 45158f7e9..98c1d6db1 100644 --- a/distribution/conf/metrics.conf +++ b/distribution/conf/metrics.conf @@ -15,10 +15,11 @@ # metrics reporter class -metrics.reporter=org.apache.rocketmq.connect.runtime.metrics.RocketMQScheduledReporter +metrics.reporter=org.apache.rocketmq.connect.metrics.reporter.RocketMQScheduledReporter # metrics topic metrics.topic=metrics-topic +group.id=metrics-gid # Rocketmq namesrvAddr name.srv.addr=localhost:9876