Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #478] Support ClickHouse source and sink #496

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions connectors/rocketmq-connect-clickhouse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
##### ClickHouseSourceConnector fully-qualified name
org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector

**clickhouse-source-connector** start

```
POST http://${runtime-ip}:${runtime-port}/connectors/clickhouseSourceConnector
{
"connector.class":"org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector",
"clickhousehost":"localhost",
"clickhouseport":8123,
"database":"default",
"username":"default",
"password":"123456",
"table":"tableName",
"topic":"testClickHouseTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}
```

##### ClickHouseSinkConnector fully-qualified name
org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector

**clickhouse-sink-connector** start

```
POST http://${runtime-ip}:${runtime-port}/connectors/clickhouseSinkConnector
{
"connector.class":"org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector",
"clickhousehost":"localhost",
"clickhouseport":8123,
"database":"clickhouse",
"username":"default",
"password":"123456",
"connect.topicnames":"testClickHouseTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}
```

##### parameter configuration

| parameter | effect | required | default |
|--------------------|---------------------------------------------------|-------------------|---------|
| clickhousehost | The Host of the Clickhouse server | yes | null |
| clickhouseport | The Port of the Clickhouse server | yes | null |
| database | The database to read or write | yes | null |
| table | The source table to read | yes (source only) | null |
| topic | RocketMQ topic for source connector to write into | yes (source only) | null |
| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null |
204 changes: 204 additions & 0 deletions connectors/rocketmq-connect-clickhouse/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-clickhouse</artifactId>
<version>1.0-SNAPSHOT</version>

<name>connect-clickhouse</name>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<issueManagement>
<system>jira</system>
<url>https://issues.apache.org/jira/browse/RocketMQ</url>
</issueManagement>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.3</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>clirr-maven-plugin</artifactId>
<version>2.7</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<argLine>-Xms512m -Xmx1024m</argLine>
<forkMode>always</forkMode>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.6</version>
<configuration>
<locales>en_US</locales>
<outputEncoding>UTF-8</outputEncoding>
<inputEncoding>UTF-8</inputEncoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<charset>UTF-8</charset>
<locale>en_US</locale>
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
</configuration>
<executions>
<execution>
<id>aggregate</id>
<goals>
<goal>aggregate</goal>
</goals>
<phase>site</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.12</version>
<configuration>
<excludes>
<exclude>README.md</exclude>
<exclude>README-CN.md</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>0.1.4</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.4.5</version>
<!-- use uber jar with all dependencies included, change classifier to http for smaller jar -->
<classifier>all</classifier>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
</dependency>

<dependency>
<groupId>com.clickhouse</groupId>
<!-- or clickhouse-grpc-client if you prefer gRPC -->
<artifactId>clickhouse-http-client</artifactId>
<version>0.4.5</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.connect.clickhouse.config;

import io.openmessaging.KeyValue;
import java.lang.reflect.Method;

public class ClickHouseBaseConfig {

private String clickHouseHost;

private Integer clickHousePort;

private String database;

private String userName;

private String passWord;

private String accessToken;

private String topic;

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getClickHouseHost() {
return clickHouseHost;
}

public void setClickHouseHost(String clickHouseHost) {
this.clickHouseHost = clickHouseHost;
}

public Integer getClickHousePort() {
return clickHousePort;
}

public void setClickHousePort(Integer clickHousePort) {
this.clickHousePort = clickHousePort;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public String getPassWord() {
return passWord;
}

public void setPassWord(String passWord) {
this.passWord = passWord;
}

public String getAccessToken() {
return accessToken;
}

public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}

public String getDatabase() {
return database;
}

public void setDatabase(String database) {
this.database = database;
}

public void load(KeyValue props) {
properties2Object(props, this);
}

private void properties2Object(final KeyValue p, final Object object) {

Method[] methods = object.getClass().getMethods();
for (Method method : methods) {
String mn = method.getName();
if (mn.startsWith("set")) {
try {
String tmp = mn.substring(3);
String key = tmp.toLowerCase();

String property = p.getString(key);
if (property != null) {
Class<?>[] pt = method.getParameterTypes();
if (pt != null && pt.length > 0) {
String cn = pt[0].getSimpleName();
Object arg;
if (cn.equals("int") || cn.equals("Integer")) {
arg = Integer.parseInt(property);
} else if (cn.equals("long") || cn.equals("Long")) {
arg = Long.parseLong(property);
} else if (cn.equals("double") || cn.equals("Double")) {
arg = Double.parseDouble(property);
} else if (cn.equals("boolean") || cn.equals("Boolean")) {
arg = Boolean.parseBoolean(property);
} else if (cn.equals("float") || cn.equals("Float")) {
arg = Float.parseFloat(property);
} else if (cn.equals("String")) {
arg = property;
} else {
continue;
}
method.invoke(object, arg);
}
}
} catch (Throwable ignored) {
}
}
}
}
}
Loading