From a2e02c22ffb6a8bb2e968af97bf7332a1374eb62 Mon Sep 17 00:00:00 2001 From: Ziy1-Tan Date: Tue, 27 Jun 2023 21:40:18 +0800 Subject: [PATCH] support oss sink connector Signed-off-by: Ziy1-Tan --- .../aliyun/rocketmq-connect-oss/README.md | 30 +++ .../aliyun/rocketmq-connect-oss/pom.xml | 221 ++++++++++++++++++ .../connect/oss/config/OSSBaseConfig.java | 108 +++++++++ .../connect/oss/config/OSSConstants.java | 15 ++ .../connect/oss/helper/OSSHelperClient.java | 65 ++++++ .../connect/oss/sink/OSSSinkConnector.java | 38 +++ .../connect/oss/sink/OSSSinkTask.java | 59 +++++ .../connect/oss/sink/OSSSinkTaskTest.java | 61 +++++ 8 files changed, 597 insertions(+) create mode 100644 connectors/aliyun/rocketmq-connect-oss/README.md create mode 100644 connectors/aliyun/rocketmq-connect-oss/pom.xml create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/config/OSSBaseConfig.java create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/config/OSSConstants.java create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/helper/OSSHelperClient.java create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OSSSinkConnector.java create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OSSSinkTask.java create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/oss/sink/OSSSinkTaskTest.java diff --git a/connectors/aliyun/rocketmq-connect-oss/README.md b/connectors/aliyun/rocketmq-connect-oss/README.md new file mode 100644 index 000000000..e36de3855 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/README.md @@ -0,0 +1,30 @@ +##### OSSSinkConnector fully-qualified name + +org.apache.rocketmq.connect.oss.sink.OSSSinkConnector + +**oss-sink-connector** start + +```shell +POST http://${runtime-ip}:${runtime-port}/connectors/OSSSinkConnector +{ + "connector.class":"org.apache.rocketmq.connect.oss.sink.OSSSinkConnector", + "endpoint":"https://oss-cn-hangzhou.aliyuncs.com", + "accessKeyId":"", + "accessKeySecret":"", + "bucketName ":"examplebucket", + "max.tasks":2, + "connect.topicnames":"test-oss-topic", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### parameter configuration + +| parameter | effect | required | default | +| ------------------ | ---------------------------------------------- | --------------- | ------- | +| endpoint | endpoint to connect to | yes | null | +| accessKeyId | access key id | yes | null | +| accessKeySecret | access key secret | yes | null | +| bucketName | bucket name to connect to | yes | null | +| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null | diff --git a/connectors/aliyun/rocketmq-connect-oss/pom.xml b/connectors/aliyun/rocketmq-connect-oss/pom.xml new file mode 100644 index 000000000..d1d7250c8 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/pom.xml @@ -0,0 +1,221 @@ + + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-oss + 1.0-SNAPSHOT + + connect-oss + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + jira + https://issues.apache.org/jira/browse/RocketMQ + + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + io.openmessaging.internal + + + + aggregate + + aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + README-CN.md + + + + + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + 8 + 8 + UTF-8 + + + + io.openmessaging + openmessaging-connector + 0.1.4 + compile + + + + org.apache.commons + commons-lang3 + 3.12.0 + + + + com.aliyun.oss + aliyun-sdk-oss + 3.15.1 + + + + javax.xml.bind + jaxb-api + 2.3.1 + + + javax.activation + activation + 1.1.1 + + + + org.glassfish.jaxb + jaxb-runtime + 2.3.3 + + + + org.lz4 + lz4-java + 1.8.0 + + + + + com.alibaba + fastjson + 1.2.83 + compile + + + junit + junit + RELEASE + test + + + org.slf4j + slf4j-api + 1.7.7 + + + + diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/config/OSSBaseConfig.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/config/OSSBaseConfig.java new file mode 100644 index 000000000..d9e917edf --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/config/OSSBaseConfig.java @@ -0,0 +1,108 @@ +package org.apache.rocketmq.connect.oss.config; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import io.openmessaging.KeyValue; +import java.lang.reflect.Method; + +public class OSSBaseConfig { + + private String endpoint; + + private String accessKeyId; + + private String accessKeySecret; + + private String bucketName; + + public String getEndpoint() { + return endpoint; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public String getAccessKeyId() { + return accessKeyId; + } + + public void setAccessKeyId(String accessKeyId) { + this.accessKeyId = accessKeyId; + } + + public String getBucketName() { + return bucketName; + } + + public void setBucketName(String userName) { + this.bucketName = userName; + } + + public String getAccessKeySecret() { + return accessKeySecret; + } + + public void setAccessKeySecret(String accessKeySecret) { + this.accessKeySecret = accessKeySecret; + } + + public void load(KeyValue props) { + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(3); + String key = tmp.toLowerCase(); + + String property = p.getString(key); + if (property != null) { + Class[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/config/OSSConstants.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/config/OSSConstants.java new file mode 100644 index 000000000..f06f9d549 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/config/OSSConstants.java @@ -0,0 +1,15 @@ +package org.apache.rocketmq.connect.oss.config; + +public class OSSConstants { + public static final String ENDPOINT = "endpoint"; + + public static final String ACCESS_KEY_ID = "accessKeyId"; + + public static final String ACCESS_KEY_SECRET = "accessKeySecret"; + + public static final String BUCKET_NAME = "bucketName"; + + public static final String OSS_OFFSET = "OFFSET"; + + public static final String OSS_PARTITION = "OSS_PARTITION"; +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/helper/OSSHelperClient.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/helper/OSSHelperClient.java new file mode 100644 index 000000000..eb599ec6c --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/helper/OSSHelperClient.java @@ -0,0 +1,65 @@ +package org.apache.rocketmq.connect.oss.helper; + +import java.io.ByteArrayInputStream; + +import org.apache.rocketmq.connect.oss.config.OSSBaseConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider; +import com.aliyun.oss.model.PutObjectRequest; +import com.aliyun.oss.model.PutObjectResult; +import com.aliyuncs.exceptions.ClientException; + +public class OSSHelperClient { + private static final Logger LOGGER = LoggerFactory.getLogger(OSSHelperClient.class); + + private OSSBaseConfig config; + OSS ossClient; + + public OSSHelperClient(OSSBaseConfig config) { + this.config = config; + this.ossClient = create(this.config); + } + + private OSS create(OSSBaseConfig config) { + if (config.getAccessKeyId() != null && config.getAccessKeySecret() != null) { + return new OSSClientBuilder().build(config.getEndpoint(), config.getAccessKeyId(), + config.getAccessKeySecret()); + } + + try { + EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory + .newEnvironmentVariableCredentialsProvider(); + return new OSSClientBuilder().build(config.getEndpoint(), credentialsProvider); + } catch (ClientException e) { + e.printStackTrace(); + } + + throw new RuntimeException("Credentials cannot be empty!"); + } + + public OSS getOSSClient() { + return ossClient; + } + + public boolean isBucketExist(String bucketName) { + return ossClient.doesBucketExist(bucketName); + } + + public void putObject(String bucketName, String objectName, String content) { + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, + new ByteArrayInputStream(content.getBytes())); + ossClient.putObject(putObjectRequest); + } + + public void stop() { + if (ossClient != null) { + ossClient.shutdown(); + } + } + +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OSSSinkConnector.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OSSSinkConnector.java new file mode 100644 index 000000000..5d99fe34b --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OSSSinkConnector.java @@ -0,0 +1,38 @@ +package org.apache.rocketmq.connect.oss.sink; + +import java.util.ArrayList; +import java.util.List; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; + +public class OSSSinkConnector extends SinkConnector { + + private KeyValue keyValue; + + @Override + public void start(KeyValue config) { + this.keyValue = config; + } + + @Override + public void stop() { + this.keyValue = null; + } + + @Override + public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override + public Class taskClass() { + return OSSSinkTask.class; + } + +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OSSSinkTask.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OSSSinkTask.java new file mode 100644 index 000000000..f003db93a --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OSSSinkTask.java @@ -0,0 +1,59 @@ +package org.apache.rocketmq.connect.oss.sink; + +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.oss.config.OSSBaseConfig; +import org.apache.rocketmq.connect.oss.config.OSSConstants; +import org.apache.rocketmq.connect.oss.helper.OSSHelperClient; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; + +public class OSSSinkTask extends SinkTask { + + private OSSBaseConfig config; + private OSSHelperClient ossClient; + + @Override + public void validate(KeyValue config) { + if (StringUtils.isBlank(config.getString(OSSConstants.ENDPOINT)) + || StringUtils.isBlank(config.getString(OSSConstants.ACCESS_KEY_SECRET)) + || StringUtils.isBlank(config.getString(OSSConstants.ACCESS_KEY_SECRET)) + || StringUtils.isBlank(config.getString(OSSConstants.BUCKET_NAME))) { + throw new RuntimeException("OSS required parameter is null !"); + } + + if (ossClient.isBucketExist(OSSConstants.BUCKET_NAME)) { + throw new RuntimeException("Bucket is not exist !"); + } + } + + @Override + public void start(KeyValue keyValue) { + this.config = new OSSBaseConfig(); + this.config.load(keyValue); + this.ossClient = new OSSHelperClient(this.config); + } + + @Override + public void stop() { + this.ossClient.stop(); + this.ossClient = null; + } + + @Override + public void put(List sinkRecords) throws ConnectException { + if (sinkRecords == null || sinkRecords.size() < 1) { + return; + } + + for (ConnectRecord record : sinkRecords) { + this.ossClient.putObject(config.getBucketName(), (String) record.getKey(), + record.getData().toString()); + } + } + +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/oss/sink/OSSSinkTaskTest.java b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/oss/sink/OSSSinkTaskTest.java new file mode 100644 index 000000000..5180a1c7e --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/oss/sink/OSSSinkTaskTest.java @@ -0,0 +1,61 @@ +package org.apache.rocketmq.connect.oss.sink; + +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; + +import io.openmessaging.internal.DefaultKeyValue; + +import org.apache.rocketmq.connect.oss.config.OSSConstants; + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class OSSSinkTaskTest { + // private static final String endpoint = "https://oss-cn-hangzhou.aliyuncs.com"; + // private static final String bucketName = "ExampleBucket"; + // private static final String accessKeyId = ""; + // private static final String accessKeySecret = ""; + + // public static void main(String[] args) { + // List records = new ArrayList<>(); + // // build schema + // Schema keySchema = SchemaBuilder.string() + // .build(); + + // Schema valSchema = SchemaBuilder.string() + // .build(); + + // for (int i = 0; i < 4; i++) { + + // ConnectRecord record = new ConnectRecord( + // new RecordPartition(new ConcurrentHashMap<>()), + // new RecordOffset(new HashMap<>()), + // System.currentTimeMillis(), + // keySchema, + // "key" + i, + // valSchema, + // "data" + i); + // records.add(record); + + // } + + // OSSSinkTask task = new OSSSinkTask(); + // KeyValue config = new DefaultKeyValue(); + // config.put(OSSConstants.ENDPOINT, endpoint); + // config.put(OSSConstants.ACCESS_KEY_ID, accessKeyId); + // config.put(OSSConstants.ACCESS_KEY_SECRET, accessKeySecret); + // config.put(OSSConstants.BUCKET_NAME, bucketName); + // task.start(config); + // task.put(records); + // } +}