From 5117ede982c455d1aa744ab265871d1e3aad3524 Mon Sep 17 00:00:00 2001 From: joeCarf Date: Fri, 5 May 2023 17:23:10 +0800 Subject: [PATCH 1/8] initial commit --- .../rocketmq-connect-clickhouse/pom.xml | 75 +++++++ .../connector/config/ClickHouseConstants.java | 32 +++ .../connector/config/ClickhouseConfig.java | 137 +++++++++++++ .../sink/ClickHouseSinkConnector.java | 41 ++++ .../connector/sink/ClickHouseSinkTask.java | 188 ++++++++++++++++++ .../source/ClickHouseSourceConnector.java | 58 ++++++ .../source/ClickHouseSourceTask.java | 47 +++++ .../sink/ClickHouseSinkTaskTest.java | 64 ++++++ 8 files changed, 642 insertions(+) create mode 100644 connectors/rocketmq-connect-clickhouse/pom.xml create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickHouseConstants.java create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickhouseConfig.java create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkConnector.java create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceConnector.java create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java create mode 100644 connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTaskTest.java diff --git a/connectors/rocketmq-connect-clickhouse/pom.xml b/connectors/rocketmq-connect-clickhouse/pom.xml new file mode 100644 index 00000000..8e56aa5a --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/pom.xml @@ -0,0 +1,75 @@ + + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-clickhouse + 1.0-SNAPSHOT + + connect-clickhouse + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + jira + https://issues.apache.org/jira/browse/RocketMQ + + + + 8 + 8 + UTF-8 + + + + io.openmessaging + openmessaging-connector + 0.1.4 + compile + + + + org.lz4 + lz4-java + 1.8.0 + + + + com.clickhouse + + clickhouse-http-client + 0.4.5 + + + com.alibaba + fastjson + 1.2.83 + compile + + + junit + junit + RELEASE + test + + + + \ No newline at end of file diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickHouseConstants.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickHouseConstants.java new file mode 100644 index 00000000..bcab5f9a --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickHouseConstants.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.connector.config; + +public class ClickHouseConstants { + public static final String CLICKHOUSE_HOST = "clickhousehost"; + + public static final String CLICKHOUSE_PORT = "clickhouseport"; + + public static final String CLICKHOUSE_DATABASE = "clickhousedatabase"; + + public static final String CLICKHOUSE_USERNAME = "username"; + + public static final String CLICKHOUSE_PASSWORD = "password"; + + public static final String CLICKHOUSE_ACCESSTOKEN = "accesstoken"; +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickhouseConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickhouseConfig.java new file mode 100644 index 00000000..f65a9fd3 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickhouseConfig.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.connector.config; + +import io.openmessaging.KeyValue; +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; + +public class ClickhouseConfig { + + public static final Set REQUEST_CONFIG = new HashSet() { + { + add(ClickHouseConstants.CLICKHOUSE_HOST); + add(ClickHouseConstants.CLICKHOUSE_PORT); + } + }; + + private String clickHouseHost; + + private String clickHousePort; + + private String database; + + private String userName; + + private String passWord; + + private String accessToken; + + public String getClickHouseHost() { + return clickHouseHost; + } + + public void setClickHouseHost(String clickHouseHost) { + this.clickHouseHost = clickHouseHost; + } + + public String getClickHousePort() { + return clickHousePort; + } + + public void setClickHousePort(String clickHousePort) { + this.clickHousePort = clickHousePort; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassWord() { + return passWord; + } + + public void setPassWord(String passWord) { + this.passWord = passWord; + } + + public String getAccessToken() { + return accessToken; + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public void load(KeyValue props) { + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(3); + String key = tmp.toLowerCase(); + + String property = p.getString(key); + if (property != null) { + Class[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkConnector.java new file mode 100644 index 00000000..44587098 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkConnector.java @@ -0,0 +1,41 @@ +package org.apache.rocketmq.connect.clickhouse.connector.sink; + + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; + +public class ClickHouseSinkConnector extends SinkConnector { + + private KeyValue keyValue; + + @Override public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override public Class taskClass() { + return ClickHouseSinkTask.class; + } + + @Override public void start(KeyValue value) { + + for (String requestKey : ClickhouseConfig.REQUEST_CONFIG) { + if (!value.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + + this.keyValue = value; + } + + @Override public void stop() { + this.keyValue = null; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java new file mode 100644 index 00000000..502db495 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.connector.sink; + +import com.alibaba.fastjson.JSONObject; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseConfig; +import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.ClickHouseResponseSummary; +import com.clickhouse.client.config.ClickHouseClientOption; + +import com.clickhouse.data.ClickHouseDataStreamFactory; +import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHousePipedOutputStream; +import com.clickhouse.data.format.BinaryStreamUtils; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.connector.api.errors.ConnectException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; + +public class ClickHouseSinkTask extends SinkTask { + + public ClickhouseConfig config; + + private ClickHouseNode server; + + @Override public void put(List sinkRecords) throws ConnectException { + if (sinkRecords == null || sinkRecords.size() < 1) { + return; + } + + try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { + boolean pingOK = client.ping(server, 30000); + if (!pingOK) { + throw new RuntimeException("Cannot connect to clickhouse server!"); + } + + + for (ConnectRecord record : sinkRecords) { + + String table = record.getSchema().getName(); + ClickHouseRequest.Mutation request = client.connect(server) + .write() + .table(table) + .format(ClickHouseFormat.JSONEachRow); + + ClickHouseConfig config = request.getConfig(); + request.option(ClickHouseClientOption.WRITE_BUFFER_SIZE, 8192); + try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() + .createPipedOutputStream(config, (Runnable) null)) { + CompletableFuture future = request.data(stream.getInputStream()).execute(); + + final List fields = record.getSchema().getFields(); + final Struct structData = (Struct) record.getData(); + JSONObject object = new JSONObject(); + for (Field field : fields) { + object.put(field.getName(), structData.get(field)); + } + BinaryStreamUtils.writeBytes(stream, object.toJSONString().getBytes(StandardCharsets.UTF_8)); + + try (ClickHouseResponse response = future.get()) { + ClickHouseResponseSummary summary = response.getSummary(); + + } + } + + } + +// ClickHouseRequest.Mutation request = client.connect(server).write().table("table") +// .format(ClickHouseFormat.RowBinary); +// ClickHouseConfig config = request.getConfig(); +// CompletableFuture future; +// // back-pressuring is not supported, you can adjust the first two arguments +// try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() +// .createPipedOutputStream(config, (Runnable) null)) { +// // in async mode, which is default, execution happens in a worker thread +// future = request.data(stream.getInputStream()).execute(); +// +// // writing happens in main thread +// for (int i = 0; i < 10_000; i++) { +// BinaryStreamUtils.writeString(stream, String.valueOf(i % 16)); +// BinaryStreamUtils.writeNonNull(stream); +// BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString()); +// } +// } +// +// // response should be always closed +// try (ClickHouseResponse response = future.get()) { +// ClickHouseResponseSummary summary = response.getSummary(); +//// return summary.getWrittenRows(); +// } + } catch(InterruptedException e){ + Thread.currentThread().interrupt(); + try { + throw ClickHouseException.forCancellation(e, server); + } catch (ClickHouseException ex) { + throw new RuntimeException(ex); + } + } catch(Exception e){ + try { + throw ClickHouseException.of(e, server); + } catch (ClickHouseException ex) { + throw new RuntimeException(ex); + } + } + + } + + @Override public void start (KeyValue keyValue){ + this.config = new ClickhouseConfig(); + this.config.load(keyValue); + + this.server = ClickHouseNode.builder() + .host(config.getClickHouseHost()) + .port(ClickHouseProtocol.HTTP, Integer.valueOf(config.getClickHousePort())) + .database(config.getDatabase()).credentials(getCredentials(config)) + .build(); + + ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); + boolean pingOK = clientPing.ping(server, 30000); +// if (!pingOK) { +// throw new RuntimeException("Cannot connect to clickhouse server!"); +// } +// try { +// dropAndCreateTable(server, "tableName"); +// } catch (ClickHouseException e) { +// e.printStackTrace(); +// } + + } + + void dropAndCreateTable(ClickHouseNode server, String table) throws ClickHouseException { + try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { + ClickHouseRequest request = client.connect(server); + // or use future chaining + request.query("drop table if exists " + table).execute().get(); + request.query("create table " + table + "(a String, b Nullable(String)) engine=MergeTree() order by a") + .execute().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ClickHouseException.forCancellation(e, server); + } catch (ExecutionException e) { + throw ClickHouseException.of(e, server); + } + } + + private ClickHouseCredentials getCredentials (ClickhouseConfig config){ + if (config.getUserName() != null && config.getPassWord() != null) { + return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord()); + } + if (config.getAccessToken() != null) { + return ClickHouseCredentials.fromAccessToken(config.getAccessToken()); + } + throw new RuntimeException("Credentials cannot be empty!"); + + } + + @Override public void stop () { + this.server = null; + } + } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceConnector.java new file mode 100644 index 00000000..3e4c1fb8 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceConnector.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.connector.source; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.source.SourceConnector; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; + +public class ClickHouseSourceConnector extends SourceConnector { + + private KeyValue keyValue; + + private ClickhouseConfig config; + @Override public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override public Class taskClass() { + return ClickHouseSourceTask.class; + } + + @Override public void start(KeyValue config) { + + for (String requestKey : ClickhouseConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + this.keyValue = config; + + } + + @Override public void stop() { + this.keyValue = null; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java new file mode 100644 index 00000000..5d40e0ab --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java @@ -0,0 +1,47 @@ +package org.apache.rocketmq.connect.clickhouse.connector.source; + +import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.source.SourceTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.List; +import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; + +public class ClickHouseSourceTask extends SourceTask { + + private ClickhouseConfig config; + + private ClickHouseNode server; + + @Override public List poll() throws InterruptedException { + return null; + } + + @Override public void start(KeyValue keyValue) { + this.config.load(keyValue); + + this.server = ClickHouseNode.builder() + .host(config.getClickHouseHost()) + .port(ClickHouseProtocol.HTTP, Integer.valueOf(config.getClickHousePort())) + .database(config.getDatabase()).credentials(getCredentials(config)) + .build(); + + } + + private ClickHouseCredentials getCredentials (ClickhouseConfig config){ + if (config.getUserName() != null && config.getPassWord() != null) { + return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord()); + } + if (config.getAccessToken() != null) { + return ClickHouseCredentials.fromAccessToken(config.getAccessToken()); + } + throw new RuntimeException("Credentials cannot be empty!"); + + } + + @Override public void stop() { + this.server = null; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTaskTest.java new file mode 100644 index 00000000..786490f6 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTaskTest.java @@ -0,0 +1,64 @@ +package org.apache.rocketmq.connect.clickhouse.connector.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.internal.DefaultKeyValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.connect.clickhouse.connector.config.ClickHouseConstants; + + +class ClickHouseSinkTaskTest { + + private static final String host = "120.48.26.195"; + private static final String port = "8123"; + private static final String db = "default"; + private static final String username = "default"; + private static final String password = "123456"; + + + + public static void main(String[] args) { + List records = new ArrayList<>(); + // build schema + Schema schema = SchemaBuilder.struct() + .name("tableName") + .field("c1",SchemaBuilder.int32().build()) + .field("c2", SchemaBuilder.string().build()) + .build(); + // build record + int param0 = 1001; + Struct struct= new Struct(schema); + struct.put("c1",param0); + struct.put("c2",String.format("test-data-%s", param0)); + + ConnectRecord record = new ConnectRecord( + // offset partition + // offset partition" + new RecordPartition(new ConcurrentHashMap<>()), + new RecordOffset(new HashMap<>()), + System.currentTimeMillis(), + schema, + struct + ); + records.add(record); + ClickHouseSinkTask task = new ClickHouseSinkTask(); + KeyValue config = new DefaultKeyValue(); + config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); + config.put(ClickHouseConstants.CLICKHOUSE_PORT, port); + config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db); + config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username); + config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password); + task.start(config); + task.put(records); + + } + +} \ No newline at end of file From 25e7ba5215d32f145efa49889345c7f77cd602aa Mon Sep 17 00:00:00 2001 From: joeCarf Date: Fri, 5 May 2023 19:53:05 +0800 Subject: [PATCH 2/8] commit --- .../rocketmq-connect-clickhouse/pom.xml | 6 +++ .../connector/sink/ClickHouseSinkTask.java | 45 ++++++++++++++++--- .../source/ClickHouseSourceTask.java | 27 +++++++++++ 3 files changed, 73 insertions(+), 5 deletions(-) diff --git a/connectors/rocketmq-connect-clickhouse/pom.xml b/connectors/rocketmq-connect-clickhouse/pom.xml index 8e56aa5a..2f3d554b 100644 --- a/connectors/rocketmq-connect-clickhouse/pom.xml +++ b/connectors/rocketmq-connect-clickhouse/pom.xml @@ -70,6 +70,12 @@ RELEASE test + + com.google.code.gson + gson + 2.2.4 + compile + \ No newline at end of file diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java index 502db495..da6f21cf 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java @@ -32,7 +32,10 @@ import com.clickhouse.data.ClickHouseDataStreamFactory; import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.ClickHousePipedOutputStream; +import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.data.format.BinaryStreamUtils; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; @@ -40,7 +43,9 @@ import io.openmessaging.connector.api.data.Struct; import io.openmessaging.connector.api.errors.ConnectException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; @@ -56,13 +61,15 @@ public class ClickHouseSinkTask extends SinkTask { return; } + + try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { + boolean pingOK = client.ping(server, 30000); if (!pingOK) { throw new RuntimeException("Cannot connect to clickhouse server!"); } - for (ConnectRecord record : sinkRecords) { String table = record.getSchema().getName(); @@ -79,12 +86,19 @@ public class ClickHouseSinkTask extends SinkTask { final List fields = record.getSchema().getFields(); final Struct structData = (Struct) record.getData(); + Gson gson = new Gson(); + Map data = new HashMap<>(); + java.lang.reflect.Type gsonType = new TypeToken(){}.getType(); + + JSONObject object = new JSONObject(); for (Field field : fields) { object.put(field.getName(), structData.get(field)); + data.put(field.getName(), structData.get(field).toString()); } - BinaryStreamUtils.writeBytes(stream, object.toJSONString().getBytes(StandardCharsets.UTF_8)); - + String gsonString = gson.toJson(data,gsonType); +// BinaryStreamUtils.writeBytes(stream, object.toJSONString().getBytes(StandardCharsets.UTF_8)); + BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8)); try (ClickHouseResponse response = future.get()) { ClickHouseResponseSummary summary = response.getSummary(); @@ -133,6 +147,27 @@ public class ClickHouseSinkTask extends SinkTask { } +// static int query(ClickHouseNode server, String table) throws ClickHouseException { +// try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol()); +// ClickHouseResponse response = client.read(server) +// // prefer to use RowBinaryWithNamesAndTypes as it's fully supported +// // see details at https://github.com/ClickHouse/clickhouse-java/issues/928 +// .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) +// .query("select * from " + table).execute().get()) { +// int count = 0; +// // or use stream API via response.stream() +// for (ClickHouseRecord r : response.records()) { +// count++; +// } +// return count; +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// throw ClickHouseException.forCancellation(e, server); +// } catch (ExecutionException e) { +// throw ClickHouseException.of(e, server); +// } +// } + @Override public void start (KeyValue keyValue){ this.config = new ClickhouseConfig(); this.config.load(keyValue); @@ -143,8 +178,8 @@ public class ClickHouseSinkTask extends SinkTask { .database(config.getDatabase()).credentials(getCredentials(config)) .build(); - ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - boolean pingOK = clientPing.ping(server, 30000); +// ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); +// boolean pingOK = clientPing.ping(server, 30000); // if (!pingOK) { // throw new RuntimeException("Cannot connect to clickhouse server!"); // } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java index 5d40e0ab..0451e69a 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java @@ -1,12 +1,18 @@ package org.apache.rocketmq.connect.clickhouse.connector.source; +import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseException; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHouseRecord; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.source.SourceTask; import io.openmessaging.connector.api.data.ConnectRecord; import java.util.List; +import java.util.concurrent.ExecutionException; import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; public class ClickHouseSourceTask extends SourceTask { @@ -19,6 +25,27 @@ public class ClickHouseSourceTask extends SourceTask { return null; } + int query(ClickHouseNode server, String table) throws ClickHouseException { + try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol()); + ClickHouseResponse response = client.read(server) + // prefer to use RowBinaryWithNamesAndTypes as it's fully supported + // see details at https://github.com/ClickHouse/clickhouse-java/issues/928 + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query("select * from " + table).execute().get()) { + int count = 0; + // or use stream API via response.stream() + for (ClickHouseRecord r : response.records()) { + count++; + } + return count; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ClickHouseException.forCancellation(e, server); + } catch (ExecutionException e) { + throw ClickHouseException.of(e, server); + } + } + @Override public void start(KeyValue keyValue) { this.config.load(keyValue); From 54c2517318194a9301f88f200070d469a4b29642 Mon Sep 17 00:00:00 2001 From: joeCarf Date: Fri, 5 May 2023 22:53:14 +0800 Subject: [PATCH 3/8] rename --- .../clickhouse/ClickHouseHelperClient.java | 222 ++++++++++++++++++ .../config/ClickHouseConstants.java | 4 +- .../config/ClickhouseConfig.java | 2 +- .../sink/ClickHouseSinkConnector.java | 4 +- .../sink/ClickHouseSinkTask.java | 12 +- .../source/ClickHouseSourceConnector.java | 4 +- .../source/ClickHouseSourceTask.java | 4 +- .../sink/ClickHouseSinkTaskTest.java | 8 +- 8 files changed, 244 insertions(+), 16 deletions(-) create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java rename connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/{connector => }/config/ClickHouseConstants.java (95%) rename connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/{connector => }/config/ClickhouseConfig.java (98%) rename connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/{connector => }/sink/ClickHouseSinkConnector.java (87%) rename connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/{connector => }/sink/ClickHouseSinkTask.java (96%) rename connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/{connector => }/source/ClickHouseSourceConnector.java (92%) rename connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/{connector => }/source/ClickHouseSourceTask.java (95%) rename connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/{connector => }/sink/ClickHouseSinkTaskTest.java (90%) diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java new file mode 100644 index 00000000..efb496e2 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java @@ -0,0 +1,222 @@ +package org.apache.rocketmq.connect.clickhouse; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; +import com.sun.org.slf4j.internal.Logger; +import com.sun.org.slf4j.internal.LoggerFactory; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClickHouseHelperClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClient.class); + + private String hostname = null; + private int port = -1; + private String username = "default"; + private String database = "default"; + private String password = ""; + private boolean sslEnabled = false; + private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC; + private ClickHouseNode server = null; + private int retry; + public ClickHouseHelperClient(ClickHouseClientBuilder builder) { + this.hostname = builder.hostname; + this.port = builder.port; + this.username = builder.username; + this.password = builder.password; + this.database = builder.database; + this.sslEnabled = builder.sslEnabled; + this.timeout = builder.timeout; + this.retry = builder.retry; + this.server = create(); + } + + private ClickHouseNode create() { + String protocol = "http"; + if (this.sslEnabled) + protocol += "s"; + + String url = String.format("%s://%s:%d/%s", protocol, hostname, port, database); + + LOGGER.info("url: " + url); + + if (username != null && password != null) { + LOGGER.info(String.format("Adding username [%s] password [%s] ", username, Mask.passwordMask(password))); + Map options = new HashMap<>(); + options.put("user", username); + options.put("password", password); + server = ClickHouseNode.of(url, options); + } else { + server = ClickHouseNode.of(url); + } + return server; + } + + public boolean ping() { + ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); + LOGGER.debug(String.format("server [%s] , timeout [%d]", server, timeout)); + int retryCount = 0; + + while (retryCount < retry) { + if (clientPing.ping(server, timeout)) { + LOGGER.info("Ping is successful."); + clientPing.close(); + return true; + } + retryCount++; + LOGGER.warn(String.format("Ping retry %d out of %d", retryCount, retry)); + } + LOGGER.error("unable to ping to clickhouse server. "); + clientPing.close(); + return false; + } + + public ClickHouseNode getServer() { + return this.server; + } + + public ClickHouseResponse query(String query) { + return query(query, null); + } + + public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat) { + int retryCount = 0; + ClickHouseException ce = null; + while (retryCount < retry) { + try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); + ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) + // you'll have to parse response manually if using a different format + + .format(clickHouseFormat) + .query(query) + .executeAndWait()) { + return response; + } catch (ClickHouseException e) { + retryCount++; + LOGGER.warn(String.format("Query retry %d out of %d", retryCount, retry), e); + ce = e; + } + } + throw new RuntimeException(ce); + } + + + public List showTables() { + List tablesNames = new ArrayList<>(); + try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); + ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) + // you'll have to parse response manually if using a different format + + .query("SHOW TABLES") + .executeAndWait()) { + for (ClickHouseRecord r : response.records()) { + ClickHouseValue v = r.getValue(0); + String tableName = v.asString(); + tablesNames.add(tableName); + } + + } catch (ClickHouseException e) { + LOGGER.error("Failed in show tables", e); + } + return tablesNames; + } + + public Table describeTable(String tableName) { + if (tableName.startsWith(".inner")) + return null; + String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", this.database, tableName); + + try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); + ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) + .query(describeQuery) + .executeAndWait()) { + Table table = new Table(tableName); + for (ClickHouseRecord r : response.records()) { + ClickHouseValue v = r.getValue(0); + String value = v.asString(); + String[] cols = value.split("\t"); + if (cols.length > 2) { + String defaultKind = cols[2]; + if ("ALIAS".equals(defaultKind) || "MATERIALIZED".equals(defaultKind)) { + // Only insert into "real" columns + continue; + } + } + String name = cols[0]; + String type = cols[1]; + table.addColumn(Column.extractColumn(name, type, false)); + } + return table; + } catch (ClickHouseException e) { + LOGGER.error(String.format("Got exception when running %s", describeQuery), e); + return null; + } + + } + public List extractTablesMapping() { + List
tableList = new ArrayList<>(); + for (String tableName : showTables() ) { + Table table = describeTable(tableName); + if (table != null ) + tableList.add(table); + } + return tableList; + } + + public static class ClickHouseClientBuilder{ + private String hostname = null; + private int port = -1; + private String username = "default"; + private String database = "default"; + private String password = ""; + private boolean sslEnabled = false; + private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC; + private int retry = ClickHouseSinkConfig.retryCountDefault; + public ClickHouseClientBuilder(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + + public ClickHouseClientBuilder setUsername(String username) { + this.username = username; + return this; + } + + public ClickHouseClientBuilder setPassword(String password) { + this.password = password; + return this; + } + + public ClickHouseClientBuilder setDatabase(String database) { + this.database = database; + return this; + } + + public ClickHouseClientBuilder sslEnable(boolean sslEnabled) { + this.sslEnabled = sslEnabled; + return this; + } + + public ClickHouseClientBuilder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public ClickHouseClientBuilder setRetry(int retry) { + this.retry = retry; + return this; + } + + public ClickHouseHelperClient build(){ + return new ClickHouseHelperClient(this); + } + + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickHouseConstants.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java similarity index 95% rename from connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickHouseConstants.java rename to connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java index bcab5f9a..3ec51f8f 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickHouseConstants.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.connect.clickhouse.connector.config; +package org.apache.rocketmq.connect.clickhouse.config; public class ClickHouseConstants { public static final String CLICKHOUSE_HOST = "clickhousehost"; @@ -29,4 +29,6 @@ public class ClickHouseConstants { public static final String CLICKHOUSE_PASSWORD = "password"; public static final String CLICKHOUSE_ACCESSTOKEN = "accesstoken"; + + } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickhouseConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickhouseConfig.java similarity index 98% rename from connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickhouseConfig.java rename to connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickhouseConfig.java index f65a9fd3..aa4728ec 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/config/ClickhouseConfig.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickhouseConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.connect.clickhouse.connector.config; +package org.apache.rocketmq.connect.clickhouse.config; import io.openmessaging.KeyValue; import java.lang.reflect.Method; diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java similarity index 87% rename from connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkConnector.java rename to connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java index 44587098..a5f34b38 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkConnector.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.connect.clickhouse.connector.sink; +package org.apache.rocketmq.connect.clickhouse.sink; import io.openmessaging.KeyValue; @@ -6,7 +6,7 @@ import io.openmessaging.connector.api.component.task.sink.SinkConnector; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickhouseConfig; public class ClickHouseSinkConnector extends SinkConnector { diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java similarity index 96% rename from connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java rename to connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java index da6f21cf..905fdaf9 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.connect.clickhouse.connector.sink; +package org.apache.rocketmq.connect.clickhouse.sink; import com.alibaba.fastjson.JSONObject; import com.clickhouse.client.ClickHouseClient; @@ -32,7 +32,6 @@ import com.clickhouse.data.ClickHouseDataStreamFactory; import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.ClickHousePipedOutputStream; -import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.data.format.BinaryStreamUtils; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -40,6 +39,8 @@ import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; import io.openmessaging.connector.api.data.Struct; import io.openmessaging.connector.api.errors.ConnectException; import java.nio.charset.StandardCharsets; @@ -48,7 +49,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickhouseConfig; public class ClickHouseSinkTask extends SinkTask { @@ -72,6 +73,7 @@ public class ClickHouseSinkTask extends SinkTask { for (ConnectRecord record : sinkRecords) { + String table = record.getSchema().getName(); ClickHouseRequest.Mutation request = client.connect(server) .write() @@ -94,8 +96,10 @@ public class ClickHouseSinkTask extends SinkTask { JSONObject object = new JSONObject(); for (Field field : fields) { object.put(field.getName(), structData.get(field)); - data.put(field.getName(), structData.get(field).toString()); + data.put(field.getName(), structData.get(field)); } + Schema NESTED_SCHEMA = SchemaBuilder.struct().build(); + String gsonString = gson.toJson(data,gsonType); // BinaryStreamUtils.writeBytes(stream, object.toJSONString().getBytes(StandardCharsets.UTF_8)); BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8)); diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java similarity index 92% rename from connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceConnector.java rename to connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java index 3e4c1fb8..f97373e8 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceConnector.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.rocketmq.connect.clickhouse.connector.source; +package org.apache.rocketmq.connect.clickhouse.source; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.Task; import io.openmessaging.connector.api.component.task.source.SourceConnector; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickhouseConfig; public class ClickHouseSourceConnector extends SourceConnector { diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java similarity index 95% rename from connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java rename to connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java index 0451e69a..f74ad75b 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/connector/source/ClickHouseSourceTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.connect.clickhouse.connector.source; +package org.apache.rocketmq.connect.clickhouse.source; import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseCredentials; @@ -13,7 +13,7 @@ import io.openmessaging.connector.api.data.ConnectRecord; import java.util.List; import java.util.concurrent.ExecutionException; -import org.apache.rocketmq.connect.clickhouse.connector.config.ClickhouseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickhouseConfig; public class ClickHouseSourceTask extends SourceTask { diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java similarity index 90% rename from connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTaskTest.java rename to connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java index 786490f6..18f3c620 100644 --- a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/connector/sink/ClickHouseSinkTaskTest.java +++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.connect.clickhouse.connector.sink; +package org.apache.rocketmq.connect.clickhouse.sink; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.data.ConnectRecord; @@ -12,7 +12,7 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import org.apache.rocketmq.connect.clickhouse.connector.config.ClickHouseConstants; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; class ClickHouseSinkTaskTest { @@ -30,11 +30,11 @@ public static void main(String[] args) { // build schema Schema schema = SchemaBuilder.struct() .name("tableName") - .field("c1",SchemaBuilder.int32().build()) + .field("c1",SchemaBuilder.string().build()) .field("c2", SchemaBuilder.string().build()) .build(); // build record - int param0 = 1001; + String param0 = "1001"; Struct struct= new Struct(schema); struct.put("c1",param0); struct.put("c2",String.format("test-data-%s", param0)); From 1957ce897fe71fff6ce027369dbb524eca7f9cb2 Mon Sep 17 00:00:00 2001 From: joeCarf Date: Sat, 6 May 2023 21:10:57 +0800 Subject: [PATCH 4/8] finish source connector --- .../rocketmq-connect-clickhouse/README.md | 58 +++ .../rocketmq-connect-clickhouse/pom.xml | 122 +++++++ .../clickhouse/ClickHouseHelperClient.java | 335 +++++++++--------- ...eConfig.java => ClickHouseBaseConfig.java} | 18 +- .../config/ClickHouseConstants.java | 19 +- .../config/ClickHouseSourceConfig.java | 41 +++ .../sink/ClickHouseSinkConnector.java | 4 +- .../clickhouse/sink/ClickHouseSinkTask.java | 84 ++--- .../source/ClickHouseSourceConnector.java | 8 +- .../source/ClickHouseSourceTask.java | 196 +++++++--- .../source/ClickHouseSourceTaskTest.java | 41 +++ distribution/conf/connect-standalone.conf | 2 +- 12 files changed, 658 insertions(+), 270 deletions(-) create mode 100644 connectors/rocketmq-connect-clickhouse/README.md rename connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/{ClickhouseConfig.java => ClickHouseBaseConfig.java} (92%) create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java create mode 100644 connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java diff --git a/connectors/rocketmq-connect-clickhouse/README.md b/connectors/rocketmq-connect-clickhouse/README.md new file mode 100644 index 00000000..ba2f0143 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/README.md @@ -0,0 +1,58 @@ +##### ElasticsearchSourceConnector fully-qualified name +org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector + +**elasticsearch-source-connector** start + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/elasticsearchSourceConnector +{ + "connector.class":"org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector", + "clickhousehost":"120.48.26.195", + "clickhouseport":8123, + "clickhousedatabase":"default", + "username":"default", + "password":"123456", + "table":"tableName", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +``` +POST http://localhost:8082/connectors/clickhouseSourceConnector +{ + "connector.class":"org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector", + "clickhousehost":"120.48.26.195", + "clickhouseport":8123, + "database":"default", + "username":"default", + "password":"123456", + "table":"tableName", + "topic":"topic", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +**elasticsearch-sink-connector** start + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/elasticsearchSinkConnector +{ + "connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSinkConnector", + "elasticsearchHost":"localhost", + "elasticsearchPort":9200, + "max.tasks":1, + "connect.topicnames":"esTopic", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### parameter configuration + +parameter | effect | required |default +---|-----------------------------------------------------------------------------------------------------------------------------------------------------------|----------| --- +elasticsearchHost | The Host of the Elasticsearch server | yes | null +elasticsearchPort | The Port of the Elasticsearch server | yes | null +index| The info of the index | yes | null \ No newline at end of file diff --git a/connectors/rocketmq-connect-clickhouse/pom.xml b/connectors/rocketmq-connect-clickhouse/pom.xml index 2f3d554b..d280ca08 100644 --- a/connectors/rocketmq-connect-clickhouse/pom.xml +++ b/connectors/rocketmq-connect-clickhouse/pom.xml @@ -33,6 +33,123 @@ https://issues.apache.org/jira/browse/RocketMQ + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + io.openmessaging.internal + + + + aggregate + + aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + README-CN.md + + + + + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + 8 8 @@ -70,6 +187,11 @@ RELEASE test + + org.slf4j + slf4j-api + 1.7.7 + com.google.code.gson gson diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java index efb496e2..f6068acf 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java @@ -1,61 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.connect.clickhouse; import com.clickhouse.client.ClickHouseClient; -import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseCredentials; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseResponse; -import com.sun.org.slf4j.internal.Logger; -import com.sun.org.slf4j.internal.LoggerFactory; +import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHouseRecord; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClickHouseHelperClient { private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClient.class); - private String hostname = null; - private int port = -1; - private String username = "default"; - private String database = "default"; - private String password = ""; - private boolean sslEnabled = false; - private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC; + private ClickHouseBaseConfig config; + private int timeout = ClickHouseConstants.timeoutSecondsDefault * ClickHouseConstants.MILLI_IN_A_SEC; private ClickHouseNode server = null; - private int retry; - public ClickHouseHelperClient(ClickHouseClientBuilder builder) { - this.hostname = builder.hostname; - this.port = builder.port; - this.username = builder.username; - this.password = builder.password; - this.database = builder.database; - this.sslEnabled = builder.sslEnabled; - this.timeout = builder.timeout; - this.retry = builder.retry; - this.server = create(); - } + private int retry = ClickHouseConstants.retryCountDefault; - private ClickHouseNode create() { - String protocol = "http"; - if (this.sslEnabled) - protocol += "s"; + public ClickHouseHelperClient(ClickHouseBaseConfig config) { + this.config = config; + this.server = create(config); + } - String url = String.format("%s://%s:%d/%s", protocol, hostname, port, database); + private ClickHouseNode create(ClickHouseBaseConfig config) { + this.server = ClickHouseNode.builder() + .host(config.getClickHouseHost()) + .port(ClickHouseProtocol.HTTP, config.getClickHousePort()) + .database(config.getDatabase()).credentials(getCredentials(config)) + .build(); - LOGGER.info("url: " + url); + return this.server; + } - if (username != null && password != null) { - LOGGER.info(String.format("Adding username [%s] password [%s] ", username, Mask.passwordMask(password))); - Map options = new HashMap<>(); - options.put("user", username); - options.put("password", password); - server = ClickHouseNode.of(url, options); - } else { - server = ClickHouseNode.of(url); + private ClickHouseCredentials getCredentials(ClickHouseBaseConfig config) { + if (config.getUserName() != null && config.getPassWord() != null) { + return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord()); + } + if (config.getAccessToken() != null) { + return ClickHouseCredentials.fromAccessToken(config.getAccessToken()); } - return server; + throw new RuntimeException("Credentials cannot be empty!"); + } public boolean ping() { @@ -65,7 +73,6 @@ public boolean ping() { while (retryCount < retry) { if (clientPing.ping(server, timeout)) { - LOGGER.info("Ping is successful."); clientPing.close(); return true; } @@ -81,142 +88,146 @@ public ClickHouseNode getServer() { return this.server; } - public ClickHouseResponse query(String query) { - return query(query, null); + public List query(String query) { + return query(query, ClickHouseFormat.RowBinaryWithNamesAndTypes); } - public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat) { + public List query(String query, ClickHouseFormat clickHouseFormat) { int retryCount = 0; - ClickHouseException ce = null; + Exception ce = null; while (retryCount < retry) { try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) - // you'll have to parse response manually if using a different format - + ClickHouseResponse response = client.read(server) .format(clickHouseFormat) .query(query) - .executeAndWait()) { - return response; - } catch (ClickHouseException e) { + .execute().get()) { + + List recordList = new ArrayList<>(); + for (ClickHouseRecord r : response.records()) { + recordList.add(r); + } + return recordList; + + } catch (Exception e) { retryCount++; LOGGER.warn(String.format("Query retry %d out of %d", retryCount, retry), e); ce = e; } } throw new RuntimeException(ce); - } - - - public List showTables() { - List tablesNames = new ArrayList<>(); - try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) - // you'll have to parse response manually if using a different format - .query("SHOW TABLES") - .executeAndWait()) { - for (ClickHouseRecord r : response.records()) { - ClickHouseValue v = r.getValue(0); - String tableName = v.asString(); - tablesNames.add(tableName); - } - - } catch (ClickHouseException e) { - LOGGER.error("Failed in show tables", e); - } - return tablesNames; } - public Table describeTable(String tableName) { - if (tableName.startsWith(".inner")) - return null; - String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", this.database, tableName); - - try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) - .query(describeQuery) - .executeAndWait()) { - Table table = new Table(tableName); - for (ClickHouseRecord r : response.records()) { - ClickHouseValue v = r.getValue(0); - String value = v.asString(); - String[] cols = value.split("\t"); - if (cols.length > 2) { - String defaultKind = cols[2]; - if ("ALIAS".equals(defaultKind) || "MATERIALIZED".equals(defaultKind)) { - // Only insert into "real" columns - continue; - } - } - String name = cols[0]; - String type = cols[1]; - table.addColumn(Column.extractColumn(name, type, false)); - } - return table; - } catch (ClickHouseException e) { - LOGGER.error(String.format("Got exception when running %s", describeQuery), e); - return null; - } - - } - public List
extractTablesMapping() { - List
tableList = new ArrayList<>(); - for (String tableName : showTables() ) { - Table table = describeTable(tableName); - if (table != null ) - tableList.add(table); - } - return tableList; - } - - public static class ClickHouseClientBuilder{ - private String hostname = null; - private int port = -1; - private String username = "default"; - private String database = "default"; - private String password = ""; - private boolean sslEnabled = false; - private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC; - private int retry = ClickHouseSinkConfig.retryCountDefault; - public ClickHouseClientBuilder(String hostname, int port) { - this.hostname = hostname; - this.port = port; - } - - - public ClickHouseClientBuilder setUsername(String username) { - this.username = username; - return this; - } - - public ClickHouseClientBuilder setPassword(String password) { - this.password = password; - return this; - } - - public ClickHouseClientBuilder setDatabase(String database) { - this.database = database; - return this; - } - - public ClickHouseClientBuilder sslEnable(boolean sslEnabled) { - this.sslEnabled = sslEnabled; - return this; - } - - public ClickHouseClientBuilder setTimeout(int timeout) { - this.timeout = timeout; - return this; - } - - public ClickHouseClientBuilder setRetry(int retry) { - this.retry = retry; - return this; - } - - public ClickHouseHelperClient build(){ - return new ClickHouseHelperClient(this); - } - - } +// public List showTables() { +// List tablesNames = new ArrayList<>(); +// try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); +// ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) +// // you'll have to parse response manually if using a different format +// +// .query("SHOW TABLES") +// .executeAndWait()) { +// for (ClickHouseRecord r : response.records()) { +// ClickHouseValue v = r.getValue(0); +// String tableName = v.asString(); +// tablesNames.add(tableName); +// } +// +// } catch (ClickHouseException e) { +// LOGGER.error("Failed in show tables", e); +// } +// return tablesNames; +// } + +// public Table describeTable(String tableName) { +// if (tableName.startsWith(".inner")) +// return null; +// String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", this.database, tableName); +// +// try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); +// ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) +// .query(describeQuery) +// .executeAndWait()) { +// Table table = new Table(tableName); +// for (ClickHouseRecord r : response.records()) { +// ClickHouseValue v = r.getValue(0); +// String value = v.asString(); +// String[] cols = value.split("\t"); +// if (cols.length > 2) { +// String defaultKind = cols[2]; +// if ("ALIAS".equals(defaultKind) || "MATERIALIZED".equals(defaultKind)) { +// // Only insert into "real" columns +// continue; +// } +// } +// String name = cols[0]; +// String type = cols[1]; +// table.addColumn(Column.extractColumn(name, type, false)); +// } +// return table; +// } catch (ClickHouseException e) { +// LOGGER.error(String.format("Got exception when running %s", describeQuery), e); +// return null; +// } +// +// } +// public List
extractTablesMapping() { +// List
tableList = new ArrayList<>(); +// for (String tableName : showTables() ) { +// Table table = describeTable(tableName); +// if (table != null ) +// tableList.add(table); +// } +// return tableList; +// } + +// public static class ClickHouseClientBuilder{ +// private String hostname = null; +// private int port = -1; +// private String username = "default"; +// private String database = "default"; +// private String password = ""; +// private boolean sslEnabled = false; +// private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC; +// private int retry = ClickHouseSinkConfig.retryCountDefault; +// public ClickHouseClientBuilder(String hostname, int port) { +// this.hostname = hostname; +// this.port = port; +// } +// +// +// public ClickHouseClientBuilder setUsername(String username) { +// this.username = username; +// return this; +// } +// +// public ClickHouseClientBuilder setPassword(String password) { +// this.password = password; +// return this; +// } +// +// public ClickHouseClientBuilder setDatabase(String database) { +// this.database = database; +// return this; +// } +// +// public ClickHouseClientBuilder sslEnable(boolean sslEnabled) { +// this.sslEnabled = sslEnabled; +// return this; +// } +// +// public ClickHouseClientBuilder setTimeout(int timeout) { +// this.timeout = timeout; +// return this; +// } +// +// public ClickHouseClientBuilder setRetry(int retry) { +// this.retry = retry; +// return this; +// } +// +// public ClickHouseHelperClient build(){ +// return new ClickHouseHelperClient(this); +// } +// +// } } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickhouseConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java similarity index 92% rename from connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickhouseConfig.java rename to connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java index aa4728ec..32b09b82 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickhouseConfig.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java @@ -22,7 +22,7 @@ import java.util.HashSet; import java.util.Set; -public class ClickhouseConfig { +public class ClickHouseBaseConfig { public static final Set REQUEST_CONFIG = new HashSet() { { @@ -33,7 +33,7 @@ public class ClickhouseConfig { private String clickHouseHost; - private String clickHousePort; + private Integer clickHousePort; private String database; @@ -43,6 +43,16 @@ public class ClickhouseConfig { private String accessToken; + private String topic; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + public String getClickHouseHost() { return clickHouseHost; } @@ -51,11 +61,11 @@ public void setClickHouseHost(String clickHouseHost) { this.clickHouseHost = clickHouseHost; } - public String getClickHousePort() { + public Integer getClickHousePort() { return clickHousePort; } - public void setClickHousePort(String clickHousePort) { + public void setClickHousePort(Integer clickHousePort) { this.clickHousePort = clickHousePort; } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java index 3ec51f8f..e9c026d1 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseConstants.java @@ -22,7 +22,7 @@ public class ClickHouseConstants { public static final String CLICKHOUSE_PORT = "clickhouseport"; - public static final String CLICKHOUSE_DATABASE = "clickhousedatabase"; + public static final String CLICKHOUSE_DATABASE = "database"; public static final String CLICKHOUSE_USERNAME = "username"; @@ -30,5 +30,20 @@ public class ClickHouseConstants { public static final String CLICKHOUSE_ACCESSTOKEN = "accesstoken"; - + public static final String CLICKHOUSE_TABLE = "table"; + + public static final String TOPIC = "topic"; + + public static final String CLICKHOUSE_OFFSET = "OFFSET"; + + public static final String CLICKHOUSE_PARTITION = "CLICKHOUSE_PARTITION"; + + public static final Integer timeoutSecondsDefault = 30; + + public static final int MILLI_IN_A_SEC = 1000; + + public static final Integer retryCountDefault = 3; + + public static final int MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME = 2000; + } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java new file mode 100644 index 00000000..a3120d03 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.clickhouse.config; + +import java.util.HashSet; +import java.util.Set; + +public class ClickHouseSourceConfig extends ClickHouseBaseConfig { + + public static final Set REQUEST_CONFIG = new HashSet() { + { + add(ClickHouseConstants.CLICKHOUSE_HOST); + add(ClickHouseConstants.CLICKHOUSE_PORT); + add(ClickHouseConstants.CLICKHOUSE_TABLE); + } + }; + private String table; + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java index a5f34b38..f2585078 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java @@ -6,7 +6,7 @@ import io.openmessaging.connector.api.component.task.sink.SinkConnector; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.connect.clickhouse.config.ClickhouseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; public class ClickHouseSinkConnector extends SinkConnector { @@ -26,7 +26,7 @@ public class ClickHouseSinkConnector extends SinkConnector { @Override public void start(KeyValue value) { - for (String requestKey : ClickhouseConfig.REQUEST_CONFIG) { + for (String requestKey : ClickHouseBaseConfig.REQUEST_CONFIG) { if (!value.containsKey(requestKey)) { throw new RuntimeException("Request config key: " + requestKey); } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java index 905fdaf9..4ad8b23f 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java @@ -21,7 +21,6 @@ import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseCredentials; -import com.clickhouse.client.ClickHouseException; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseRequest; @@ -49,11 +48,11 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.apache.rocketmq.connect.clickhouse.config.ClickhouseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; public class ClickHouseSinkTask extends SinkTask { - public ClickhouseConfig config; + public ClickHouseBaseConfig config; private ClickHouseNode server; @@ -62,8 +61,6 @@ public class ClickHouseSinkTask extends SinkTask { return; } - - try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { boolean pingOK = client.ping(server, 30000); @@ -73,7 +70,6 @@ public class ClickHouseSinkTask extends SinkTask { for (ConnectRecord record : sinkRecords) { - String table = record.getSchema().getName(); ClickHouseRequest.Mutation request = client.connect(server) .write() @@ -90,8 +86,8 @@ public class ClickHouseSinkTask extends SinkTask { final Struct structData = (Struct) record.getData(); Gson gson = new Gson(); Map data = new HashMap<>(); - java.lang.reflect.Type gsonType = new TypeToken(){}.getType(); - + java.lang.reflect.Type gsonType = new TypeToken() { + }.getType(); JSONObject object = new JSONObject(); for (Field field : fields) { @@ -100,7 +96,7 @@ public class ClickHouseSinkTask extends SinkTask { } Schema NESTED_SCHEMA = SchemaBuilder.struct().build(); - String gsonString = gson.toJson(data,gsonType); + String gsonString = gson.toJson(data, gsonType); // BinaryStreamUtils.writeBytes(stream, object.toJSONString().getBytes(StandardCharsets.UTF_8)); BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8)); try (ClickHouseResponse response = future.get()) { @@ -134,23 +130,12 @@ public class ClickHouseSinkTask extends SinkTask { // ClickHouseResponseSummary summary = response.getSummary(); //// return summary.getWrittenRows(); // } - } catch(InterruptedException e){ - Thread.currentThread().interrupt(); - try { - throw ClickHouseException.forCancellation(e, server); - } catch (ClickHouseException ex) { - throw new RuntimeException(ex); - } - } catch(Exception e){ - try { - throw ClickHouseException.of(e, server); - } catch (ClickHouseException ex) { - throw new RuntimeException(ex); - } - } - + } catch (Exception e) { + throw new RuntimeException(e); } + } + // static int query(ClickHouseNode server, String table) throws ClickHouseException { // try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol()); // ClickHouseResponse response = client.read(server) @@ -172,15 +157,15 @@ public class ClickHouseSinkTask extends SinkTask { // } // } - @Override public void start (KeyValue keyValue){ - this.config = new ClickhouseConfig(); - this.config.load(keyValue); + @Override public void start(KeyValue keyValue) { + this.config = new ClickHouseBaseConfig(); + this.config.load(keyValue); - this.server = ClickHouseNode.builder() - .host(config.getClickHouseHost()) - .port(ClickHouseProtocol.HTTP, Integer.valueOf(config.getClickHousePort())) - .database(config.getDatabase()).credentials(getCredentials(config)) - .build(); + this.server = ClickHouseNode.builder() + .host(config.getClickHouseHost()) + .port(ClickHouseProtocol.HTTP, config.getClickHousePort()) + .database(config.getDatabase()).credentials(getCredentials(config)) + .build(); // ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); // boolean pingOK = clientPing.ping(server, 30000); @@ -193,35 +178,32 @@ public class ClickHouseSinkTask extends SinkTask { // e.printStackTrace(); // } - } + } - void dropAndCreateTable(ClickHouseNode server, String table) throws ClickHouseException { + void dropAndCreateTable(ClickHouseNode server, String table) throws Exception { try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { ClickHouseRequest request = client.connect(server); // or use future chaining request.query("drop table if exists " + table).execute().get(); request.query("create table " + table + "(a String, b Nullable(String)) engine=MergeTree() order by a") .execute().get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw ClickHouseException.forCancellation(e, server); - } catch (ExecutionException e) { - throw ClickHouseException.of(e, server); + } catch (Exception e) { + throw new RuntimeException(e); } } - private ClickHouseCredentials getCredentials (ClickhouseConfig config){ - if (config.getUserName() != null && config.getPassWord() != null) { - return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord()); - } - if (config.getAccessToken() != null) { - return ClickHouseCredentials.fromAccessToken(config.getAccessToken()); - } - throw new RuntimeException("Credentials cannot be empty!"); - + private ClickHouseCredentials getCredentials(ClickHouseBaseConfig config) { + if (config.getUserName() != null && config.getPassWord() != null) { + return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord()); } - - @Override public void stop () { - this.server = null; + if (config.getAccessToken() != null) { + return ClickHouseCredentials.fromAccessToken(config.getAccessToken()); } + throw new RuntimeException("Credentials cannot be empty!"); + + } + + @Override public void stop() { + this.server = null; } +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java index f97373e8..589b8159 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java @@ -22,13 +22,15 @@ import io.openmessaging.connector.api.component.task.source.SourceConnector; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.connect.clickhouse.config.ClickhouseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSourceConfig; public class ClickHouseSourceConnector extends SourceConnector { private KeyValue keyValue; - private ClickhouseConfig config; + private ClickHouseBaseConfig config; + @Override public List taskConfigs(int maxTasks) { List configs = new ArrayList<>(); for (int i = 0; i < maxTasks; i++) { @@ -43,7 +45,7 @@ public class ClickHouseSourceConnector extends SourceConnector { @Override public void start(KeyValue config) { - for (String requestKey : ClickhouseConfig.REQUEST_CONFIG) { + for (String requestKey : ClickHouseSourceConfig.REQUEST_CONFIG) { if (!config.containsKey(requestKey)) { throw new RuntimeException("Request config key: " + requestKey); } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java index f74ad75b..9619285d 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java @@ -1,74 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.connect.clickhouse.source; -import com.clickhouse.client.ClickHouseClient; -import com.clickhouse.client.ClickHouseCredentials; -import com.clickhouse.client.ClickHouseException; -import com.clickhouse.client.ClickHouseNode; -import com.clickhouse.client.ClickHouseProtocol; -import com.clickhouse.client.ClickHouseResponse; -import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHouseColumn; +import com.clickhouse.data.ClickHouseDataType; import com.clickhouse.data.ClickHouseRecord; +import com.clickhouse.data.value.UnsignedByte; +import com.clickhouse.data.value.UnsignedInteger; +import com.clickhouse.data.value.UnsignedShort; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.source.SourceTask; import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.internal.DefaultKeyValue; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.ExecutionException; -import org.apache.rocketmq.connect.clickhouse.config.ClickhouseConfig; +import java.util.Map; +import org.apache.rocketmq.connect.clickhouse.ClickHouseHelperClient; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClickHouseSourceTask extends SourceTask { - private ClickhouseConfig config; + private static final Logger log = LoggerFactory.getLogger(ClickHouseSourceTask.class); - private ClickHouseNode server; + private ClickHouseSourceConfig config; - @Override public List poll() throws InterruptedException { - return null; - } + private ClickHouseHelperClient helperClient; + + @Override public List poll() { + List res = new ArrayList<>(); + long offset = readRecordOffset(); + String sql = buildSql(config.getTable(), ClickHouseConstants.MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME, offset); - int query(ClickHouseNode server, String table) throws ClickHouseException { - try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol()); - ClickHouseResponse response = client.read(server) - // prefer to use RowBinaryWithNamesAndTypes as it's fully supported - // see details at https://github.com/ClickHouse/clickhouse-java/issues/928 - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query("select * from " + table).execute().get()) { - int count = 0; - // or use stream API via response.stream() - for (ClickHouseRecord r : response.records()) { - count++; + try { + List recordList = helperClient.query(sql); + for (ClickHouseRecord clickHouseRecord : recordList) { + res.add(clickHouseRecord2ConnectRecord(clickHouseRecord, ++offset)); } - return count; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw ClickHouseException.forCancellation(e, server); - } catch (ExecutionException e) { - throw ClickHouseException.of(e, server); + } catch (Exception e) { + log.error(String.format("Fail to poll data from clickhouse! Table=%s offset=%d", config.getTable(), offset)); } + return res; } - @Override public void start(KeyValue keyValue) { - this.config.load(keyValue); + private long readRecordOffset() { + final RecordOffset positionInfo = this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(config.getTable())); + if (positionInfo == null) { + return 0; + } + Object offset = positionInfo.getOffset().get(config.getTable() + "_" + ClickHouseConstants.CLICKHOUSE_OFFSET); + return offset == null ? 0 : Long.parseLong(offset.toString()); + } + + private String buildSql(String table, int maxNum, long offset) { + return String.format("SELECT * FROM `%s` LIMIT %d OFFSET %d;", table, maxNum, offset); + } + + private ConnectRecord clickHouseRecord2ConnectRecord(ClickHouseRecord clickHouseRecord, + long offset) throws NoSuchFieldException, IllegalAccessException { + Schema schema = SchemaBuilder.struct().name(config.getTable()).build(); + final List fields = buildFields(clickHouseRecord); + schema.setFields(fields); + final ConnectRecord connectRecord = new ConnectRecord(buildRecordPartition(config.getTable()), + buildRecordOffset(offset), + System.currentTimeMillis(), + schema, + this.buildPayLoad(fields, schema, clickHouseRecord)); + connectRecord.setExtensions(this.buildExtensions(clickHouseRecord)); + return connectRecord; + } + + private List buildFields( + ClickHouseRecord clickHouseRecord) throws NoSuchFieldException, IllegalAccessException { + java.lang.reflect.Field columns = clickHouseRecord.getClass().getDeclaredField("columns"); + columns.setAccessible(true); + List fields = new ArrayList<>(); + for (ClickHouseColumn column : (Iterable) columns.get(clickHouseRecord)) { + fields.add(new Field(column.getColumnIndex(), column.getColumnName(), getSchema(column.getDataType().getObjectClass()))); + } + return fields; + } - this.server = ClickHouseNode.builder() - .host(config.getClickHouseHost()) - .port(ClickHouseProtocol.HTTP, Integer.valueOf(config.getClickHousePort())) - .database(config.getDatabase()).credentials(getCredentials(config)) - .build(); + private RecordPartition buildRecordPartition(String partitionValue) { + Map partitionMap = new HashMap<>(); + partitionMap.put(ClickHouseConstants.CLICKHOUSE_PARTITION, partitionValue); + return new RecordPartition(partitionMap); + } + private Struct buildPayLoad(List fields, Schema schema, ClickHouseRecord clickHouseRecord) { + Struct payLoad = new Struct(schema); + for (int i = 0; i < fields.size(); i++) { + payLoad.put(fields.get(i), clickHouseRecord.getValue(i).asObject()); + } + return payLoad; } - private ClickHouseCredentials getCredentials (ClickhouseConfig config){ - if (config.getUserName() != null && config.getPassWord() != null) { - return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord()); + private KeyValue buildExtensions(ClickHouseRecord clickHouseRecord) { + KeyValue keyValue = new DefaultKeyValue(); + String topicName = config.getTopic(); + if (topicName == null || topicName.equals("")) { + String connectorName = this.sourceTaskContext.getConnectorName(); + topicName = config.getTable() + "_" + connectorName; } - if (config.getAccessToken() != null) { - return ClickHouseCredentials.fromAccessToken(config.getAccessToken()); + keyValue.put(ClickHouseConstants.TOPIC, topicName); + return keyValue; + } + + private RecordOffset buildRecordOffset(long offset) { + Map offsetMap = new HashMap<>(); + offsetMap.put(config.getTable() + "_" + ClickHouseConstants.CLICKHOUSE_OFFSET, offset); + return new RecordOffset(offsetMap); + } + + private static Schema getSchema(Class clazz) { + if (clazz.equals(Byte.class)) { + return SchemaBuilder.int8().build(); + } else if (clazz.equals(Short.class) || clazz.equals(UnsignedByte.class)) { + return SchemaBuilder.int16().build(); + } else if (clazz.equals(Integer.class) || clazz.equals(UnsignedShort.class)) { + return SchemaBuilder.int32().build(); + } else if (clazz.equals(Long.class) || clazz.equals(UnsignedInteger.class)) { + return SchemaBuilder.int64().build(); + } else if (clazz.equals(Float.class)) { + return SchemaBuilder.float32().build(); + } else if (clazz.equals(Double.class)) { + return SchemaBuilder.float64().build(); + } else if (clazz.equals(String.class)) { + return SchemaBuilder.string().build(); + } else if (clazz.equals(Date.class) || clazz.equals(LocalDateTime.class) || clazz.equals(LocalDate.class)) { + return SchemaBuilder.time().build(); + } else if (clazz.equals(Timestamp.class)) { + return SchemaBuilder.timestamp().build(); + } else if (clazz.equals(Boolean.class)) { + return SchemaBuilder.bool().build(); } - throw new RuntimeException("Credentials cannot be empty!"); + return SchemaBuilder.string().build(); + } + @Override public void start(KeyValue keyValue) { + this.config = new ClickHouseSourceConfig(); + this.config.load(keyValue); + this.helperClient = new ClickHouseHelperClient(this.config); + if (!helperClient.ping()) { + throw new RuntimeException("Cannot connect to clickhouse server!"); + } } @Override public void stop() { - this.server = null; + this.helperClient = null; } } diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java new file mode 100644 index 00000000..7c506e46 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java @@ -0,0 +1,41 @@ +package org.apache.rocketmq.connect.clickhouse.source; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.internal.DefaultKeyValue; +import java.util.List; +import junit.framework.TestCase; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; + +import static java.lang.Thread.sleep; + +public class ClickHouseSourceTaskTest extends TestCase { + + private static final String host = "120.48.26.195"; + private static final String port = "8123"; + private static final String db = "default"; + private static final String username = "default"; + private static final String password = "123456"; + + public void testPoll() { + } + + public void testStart() throws InterruptedException { + ClickHouseSourceTask task = new ClickHouseSourceTask(); + KeyValue config = new DefaultKeyValue(); + config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); + config.put(ClickHouseConstants.CLICKHOUSE_PORT, port); + config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db); + config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username); + config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password); + config.put(ClickHouseConstants.CLICKHOUSE_TABLE, "tableName"); + task.start(config); + while (true) { + List records = task.poll(); + for (ConnectRecord r : records) { + System.out.println(r); + } + sleep(3000); + } + } +} \ No newline at end of file diff --git a/distribution/conf/connect-standalone.conf b/distribution/conf/connect-standalone.conf index 06c16c81..873affae 100644 --- a/distribution/conf/connect-standalone.conf +++ b/distribution/conf/connect-standalone.conf @@ -31,6 +31,6 @@ autoCreateGroupEnable=false clusterName="DefaultCluster" # Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar +pluginPaths=/Users/qiaoao/Documents/GitHub/rocketmq-connect/connectors/rocketmq-connect-clickhouse/target/rocketmq-connect-clickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar From eb5033dae27ee488feffe505dad0953fad90abe1 Mon Sep 17 00:00:00 2001 From: joeCarf Date: Sun, 7 May 2023 00:13:52 +0800 Subject: [PATCH 5/8] finish sink connector --- .../rocketmq-connect-clickhouse/README.md | 17 +- .../rocketmq-connect-clickhouse/pom.xml | 13 +- .../clickhouse/ClickHouseHelperClient.java | 159 +++++----------- .../config/ClickHouseBaseConfig.java | 9 - .../config/ClickHouseSinkConfig.java | 13 ++ .../config/ClickHouseSourceConfig.java | 1 + .../sink/ClickHouseSinkConnector.java | 22 ++- .../clickhouse/sink/ClickHouseSinkTask.java | 179 +++--------------- .../source/ClickHouseSourceConnector.java | 3 - .../source/ClickHouseSourceTask.java | 1 - .../sink/ClickHouseSinkTaskTest.java | 31 ++- 11 files changed, 150 insertions(+), 298 deletions(-) create mode 100644 connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java diff --git a/connectors/rocketmq-connect-clickhouse/README.md b/connectors/rocketmq-connect-clickhouse/README.md index ba2f0143..fc1756d4 100644 --- a/connectors/rocketmq-connect-clickhouse/README.md +++ b/connectors/rocketmq-connect-clickhouse/README.md @@ -28,7 +28,7 @@ POST http://localhost:8082/connectors/clickhouseSourceConnector "username":"default", "password":"123456", "table":"tableName", - "topic":"topic", + "topic":"testClickHouseTopic", "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" } @@ -37,13 +37,16 @@ POST http://localhost:8082/connectors/clickhouseSourceConnector **elasticsearch-sink-connector** start ``` -POST http://${runtime-ip}:${runtime-port}/connectors/elasticsearchSinkConnector +POST http://localhost:8082/connectors/clickhouseSinkConnector { - "connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSinkConnector", - "elasticsearchHost":"localhost", - "elasticsearchPort":9200, - "max.tasks":1, - "connect.topicnames":"esTopic", + "connector.class":"org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector", + "clickhousehost":"120.48.26.195", + "clickhouseport":8123, + "database":"clickhouse", + "username":"default", + "password":"123456", + "table":"tableName", + "connect.topicnames":"testClickHouseTopic", "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" } diff --git a/connectors/rocketmq-connect-clickhouse/pom.xml b/connectors/rocketmq-connect-clickhouse/pom.xml index d280ca08..76d7b868 100644 --- a/connectors/rocketmq-connect-clickhouse/pom.xml +++ b/connectors/rocketmq-connect-clickhouse/pom.xml @@ -162,6 +162,13 @@ 0.1.4 compile + + com.clickhouse + clickhouse-jdbc + 0.4.5 + + all + org.lz4 @@ -192,12 +199,6 @@ slf4j-api 1.7.7 - - com.google.code.gson - gson - 2.2.4 - compile - \ No newline at end of file diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java index f6068acf..fafd13f9 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java @@ -23,9 +23,17 @@ import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHouseOutputStream; import com.clickhouse.data.ClickHouseRecord; +import com.clickhouse.data.ClickHouseWriter; +import com.clickhouse.jdbc.ClickHouseDataSource; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; import org.slf4j.Logger; @@ -118,116 +126,43 @@ public List query(String query, ClickHouseFormat clickHouseFor } -// public List showTables() { -// List tablesNames = new ArrayList<>(); -// try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); -// ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) -// // you'll have to parse response manually if using a different format -// -// .query("SHOW TABLES") -// .executeAndWait()) { -// for (ClickHouseRecord r : response.records()) { -// ClickHouseValue v = r.getValue(0); -// String tableName = v.asString(); -// tablesNames.add(tableName); -// } -// -// } catch (ClickHouseException e) { -// LOGGER.error("Failed in show tables", e); -// } -// return tablesNames; -// } - -// public Table describeTable(String tableName) { -// if (tableName.startsWith(".inner")) -// return null; -// String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", this.database, tableName); -// -// try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); -// ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) -// .query(describeQuery) -// .executeAndWait()) { -// Table table = new Table(tableName); -// for (ClickHouseRecord r : response.records()) { -// ClickHouseValue v = r.getValue(0); -// String value = v.asString(); -// String[] cols = value.split("\t"); -// if (cols.length > 2) { -// String defaultKind = cols[2]; -// if ("ALIAS".equals(defaultKind) || "MATERIALIZED".equals(defaultKind)) { -// // Only insert into "real" columns -// continue; -// } -// } -// String name = cols[0]; -// String type = cols[1]; -// table.addColumn(Column.extractColumn(name, type, false)); -// } -// return table; -// } catch (ClickHouseException e) { -// LOGGER.error(String.format("Got exception when running %s", describeQuery), e); -// return null; -// } -// -// } -// public List
extractTablesMapping() { -// List
tableList = new ArrayList<>(); -// for (String tableName : showTables() ) { -// Table table = describeTable(tableName); -// if (table != null ) -// tableList.add(table); -// } -// return tableList; -// } - -// public static class ClickHouseClientBuilder{ -// private String hostname = null; -// private int port = -1; -// private String username = "default"; -// private String database = "default"; -// private String password = ""; -// private boolean sslEnabled = false; -// private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC; -// private int retry = ClickHouseSinkConfig.retryCountDefault; -// public ClickHouseClientBuilder(String hostname, int port) { -// this.hostname = hostname; -// this.port = port; -// } -// -// -// public ClickHouseClientBuilder setUsername(String username) { -// this.username = username; -// return this; -// } -// -// public ClickHouseClientBuilder setPassword(String password) { -// this.password = password; -// return this; -// } -// -// public ClickHouseClientBuilder setDatabase(String database) { -// this.database = database; -// return this; -// } -// -// public ClickHouseClientBuilder sslEnable(boolean sslEnabled) { -// this.sslEnabled = sslEnabled; -// return this; -// } -// -// public ClickHouseClientBuilder setTimeout(int timeout) { -// this.timeout = timeout; -// return this; -// } -// -// public ClickHouseClientBuilder setRetry(int retry) { -// this.retry = retry; -// return this; -// } -// -// public ClickHouseHelperClient build(){ -// return new ClickHouseHelperClient(this); -// } -// -// } + private Connection getConnection(String url, Properties properties) throws SQLException { + ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties); + Connection conn = dataSource.getConnection(config.getUserName(), config.getPassWord()); + + System.out.println("Connected to: " + conn.getMetaData().getURL()); + return conn; + } + + private boolean insertJson(String jsonString, String table, String sql, String url) { + + try (Connection connection = getConnection(url, new Properties()); + PreparedStatement ps = connection.prepareStatement(sql)) { + ps.setObject(1, new ClickHouseWriter() { + @Override + public void write(ClickHouseOutputStream output) throws IOException { + output.writeBytes(jsonString.getBytes()); + } + }); + ps.executeUpdate(); + + } catch (Exception e) { + return false; + } + return true; + } + + public void insertJson(String jsonString, String table) { + String url = String.format("jdbc:clickhouse://%s:%s/%s", config.getClickHouseHost(), config.getClickHousePort(), config.getDatabase()); + String sql = String.format("INSERT INTO %s FORMAT JSONEachRow", table); + int retryCount = 0; + + while (retryCount < this.retry) { + if (insertJson(jsonString, table, sql, url)) { + return; + } + } + LOGGER.error(String.format("Insert into table %s error", table)); + } + } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java index 32b09b82..64eb6766 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseBaseConfig.java @@ -19,18 +19,9 @@ import io.openmessaging.KeyValue; import java.lang.reflect.Method; -import java.util.HashSet; -import java.util.Set; public class ClickHouseBaseConfig { - public static final Set REQUEST_CONFIG = new HashSet() { - { - add(ClickHouseConstants.CLICKHOUSE_HOST); - add(ClickHouseConstants.CLICKHOUSE_PORT); - } - }; - private String clickHouseHost; private Integer clickHousePort; diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java new file mode 100644 index 00000000..ec2710f8 --- /dev/null +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java @@ -0,0 +1,13 @@ +package org.apache.rocketmq.connect.clickhouse.config; + +import java.util.HashSet; +import java.util.Set; + +public class ClickHouseSinkConfig extends ClickHouseBaseConfig { + public static final Set SINK_REQUEST_CONFIG = new HashSet() { + { + add(ClickHouseConstants.CLICKHOUSE_HOST); + add(ClickHouseConstants.CLICKHOUSE_PORT); + } + }; +} diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java index a3120d03..c7e754d9 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSourceConfig.java @@ -27,6 +27,7 @@ public class ClickHouseSourceConfig extends ClickHouseBaseConfig { add(ClickHouseConstants.CLICKHOUSE_HOST); add(ClickHouseConstants.CLICKHOUSE_PORT); add(ClickHouseConstants.CLICKHOUSE_TABLE); + add(ClickHouseConstants.TOPIC); } }; private String table; diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java index f2585078..ae236ca2 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkConnector.java @@ -1,12 +1,28 @@ -package org.apache.rocketmq.connect.clickhouse.sink; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.clickhouse.sink; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.Task; import io.openmessaging.connector.api.component.task.sink.SinkConnector; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSinkConfig; public class ClickHouseSinkConnector extends SinkConnector { @@ -26,7 +42,7 @@ public class ClickHouseSinkConnector extends SinkConnector { @Override public void start(KeyValue value) { - for (String requestKey : ClickHouseBaseConfig.REQUEST_CONFIG) { + for (String requestKey : ClickHouseSinkConfig.SINK_REQUEST_CONFIG) { if (!value.containsKey(requestKey)) { throw new RuntimeException("Request config key: " + requestKey); } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java index 4ad8b23f..7dc31d58 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java @@ -17,193 +17,64 @@ package org.apache.rocketmq.connect.clickhouse.sink; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import com.clickhouse.client.ClickHouseClient; -import com.clickhouse.client.ClickHouseConfig; -import com.clickhouse.client.ClickHouseCredentials; -import com.clickhouse.client.ClickHouseNode; -import com.clickhouse.client.ClickHouseProtocol; -import com.clickhouse.client.ClickHouseRequest; -import com.clickhouse.client.ClickHouseResponse; -import com.clickhouse.client.ClickHouseResponseSummary; -import com.clickhouse.client.config.ClickHouseClientOption; - -import com.clickhouse.data.ClickHouseDataStreamFactory; -import com.clickhouse.data.ClickHouseFormat; -import com.clickhouse.data.ClickHousePipedOutputStream; -import com.clickhouse.data.format.BinaryStreamUtils; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.data.Field; -import io.openmessaging.connector.api.data.Schema; -import io.openmessaging.connector.api.data.SchemaBuilder; import io.openmessaging.connector.api.data.Struct; import io.openmessaging.connector.api.errors.ConnectException; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; +import org.apache.rocketmq.connect.clickhouse.ClickHouseHelperClient; +import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSinkConfig; public class ClickHouseSinkTask extends SinkTask { - public ClickHouseBaseConfig config; + public ClickHouseSinkConfig config; - private ClickHouseNode server; + private ClickHouseHelperClient helperClient; @Override public void put(List sinkRecords) throws ConnectException { if (sinkRecords == null || sinkRecords.size() < 1) { return; } + Map valueMap = new HashMap<>(); + for (ConnectRecord record : sinkRecords) { + String table = record.getSchema().getName(); + JSONArray jsonArray = valueMap.getOrDefault(table, new JSONArray()); - try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { + final List fields = record.getSchema().getFields(); + final Struct structData = (Struct) record.getData(); - boolean pingOK = client.ping(server, 30000); - if (!pingOK) { - throw new RuntimeException("Cannot connect to clickhouse server!"); + JSONObject object = new JSONObject(); + for (Field field : fields) { + object.put(field.getName(), structData.get(field)); } - for (ConnectRecord record : sinkRecords) { - - String table = record.getSchema().getName(); - ClickHouseRequest.Mutation request = client.connect(server) - .write() - .table(table) - .format(ClickHouseFormat.JSONEachRow); - - ClickHouseConfig config = request.getConfig(); - request.option(ClickHouseClientOption.WRITE_BUFFER_SIZE, 8192); - try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(config, (Runnable) null)) { - CompletableFuture future = request.data(stream.getInputStream()).execute(); - - final List fields = record.getSchema().getFields(); - final Struct structData = (Struct) record.getData(); - Gson gson = new Gson(); - Map data = new HashMap<>(); - java.lang.reflect.Type gsonType = new TypeToken() { - }.getType(); - - JSONObject object = new JSONObject(); - for (Field field : fields) { - object.put(field.getName(), structData.get(field)); - data.put(field.getName(), structData.get(field)); - } - Schema NESTED_SCHEMA = SchemaBuilder.struct().build(); - - String gsonString = gson.toJson(data, gsonType); -// BinaryStreamUtils.writeBytes(stream, object.toJSONString().getBytes(StandardCharsets.UTF_8)); - BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8)); - try (ClickHouseResponse response = future.get()) { - ClickHouseResponseSummary summary = response.getSummary(); - - } - } - - } + jsonArray.add(object); + valueMap.put(table, jsonArray); + } -// ClickHouseRequest.Mutation request = client.connect(server).write().table("table") -// .format(ClickHouseFormat.RowBinary); -// ClickHouseConfig config = request.getConfig(); -// CompletableFuture future; -// // back-pressuring is not supported, you can adjust the first two arguments -// try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() -// .createPipedOutputStream(config, (Runnable) null)) { -// // in async mode, which is default, execution happens in a worker thread -// future = request.data(stream.getInputStream()).execute(); -// -// // writing happens in main thread -// for (int i = 0; i < 10_000; i++) { -// BinaryStreamUtils.writeString(stream, String.valueOf(i % 16)); -// BinaryStreamUtils.writeNonNull(stream); -// BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString()); -// } -// } -// -// // response should be always closed -// try (ClickHouseResponse response = future.get()) { -// ClickHouseResponseSummary summary = response.getSummary(); -//// return summary.getWrittenRows(); -// } - } catch (Exception e) { - throw new RuntimeException(e); + for (Map.Entry entry : valueMap.entrySet()) { + String jsonString = entry.getValue().toString(); + helperClient.insertJson(jsonString, entry.getKey()); } } -// static int query(ClickHouseNode server, String table) throws ClickHouseException { -// try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol()); -// ClickHouseResponse response = client.read(server) -// // prefer to use RowBinaryWithNamesAndTypes as it's fully supported -// // see details at https://github.com/ClickHouse/clickhouse-java/issues/928 -// .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) -// .query("select * from " + table).execute().get()) { -// int count = 0; -// // or use stream API via response.stream() -// for (ClickHouseRecord r : response.records()) { -// count++; -// } -// return count; -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); -// throw ClickHouseException.forCancellation(e, server); -// } catch (ExecutionException e) { -// throw ClickHouseException.of(e, server); -// } -// } - @Override public void start(KeyValue keyValue) { - this.config = new ClickHouseBaseConfig(); + this.config = new ClickHouseSinkConfig(); this.config.load(keyValue); - - this.server = ClickHouseNode.builder() - .host(config.getClickHouseHost()) - .port(ClickHouseProtocol.HTTP, config.getClickHousePort()) - .database(config.getDatabase()).credentials(getCredentials(config)) - .build(); - -// ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); -// boolean pingOK = clientPing.ping(server, 30000); -// if (!pingOK) { -// throw new RuntimeException("Cannot connect to clickhouse server!"); -// } -// try { -// dropAndCreateTable(server, "tableName"); -// } catch (ClickHouseException e) { -// e.printStackTrace(); -// } - - } - - void dropAndCreateTable(ClickHouseNode server, String table) throws Exception { - try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { - ClickHouseRequest request = client.connect(server); - // or use future chaining - request.query("drop table if exists " + table).execute().get(); - request.query("create table " + table + "(a String, b Nullable(String)) engine=MergeTree() order by a") - .execute().get(); - } catch (Exception e) { - throw new RuntimeException(e); + this.helperClient = new ClickHouseHelperClient(this.config); + if (!helperClient.ping()) { + throw new RuntimeException("Cannot connect to clickhouse server!"); } } - private ClickHouseCredentials getCredentials(ClickHouseBaseConfig config) { - if (config.getUserName() != null && config.getPassWord() != null) { - return ClickHouseCredentials.fromUserAndPassword(config.getUserName(), config.getPassWord()); - } - if (config.getAccessToken() != null) { - return ClickHouseCredentials.fromAccessToken(config.getAccessToken()); - } - throw new RuntimeException("Credentials cannot be empty!"); - - } - @Override public void stop() { - this.server = null; + this.helperClient = null; } } diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java index 589b8159..3a9f8e4b 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceConnector.java @@ -22,15 +22,12 @@ import io.openmessaging.connector.api.component.task.source.SourceConnector; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.connect.clickhouse.config.ClickHouseBaseConfig; import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSourceConfig; public class ClickHouseSourceConnector extends SourceConnector { private KeyValue keyValue; - private ClickHouseBaseConfig config; - @Override public List taskConfigs(int maxTasks) { List configs = new ArrayList<>(); for (int i = 0; i < maxTasks; i++) { diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java index 9619285d..efe91b4c 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.connect.clickhouse.source; import com.clickhouse.data.ClickHouseColumn; -import com.clickhouse.data.ClickHouseDataType; import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.data.value.UnsignedByte; import com.clickhouse.data.value.UnsignedInteger; diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java index 18f3c620..7c49a874 100644 --- a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java +++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java @@ -39,7 +39,18 @@ public static void main(String[] args) { struct.put("c1",param0); struct.put("c2",String.format("test-data-%s", param0)); - ConnectRecord record = new ConnectRecord( + Schema schema2 = SchemaBuilder.struct() + .name("t1") + .field("c1",SchemaBuilder.string().build()) + .field("c2", SchemaBuilder.string().build()) + .build(); + // build record + Struct struct2= new Struct(schema2); + struct.put("c1",param0); + struct.put("c2",String.format("test-data-%s", param0)); + + for (int i = 0; i < 4; i++) { + ConnectRecord record = new ConnectRecord( // offset partition // offset partition" new RecordPartition(new ConcurrentHashMap<>()), @@ -47,8 +58,22 @@ public static void main(String[] args) { System.currentTimeMillis(), schema, struct - ); - records.add(record); + ); + records.add(record); + + ConnectRecord record2 = new ConnectRecord( + // offset partition + // offset partition" + new RecordPartition(new ConcurrentHashMap<>()), + new RecordOffset(new HashMap<>()), + System.currentTimeMillis(), + schema2, + struct + ); + records.add(record2); + + } + ClickHouseSinkTask task = new ClickHouseSinkTask(); KeyValue config = new DefaultKeyValue(); config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); From 969b39520753da13bd693173857ce4ba8ea11f9c Mon Sep 17 00:00:00 2001 From: joeCarf Date: Sun, 7 May 2023 00:17:05 +0800 Subject: [PATCH 6/8] add license --- .../config/ClickHouseSinkConfig.java | 17 +++ .../{ => helper}/ClickHouseHelperClient.java | 2 +- .../clickhouse/sink/ClickHouseSinkTask.java | 2 +- .../source/ClickHouseSourceTask.java | 2 +- .../sink/ClickHouseSinkTaskTest.java | 138 +++++++++--------- .../source/ClickHouseSourceTaskTest.java | 56 +++---- 6 files changed, 117 insertions(+), 100 deletions(-) rename connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/{ => helper}/ClickHouseHelperClient.java (99%) diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java index ec2710f8..2353d1a8 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/config/ClickHouseSinkConfig.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.connect.clickhouse.config; import java.util.HashSet; diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java similarity index 99% rename from connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java rename to connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java index fafd13f9..ccd73c8e 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/ClickHouseHelperClient.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/helper/ClickHouseHelperClient.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.connect.clickhouse; +package org.apache.rocketmq.connect.clickhouse.helper; import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseCredentials; diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java index 7dc31d58..2cfc1c12 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTask.java @@ -28,7 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.rocketmq.connect.clickhouse.ClickHouseHelperClient; +import org.apache.rocketmq.connect.clickhouse.helper.ClickHouseHelperClient; import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSinkConfig; public class ClickHouseSinkTask extends SinkTask { diff --git a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java index efe91b4c..0de60c2e 100644 --- a/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java +++ b/connectors/rocketmq-connect-clickhouse/src/main/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTask.java @@ -40,7 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.rocketmq.connect.clickhouse.ClickHouseHelperClient; +import org.apache.rocketmq.connect.clickhouse.helper.ClickHouseHelperClient; import org.apache.rocketmq.connect.clickhouse.config.ClickHouseConstants; import org.apache.rocketmq.connect.clickhouse.config.ClickHouseSourceConfig; import org.slf4j.Logger; diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java index 7c49a874..11be1300 100644 --- a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java +++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/sink/ClickHouseSinkTaskTest.java @@ -16,74 +16,74 @@ class ClickHouseSinkTaskTest { - - private static final String host = "120.48.26.195"; - private static final String port = "8123"; - private static final String db = "default"; - private static final String username = "default"; - private static final String password = "123456"; - - - - public static void main(String[] args) { - List records = new ArrayList<>(); - // build schema - Schema schema = SchemaBuilder.struct() - .name("tableName") - .field("c1",SchemaBuilder.string().build()) - .field("c2", SchemaBuilder.string().build()) - .build(); - // build record - String param0 = "1001"; - Struct struct= new Struct(schema); - struct.put("c1",param0); - struct.put("c2",String.format("test-data-%s", param0)); - - Schema schema2 = SchemaBuilder.struct() - .name("t1") - .field("c1",SchemaBuilder.string().build()) - .field("c2", SchemaBuilder.string().build()) - .build(); - // build record - Struct struct2= new Struct(schema2); - struct.put("c1",param0); - struct.put("c2",String.format("test-data-%s", param0)); - - for (int i = 0; i < 4; i++) { - ConnectRecord record = new ConnectRecord( - // offset partition - // offset partition" - new RecordPartition(new ConcurrentHashMap<>()), - new RecordOffset(new HashMap<>()), - System.currentTimeMillis(), - schema, - struct - ); - records.add(record); - - ConnectRecord record2 = new ConnectRecord( - // offset partition - // offset partition" - new RecordPartition(new ConcurrentHashMap<>()), - new RecordOffset(new HashMap<>()), - System.currentTimeMillis(), - schema2, - struct - ); - records.add(record2); - - } - - ClickHouseSinkTask task = new ClickHouseSinkTask(); - KeyValue config = new DefaultKeyValue(); - config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); - config.put(ClickHouseConstants.CLICKHOUSE_PORT, port); - config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db); - config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username); - config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password); - task.start(config); - task.put(records); - - } +// +// private static final String host = "120.48.26.195"; +// private static final String port = "8123"; +// private static final String db = "default"; +// private static final String username = "default"; +// private static final String password = "123456"; +// +// +// +// public static void main(String[] args) { +// List records = new ArrayList<>(); +// // build schema +// Schema schema = SchemaBuilder.struct() +// .name("tableName") +// .field("c1",SchemaBuilder.string().build()) +// .field("c2", SchemaBuilder.string().build()) +// .build(); +// // build record +// String param0 = "1001"; +// Struct struct= new Struct(schema); +// struct.put("c1",param0); +// struct.put("c2",String.format("test-data-%s", param0)); +// +// Schema schema2 = SchemaBuilder.struct() +// .name("t1") +// .field("c1",SchemaBuilder.string().build()) +// .field("c2", SchemaBuilder.string().build()) +// .build(); +// // build record +// Struct struct2= new Struct(schema2); +// struct.put("c1",param0); +// struct.put("c2",String.format("test-data-%s", param0)); +// +// for (int i = 0; i < 4; i++) { +// ConnectRecord record = new ConnectRecord( +// // offset partition +// // offset partition" +// new RecordPartition(new ConcurrentHashMap<>()), +// new RecordOffset(new HashMap<>()), +// System.currentTimeMillis(), +// schema, +// struct +// ); +// records.add(record); +// +// ConnectRecord record2 = new ConnectRecord( +// // offset partition +// // offset partition" +// new RecordPartition(new ConcurrentHashMap<>()), +// new RecordOffset(new HashMap<>()), +// System.currentTimeMillis(), +// schema2, +// struct +// ); +// records.add(record2); +// +// } +// +// ClickHouseSinkTask task = new ClickHouseSinkTask(); +// KeyValue config = new DefaultKeyValue(); +// config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); +// config.put(ClickHouseConstants.CLICKHOUSE_PORT, port); +// config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db); +// config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username); +// config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password); +// task.start(config); +// task.put(records); +// +// } } \ No newline at end of file diff --git a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java index 7c506e46..e7a10a80 100644 --- a/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java +++ b/connectors/rocketmq-connect-clickhouse/src/test/java/org/apache/rocketmq/connect/clickhouse/source/ClickHouseSourceTaskTest.java @@ -9,33 +9,33 @@ import static java.lang.Thread.sleep; -public class ClickHouseSourceTaskTest extends TestCase { +public class ClickHouseSourceTaskTest { - private static final String host = "120.48.26.195"; - private static final String port = "8123"; - private static final String db = "default"; - private static final String username = "default"; - private static final String password = "123456"; - - public void testPoll() { - } - - public void testStart() throws InterruptedException { - ClickHouseSourceTask task = new ClickHouseSourceTask(); - KeyValue config = new DefaultKeyValue(); - config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); - config.put(ClickHouseConstants.CLICKHOUSE_PORT, port); - config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db); - config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username); - config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password); - config.put(ClickHouseConstants.CLICKHOUSE_TABLE, "tableName"); - task.start(config); - while (true) { - List records = task.poll(); - for (ConnectRecord r : records) { - System.out.println(r); - } - sleep(3000); - } - } +// private static final String host = "120.48.26.195"; +// private static final String port = "8123"; +// private static final String db = "default"; +// private static final String username = "default"; +// private static final String password = "123456"; +// +// public void testPoll() { +// } +// +// public void testStart() throws InterruptedException { +// ClickHouseSourceTask task = new ClickHouseSourceTask(); +// KeyValue config = new DefaultKeyValue(); +// config.put(ClickHouseConstants.CLICKHOUSE_HOST, host); +// config.put(ClickHouseConstants.CLICKHOUSE_PORT, port); +// config.put(ClickHouseConstants.CLICKHOUSE_DATABASE, db); +// config.put(ClickHouseConstants.CLICKHOUSE_USERNAME, username); +// config.put(ClickHouseConstants.CLICKHOUSE_PASSWORD, password); +// config.put(ClickHouseConstants.CLICKHOUSE_TABLE, "tableName"); +// task.start(config); +// while (true) { +// List records = task.poll(); +// for (ConnectRecord r : records) { +// System.out.println(r); +// } +// sleep(3000); +// } +// } } \ No newline at end of file From ca0d4543cd5d869ba24adb8d311a74fce6c9bcf6 Mon Sep 17 00:00:00 2001 From: joeCarf Date: Sun, 7 May 2023 00:32:14 +0800 Subject: [PATCH 7/8] Update README.md --- .../rocketmq-connect-clickhouse/README.md | 48 ++++++++----------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/connectors/rocketmq-connect-clickhouse/README.md b/connectors/rocketmq-connect-clickhouse/README.md index fc1756d4..48b19874 100644 --- a/connectors/rocketmq-connect-clickhouse/README.md +++ b/connectors/rocketmq-connect-clickhouse/README.md @@ -1,28 +1,13 @@ -##### ElasticsearchSourceConnector fully-qualified name -org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector +##### ClickHouseSourceConnector fully-qualified name +org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector -**elasticsearch-source-connector** start +**clickhouse-source-connector** start ``` -POST http://${runtime-ip}:${runtime-port}/connectors/elasticsearchSourceConnector +POST http://${runtime-ip}:${runtime-port}/connectors/clickhouseSourceConnector { "connector.class":"org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector", - "clickhousehost":"120.48.26.195", - "clickhouseport":8123, - "clickhousedatabase":"default", - "username":"default", - "password":"123456", - "table":"tableName", - "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", - "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" -} -``` - -``` -POST http://localhost:8082/connectors/clickhouseSourceConnector -{ - "connector.class":"org.apache.rocketmq.connect.clickhouse.source.ClickHouseSourceConnector", - "clickhousehost":"120.48.26.195", + "clickhousehost":"localhost", "clickhouseport":8123, "database":"default", "username":"default", @@ -34,18 +19,20 @@ POST http://localhost:8082/connectors/clickhouseSourceConnector } ``` -**elasticsearch-sink-connector** start +##### ClickHouseSinkConnector fully-qualified name +org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector + +**clickhouse-sink-connector** start ``` -POST http://localhost:8082/connectors/clickhouseSinkConnector +POST http://${runtime-ip}:${runtime-port}/connectors/clickhouseSinkConnector { "connector.class":"org.apache.rocketmq.connect.clickhouse.sink.ClickHouseSinkConnector", - "clickhousehost":"120.48.26.195", + "clickhousehost":"localhost", "clickhouseport":8123, "database":"clickhouse", "username":"default", "password":"123456", - "table":"tableName", "connect.topicnames":"testClickHouseTopic", "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" @@ -54,8 +41,11 @@ POST http://localhost:8082/connectors/clickhouseSinkConnector ##### parameter configuration -parameter | effect | required |default ----|-----------------------------------------------------------------------------------------------------------------------------------------------------------|----------| --- -elasticsearchHost | The Host of the Elasticsearch server | yes | null -elasticsearchPort | The Port of the Elasticsearch server | yes | null -index| The info of the index | yes | null \ No newline at end of file +| parameter | effect | required | default | +|--------------------|---------------------------------------------------|-------------------|---------| +| clickhousehost | The Host of the Clickhouse server | yes | null | +| clickhouseport | The Port of the Clickhouse server | yes | null | +| database | The database to read or write | yes | null | +| table | The source table to read | yes (source only) | null | +| topic | RocketMQ topic for source connector to write into | yes (source only) | null | +| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null | From 31cd5e4833ff8f876a334a72b92e3d7fd2d53e7e Mon Sep 17 00:00:00 2001 From: joeCarf Date: Sun, 7 May 2023 00:33:47 +0800 Subject: [PATCH 8/8] Update connect-standalone.conf --- distribution/conf/connect-standalone.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distribution/conf/connect-standalone.conf b/distribution/conf/connect-standalone.conf index 873affae..06c16c81 100644 --- a/distribution/conf/connect-standalone.conf +++ b/distribution/conf/connect-standalone.conf @@ -31,6 +31,6 @@ autoCreateGroupEnable=false clusterName="DefaultCluster" # Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=/Users/qiaoao/Documents/GitHub/rocketmq-connect/connectors/rocketmq-connect-clickhouse/target/rocketmq-connect-clickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar +pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar