Skip to content

Commit

Permalink
[ISSUE #492]optimize jdbc connector & fixed bug (#493)
Browse files Browse the repository at this point in the history
* fixed null pointer exception #294

* Fix invalid offset submitted by sinktask #310

* optimize jdbc connector

* fixed

* remove unused import
  • Loading branch information
sunxiaojian committed May 15, 2023
1 parent 517bcbb commit 5b254a0
Show file tree
Hide file tree
Showing 87 changed files with 2,873 additions and 4,557 deletions.
48 changes: 27 additions & 21 deletions connectors/rocketmq-connect-jdbc/README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
# rocketmq-connect-jdbc

为方便扩展,rocketmq-connect-jdbc目前采用模块化插件的形式进行组织,内部的connector需要用户手动编译成jar包来进行使用。目前支持Mysql、OpenMLDB等数据库
为方便扩展,rocketmq-connect-jdbc目前采用Spi插件的形式进行扩展,核心扩展api主要有:
```SPI api
org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialectFactory
org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect
```
目前支持Mysql、OpenMLDB数据库, pg、oracle、sqlserver、db2 等关系型数据库还在持续扩展中

## rocketmq-connect-jdbc使用方法

1. 进入想要使用的connector
目录下(以rocketmq-connect-jdbc-mysql目录为例),使用以下指令将此connector打包为jar文件

1. 进入想要使用的connectors目录下(以rocketmq-connect-jdbc目录为例),使用以下指令将插件进行打包
```shell
mvn clean package -Dmaven.test.skip=true
```
2. 打包好的jar文件将出现在`rocketmq-connect-jdbc-mysql/target/`目录下
2. 打包好的插件以tar.gz的模式出现在`rocketmq-connect-jdbc/target/`目录下

3.`distribution/conf`目录下找的对应的配置文件进行更新,对于standalone的启动方式,更新`connect-standalone.conf`文件中的`pluginPaths`变量

```lombok.config
pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-jdbc-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar
pluginPaths=(you plugin path)
```

相应的,使用distributed启动方式,则更新`connect-distributed.conf`中的变量
Expand All @@ -40,7 +44,7 @@ mvn clean package -Dmaven.test.skip=true
```
POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name}
{
"connector.class":"org.apache.rocketmq.connect.jdbc.mysql.source.BaseSourceConnector",
"connector.class":"org.apache.rocketmq.connect.jdbc.mysql.source.JdbcSourceConnector",
"max.tasks":"2",
"connection.url":"jdbc:mysql://XXXXXXXXX:3306",
"connection.user":"*****",
Expand Down Expand Up @@ -92,7 +96,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/
* **jdbc-source-connector 参数说明**

| KEY | TYPE | Must be filled | Description | Example |
| ------------------------ | ------- | -------------- | ---------------- | --------------------------------------------------------- |
|--------------------------| ------- | -------------- |------------------| --------------------------------------------------------- |
| connection.url | String | YES | source端 jdbc连接 | jdbc:mysql://XXXXXXXXX:3306 |
| connection.user | String | YES | source端 DB 用户名 | root |
| connection.password | String | YES | source端 DB 密码 | root |
Expand All @@ -104,8 +108,9 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/
| incrementing.column.name | Integer | NO | 增量字段,常用ID | id |
| timestamp.column.name | String | YES | 时间增量字段 | modified_time |
| table.whitelist | String | YES | 需要扫描的表 | db.table,db.table01 |
| max-task | Integer | YES | 任务数量,最大不能大于表的数量 | 2 |
| source-record-converter | Integer | YES | data转换器 | org.apache.rocketmq.connect.doris.converter.JsonConverter |
| max.tasks | Integer | YES | 任务数量,最大不能大于表的数量 | 2 |
| key.converter | Integer | YES | key转换器 | org.apache.rocketmq.connect.doris.converter.JsonConverter |
| value.converter | Integer | YES | data转换器 | org.apache.rocketmq.connect.doris.converter.JsonConverter |

```
注:1.source拉取的数据写入到以表名自动创建的topic中,如果需要写入特定的topic中则需要指定"connect-topicname" 参数
Expand All @@ -114,14 +119,15 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/

* **jdbc-sink-connector 参数说明**

| KEY | TYPE | Must be filled | Description | Example |
| ----------------------- | ------- | -------------- | --------------- | --------------------------------------------------------- |
| connection.url | String | YES | sink端 jdbc连接 | jdbc:mysql://XXXXXXXXX:3306 |
| connection.user | String | YES | sink端 DB 用户名 | root |
| connection.password | String | YES | sink端 DB 密码 | root |
| host | String | YES | doris host | 192.168.0.1 |
| port | String | YES | doris http port | 8030 |
| user | String | YES | 监听的topic | root |
| passwd | String | YES | 监听的topic | passwd |
| max-task | Integer | NO | 任务数量 | 2 |
| source-record-converter | Integer | YES | data转换器 | org.apache.rocketmq.connect.doris.converter.JsonConverter |
| KEY | TYPE | Must be filled | Description | Example |
|---------------------| ------- | -------------- |-----------------| --------------------------------------------------------- |
| connection.url | String | YES | sink端 jdbc连接 | jdbc:mysql://XXXXXXXXX:3306 |
| connection.user | String | YES | sink端 DB 用户名 | root |
| connection.password | String | YES | sink端 DB 密码 | root |
| host | String | YES | doris host | 192.168.0.1 |
| port | String | YES | doris http port | 8030 |
| user | String | YES | 监听的topic | root |
| passwd | String | YES | 监听的topic | passwd |
| max.tasks | Integer | NO | 任务数量 | 2 |
| key.converter | Integer | YES | key转换器 | org.apache.rocketmq.connect.doris.converter.JsonConverter |
| value.converter | Integer | YES | data转换器 | org.apache.rocketmq.connect.doris.converter.JsonConverter |
204 changes: 166 additions & 38 deletions connectors/rocketmq-connect-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-jdbc</artifactId>
<packaging>pom</packaging>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<modules>
<module>rocketmq-connect-jdbc-core</module>
<module>rocketmq-connect-jdbc-mysql</module>
<module>rocketmq-connect-jdbc-openmldb</module>
</modules>

<name>rocketmq-connect-jdbc</name>
<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jdbc</url>


<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
Expand Down Expand Up @@ -73,96 +67,211 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-jdbc-core</artifactId>
<version>${project.version}</version>
</dependency>
<!--rocketmq connect api-->
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>${openmessaging-connector.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
<version>${openmessaging-api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>

<!--junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
<scope>provided</scope>
</dependency>

<!--fast json version-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback-classic.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback-core.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
<scope>provided</scope>
</dependency>


<!--junit -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

<!--db driver jar-->

<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
<scope>provided</scope>
</dependency>

<!--openmldb-->
<dependency>
<groupId>com.4paradigm.openmldb</groupId>
<artifactId>openmldb-native</artifactId>
<version>0.5.0-macos</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.4paradigm.openmldb</groupId>
<artifactId>openmldb-jdbc</artifactId>
<version>0.5.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.4paradigm.openmldb</groupId>
<artifactId>openmldb-native</artifactId>
</exclusion>
<exclusion>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>

<!--fast json version-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
</dependency>


<!--junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<!--db driver jar-->

<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<!--openmldb-->
<dependency>
<groupId>com.4paradigm.openmldb</groupId>
<artifactId>openmldb-native</artifactId>
</dependency>
<dependency>
<groupId>com.4paradigm.openmldb</groupId>
<artifactId>openmldb-jdbc</artifactId>
</dependency>

</dependencies>

<build>
<resources>
Expand Down Expand Up @@ -263,6 +372,24 @@
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptors>
<descriptor>src/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
Expand All @@ -271,7 +398,7 @@
<id>verify</id>
<phase>verify</phase>
<configuration>
<configLocation>../../style/rmq_checkstyle.xml</configLocation>
<configLocation>../../../style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
Expand All @@ -286,4 +413,5 @@
</plugin>
</plugins>
</build>
</project>

</project>
Loading

0 comments on commit 5b254a0

Please sign in to comment.