From 266c5ad4f0d1265b87747efccefaf7e2edeec5fe Mon Sep 17 00:00:00 2001 From: Ziy1-Tan Date: Sat, 1 Jul 2023 00:08:57 +0800 Subject: [PATCH] Support email sink connector Signed-off-by: Ziy1-Tan --- connectors/rocketmq-connect-email/README.md | 38 ++++ connectors/rocketmq-connect-email/pom.xml | 191 ++++++++++++++++++ .../connect/email/config/EmailConstants.java | 31 +++ .../connect/email/config/EmailSinkConfig.java | 156 ++++++++++++++ .../email/helper/EmailHelperClient.java | 156 ++++++++++++++ .../email/sink/EmailSinkConnector.java | 62 ++++++ .../connect/email/sink/EmailSinkTask.java | 66 ++++++ .../connect/email/EmailSInkTaskTest.java | 63 ++++++ 8 files changed, 763 insertions(+) create mode 100644 connectors/rocketmq-connect-email/README.md create mode 100644 connectors/rocketmq-connect-email/pom.xml create mode 100644 connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/config/EmailConstants.java create mode 100644 connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/config/EmailSinkConfig.java create mode 100644 connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/helper/EmailHelperClient.java create mode 100644 connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/sink/EmailSinkConnector.java create mode 100644 connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/sink/EmailSinkTask.java create mode 100644 connectors/rocketmq-connect-email/src/test/java/org/apache/rocketmq/connect/email/EmailSInkTaskTest.java diff --git a/connectors/rocketmq-connect-email/README.md b/connectors/rocketmq-connect-email/README.md new file mode 100644 index 00000000..455a5620 --- /dev/null +++ b/connectors/rocketmq-connect-email/README.md @@ -0,0 +1,38 @@ +##### EmailSinkConnector fully-qualified name +org.apache.rocketmq.connect.email.sink.EmailSinkConnector + +**email-sink-connector** start + +```shell +POST http://${runtime-ip}:${runtime-port}/connectors/EmailSinkConnector +{ + "connector.class": "org.apache.rocketmq.connect.email.sink.EmailSinkConnector", + "max.task": "1", + "fromAddress": "xxxxxx@qq.com", + "toAddress": "xxxxxx@gmail.com", + "host": "smtp.qq.com", + "password": "******", + "subject": "This is a subject", + "content": "this is a content.", + "transportProtocol": "smtp", + "smtpAuth": true, + "connect.topicnames": "test-mail-topic", + "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +> **Note:** data will be send as a attachment called `emailsink.csv` with email. + +##### parameter configuration + +| parameter | effect | required | default | +| ----------------- | -------------------------------------------------------------- | -------- | ------- | +| fromAddress | Sender email address | yes | null | +| toAddress | Receiver email address | yes | null | +| host | URL to connect to. | yes | null | +| password | authorization code,You can obtain it from the mailbox Settings | yes | null | +| subject | The subject line of the entire message | yes | null | +| content | The body of the entire message | yes | null | +| transportProtocol | The protocol to load the session. connector to write into | yes | null | +| smtpAuth | Whether to authenticate the customer | yes | null | diff --git a/connectors/rocketmq-connect-email/pom.xml b/connectors/rocketmq-connect-email/pom.xml new file mode 100644 index 00000000..647dcaee --- /dev/null +++ b/connectors/rocketmq-connect-email/pom.xml @@ -0,0 +1,191 @@ + + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-email + 1.0-SNAPSHOT + + connect-email + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + jira + https://issues.apache.org/jira/browse/RocketMQ + + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + io.openmessaging.internal + + + + aggregate + + aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + README-CN.md + + + + + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + 8 + 8 + UTF-8 + + + + io.openmessaging + openmessaging-connector + 0.1.4 + compile + + + + com.sun.mail + javax.mail + 1.5.6 + + + + com.alibaba + fastjson + 1.2.83 + compile + + + junit + junit + RELEASE + test + + + org.slf4j + slf4j-api + 1.7.7 + + + + diff --git a/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/config/EmailConstants.java b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/config/EmailConstants.java new file mode 100644 index 00000000..40832a4b --- /dev/null +++ b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/config/EmailConstants.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.email.config; + +public class EmailConstants { + public static final String EMAIL_FROM_ADDRESS = "fromAddress"; + public static final String EMAIL_TO_ADDRESS = "toAddress"; + public static final String EMAIL_HOST = "host"; + public static final String EMAIL_PASSWORD = "password"; + public static final String EMAIL_SUBJECT = "subject"; + public static final String EMAIL_CONTENT = "content"; + public static final String EMAIL_TRANSPORT_PROTOCOL = "transportProtocol"; + public static final String EMAIL_SMTP_AUTH = "smtpAuth"; + + public static final String ATTACHMENT_FILE_NAME = "emailsink.csv"; +} diff --git a/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/config/EmailSinkConfig.java b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/config/EmailSinkConfig.java new file mode 100644 index 00000000..40186ba7 --- /dev/null +++ b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/config/EmailSinkConfig.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.email.config; + +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; + +import io.openmessaging.KeyValue; + +public class EmailSinkConfig { + public static final Set SINK_REQUEST_CONFIG = new HashSet() { + { + add(EmailConstants.EMAIL_FROM_ADDRESS); + add(EmailConstants.EMAIL_TO_ADDRESS); + add(EmailConstants.EMAIL_HOST); + add(EmailConstants.EMAIL_PASSWORD); + add(EmailConstants.EMAIL_SUBJECT); + add(EmailConstants.EMAIL_CONTENT); + add(EmailConstants.EMAIL_TRANSPORT_PROTOCOL); + add(EmailConstants.EMAIL_SMTP_AUTH); + } + }; + + private String fromAddress; + private String toAddress; + private String host; + private String password; + private String subject; + private String content; + private String transportProtocol; + private String stmpAuth; + + public String getFromAddress() { + return fromAddress; + } + + public void setFromAddress(String fromAddress) { + this.fromAddress = fromAddress; + } + + public String getToAddress() { + return toAddress; + } + + public void setToAddress(String toAddress) { + this.toAddress = toAddress; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getSubject() { + return subject; + } + + public void setSubject(String subject) { + this.subject = subject; + } + + public String getContent() { + return content; + } + + public void setContent(String content) { + this.content = content; + } + + public String getTransportProtocol() { + return transportProtocol; + } + + public void setTransportProtocol(String transportProtocol) { + this.transportProtocol = transportProtocol; + } + + public String getStmpAuth() { + return stmpAuth; + } + + public void setStmpAuth(String stmpAuth) { + this.stmpAuth = stmpAuth; + } + + public void load(KeyValue props) { + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(3); + String key = tmp.toLowerCase(); + + String property = p.getString(key); + if (property != null) { + Class[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/helper/EmailHelperClient.java b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/helper/EmailHelperClient.java new file mode 100644 index 00000000..2cf7bcab --- /dev/null +++ b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/helper/EmailHelperClient.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.email.helper; + +import java.io.File; +import java.io.FileWriter; +import java.util.Properties; + +import org.apache.rocketmq.connect.email.config.EmailConstants; +import org.apache.rocketmq.connect.email.config.EmailSinkConfig; +import com.sun.mail.util.MailSSLSocketFactory; + +import javax.activation.DataHandler; +import javax.activation.DataSource; +import javax.activation.FileDataSource; +import javax.mail.Authenticator; +import javax.mail.BodyPart; +import javax.mail.Message; +import javax.mail.Multipart; +import javax.mail.Session; +import javax.mail.PasswordAuthentication; +import javax.mail.Transport; +import javax.mail.internet.InternetAddress; +import javax.mail.internet.MimeBodyPart; +import javax.mail.internet.MimeMessage; +import javax.mail.internet.MimeMultipart; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.openmessaging.connector.api.data.ConnectRecord; + +public class EmailHelperClient { + protected final Logger LOGGER = LoggerFactory.getLogger(EmailHelperClient.class); + + private final EmailSinkConfig config; + + private Session mailSession; + + public EmailHelperClient(EmailSinkConfig config) { + this.config = config; + createSession(this.config); + } + + private void createSession(EmailSinkConfig config) { + Properties properties = new Properties(); + properties.setProperty("mail.host", config.getHost()); + properties.setProperty("mail.transport.protocol", config.getTransportProtocol()); + properties.setProperty("mail.smtp.auth", config.getStmpAuth()); + try { + MailSSLSocketFactory sf = new MailSSLSocketFactory(); + sf.setTrustAllHosts(true); + properties.put("mail.smtp.ssl.enable", "true"); + properties.put("mail.smtp.ssl.socketFactory", sf); + this.mailSession = Session.getDefaultInstance( + properties, + new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication( + config.getFromAddress(), + config.getPassword()); + } + }); + LOGGER.info("Create email session successfully...."); + } catch (Exception e) { + throw new RuntimeException("Create email failed", e); + } + } + + /** + * Parse the record as a email and send it + * + * @param record the record to be sent + */ + public void send(ConnectRecord record) { + createAttachment(record); + try { + MimeMessage emailMessage = new MimeMessage(mailSession); + + // Set the sender email address + emailMessage.setFrom(new InternetAddress(config.getFromAddress())); + + // Set the recipient email address + emailMessage.addRecipient( + Message.RecipientType.TO, new InternetAddress(config.getToAddress())); + + // Set the email subject + emailMessage.setSubject(config.getSubject()); + + + // Create message with attachment + Multipart multipart = new MimeMultipart(); + + // Set the email content + BodyPart contentBodyPart = new MimeBodyPart(); + contentBodyPart.setText(config.getContent()); + multipart.addBodyPart(contentBodyPart); + + // Set the email attachment + BodyPart fileBodyPart = new MimeBodyPart(); + DataSource source = new FileDataSource(EmailConstants.ATTACHMENT_FILE_NAME); + fileBodyPart.setDataHandler(new DataHandler(source)); + fileBodyPart.setFileName(EmailConstants.ATTACHMENT_FILE_NAME); + multipart.addBodyPart(fileBodyPart); + + emailMessage.setContent(multipart); + + // Send a email + Transport.send(emailMessage); + LOGGER.info("Sent message successfully...."); + } catch (Exception e) { + throw new RuntimeException("Send message failed", e); + } + } + + /** + * Create attachment file + * + * @param record the record to be sent + */ + public void createAttachment(ConnectRecord record) { + try { + String payload = record.getKey() + "," + record.getData().toString(); + + File attachment = new File(EmailConstants.ATTACHMENT_FILE_NAME); + if (!attachment.exists()) { + attachment.createNewFile(); + } + // If the attachment exists, overwrite it + FileWriter writer = new FileWriter(attachment.getName()); + writer.write(payload); + writer.close(); + LOGGER.info("Create attachment successfully..."); + + } catch (Exception e) { + throw new RuntimeException("Create attachment file failed", e); + } + + } +} diff --git a/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/sink/EmailSinkConnector.java b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/sink/EmailSinkConnector.java new file mode 100644 index 00000000..81d29a5f --- /dev/null +++ b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/sink/EmailSinkConnector.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.email.sink; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rocketmq.connect.email.config.EmailSinkConfig; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; + +public class EmailSinkConnector extends SinkConnector { + + private KeyValue keyValue; + + @Override + public void start(KeyValue keyValue) { + for (String requestKey : EmailSinkConfig.SINK_REQUEST_CONFIG) { + if (!keyValue.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + + this.keyValue = keyValue; + } + + @Override + public void stop() { + this.keyValue = null; + } + + @Override + public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override + public Class taskClass() { + return EmailSinkTask.class; + } +} diff --git a/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/sink/EmailSinkTask.java b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/sink/EmailSinkTask.java new file mode 100644 index 00000000..08d766d6 --- /dev/null +++ b/connectors/rocketmq-connect-email/src/main/java/org/apache/rocketmq/connect/email/sink/EmailSinkTask.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.email.sink; + +import java.util.List; + +import org.apache.rocketmq.connect.email.config.EmailSinkConfig; +import org.apache.rocketmq.connect.email.helper.EmailHelperClient; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; + +public class EmailSinkTask extends SinkTask { + + private EmailSinkConfig config; + private EmailHelperClient emailClient; + + public EmailSinkConfig getConfig() { + return config; + } + + public EmailHelperClient getEmailClient() { + return emailClient; + } + + @Override + public void start(KeyValue keyValue) { + this.config = new EmailSinkConfig(); + this.config.load(keyValue); + this.emailClient = new EmailHelperClient(this.config); + } + + @Override + public void stop() { + this.emailClient = null; + } + + @Override + public void put(List sinkRecords) throws ConnectException { + if (sinkRecords == null || sinkRecords.isEmpty()) { + return; + } + + for (ConnectRecord record : sinkRecords) { + this.emailClient.send(record); + } + } + +} diff --git a/connectors/rocketmq-connect-email/src/test/java/org/apache/rocketmq/connect/email/EmailSInkTaskTest.java b/connectors/rocketmq-connect-email/src/test/java/org/apache/rocketmq/connect/email/EmailSInkTaskTest.java new file mode 100644 index 00000000..242c2057 --- /dev/null +++ b/connectors/rocketmq-connect-email/src/test/java/org/apache/rocketmq/connect/email/EmailSInkTaskTest.java @@ -0,0 +1,63 @@ +package org.apache.rocketmq.connect.email; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rocketmq.connect.email.config.EmailConstants; +import org.apache.rocketmq.connect.email.sink.EmailSinkTask; +import org.junit.Assert; +import org.junit.Test; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; +import io.openmessaging.internal.DefaultKeyValue; + +public class EmailSInkTaskTest { + + // Replace these constants with your own configuration + private static final String fromAddress = "xxxxxx@qq.com"; + private static final String toAddress = "xxxxx@163.com"; + private static final String host = "smtp.qq.com"; + private static final String password = "******"; + private static final String subject = "Report statistics"; + private static final String content = "This is a test email"; + private static final String transportProtocol = "smtp"; + private static final String stmpAuth = "true"; + + @Test + public void testTask() { + // Init Task + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(EmailConstants.EMAIL_FROM_ADDRESS, fromAddress); + keyValue.put(EmailConstants.EMAIL_TO_ADDRESS, toAddress); + keyValue.put(EmailConstants.EMAIL_HOST, host); + keyValue.put(EmailConstants.EMAIL_PASSWORD, password); + keyValue.put(EmailConstants.EMAIL_SUBJECT, subject); + keyValue.put(EmailConstants.EMAIL_CONTENT, content); + keyValue.put(EmailConstants.EMAIL_TRANSPORT_PROTOCOL, transportProtocol); + keyValue.put(EmailConstants.EMAIL_SMTP_AUTH, stmpAuth); + + EmailSinkTask task = new EmailSinkTask(); + task.start(keyValue); + + Assert.assertNotNull(task.getConfig()); + Assert.assertNotNull(task.getEmailClient()); + + // Construct records + List records = new ArrayList<>(); + Schema stringSchema = SchemaBuilder.string().build(); + for (int i = 0; i < 4; i++) { + ConnectRecord record = new ConnectRecord(null, null, System.currentTimeMillis(), stringSchema, "key" + i, + stringSchema, "value" + i); + records.add(record); + } + + // Put records + task.put(records); + + task.stop(); + Assert.assertNull(task.getEmailClient()); + } +}