From 5bcca0ce9c5cb8fbacf8839b4325e124af64fc31 Mon Sep 17 00:00:00 2001 From: liuzongliang Date: Thu, 1 Jun 2023 12:40:54 +0800 Subject: [PATCH 1/2] Neo4jSourceConnector and Neo4jSinkConnector impl --- connectors/rocketmq-connect-neo4j/README.md | 228 ++++++++++++++++++ connectors/rocketmq-connect-neo4j/pom.xml | 203 ++++++++++++++++ .../connect/neo4j/config/Neo4jBaseConfig.java | 133 ++++++++++ .../connect/neo4j/config/Neo4jConstants.java | 107 ++++++++ .../connect/neo4j/config/Neo4jSinkConfig.java | 44 ++++ .../neo4j/config/Neo4jSourceConfig.java | 64 +++++ .../connect/neo4j/helper/LabelTypeEnum.java | 35 +++ .../connect/neo4j/helper/MappingRule.java | 183 ++++++++++++++ .../neo4j/helper/MappingRuleFactory.java | 94 ++++++++ .../connect/neo4j/helper/Neo4jClient.java | 126 ++++++++++ .../connect/neo4j/helper/Neo4jElement.java | 124 ++++++++++ .../connect/neo4j/helper/RecordConverter.java | 179 ++++++++++++++ .../connect/neo4j/helper/ValueType.java | 132 ++++++++++ .../neo4j/sink/Neo4jSinkConnector.java | 60 +++++ .../connect/neo4j/sink/Neo4jSinkTask.java | 72 ++++++ .../neo4j/sink/mapping/WriteMapper.java | 88 +++++++ .../neo4j/sink/write/NodeWriteProcessor.java | 72 ++++++ .../connect/neo4j/sink/write/Processor.java | 23 ++ .../write/RelationshipWriteProcessor.java | 76 ++++++ .../neo4j/sink/write/WriteProcessor.java | 57 +++++ .../neo4j/source/Neo4jSourceConnector.java | 97 ++++++++ .../connect/neo4j/source/Neo4jSourceTask.java | 137 +++++++++++ .../neo4j/source/Neo4jSourceTaskTest.java | 53 ++++ .../neo4j/source/mapping/ReadMapper.java | 106 ++++++++ .../neo4j/source/query/CqlQueryProcessor.java | 31 +++ .../neo4j/source/query/NodeQueryStrategy.java | 88 +++++++ .../neo4j/source/query/QueryRegistrar.java | 40 +++ .../query/RelationshipQueryStrategy.java | 87 +++++++ .../neo4j/source/Neo4jSourceTaskTest.java | 53 ++++ 29 files changed, 2792 insertions(+) create mode 100644 connectors/rocketmq-connect-neo4j/README.md create mode 100644 connectors/rocketmq-connect-neo4j/pom.xml create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jBaseConfig.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jConstants.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jSinkConfig.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jSourceConfig.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/LabelTypeEnum.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/MappingRule.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/MappingRuleFactory.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/Neo4jClient.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/Neo4jElement.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/RecordConverter.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/ValueType.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkConnector.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkTask.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/mapping/WriteMapper.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/NodeWriteProcessor.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/Processor.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/RelationshipWriteProcessor.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/WriteProcessor.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceConnector.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTask.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/mapping/ReadMapper.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/CqlQueryProcessor.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/NodeQueryStrategy.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/QueryRegistrar.java create mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/RelationshipQueryStrategy.java create mode 100644 connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java diff --git a/connectors/rocketmq-connect-neo4j/README.md b/connectors/rocketmq-connect-neo4j/README.md new file mode 100644 index 00000000..219437f3 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/README.md @@ -0,0 +1,228 @@ +### Neo4jSourceConnector fully-qualified name +org.apache.rocketmq.connect.neo4j.source.Neo4jSourceConnector + +**neo4j-source-connector node source** +``` +POST http://${runtime-ip}:${runtime-port}/connectors/neo4jSourceConnector +{ + "connector.class":"org.apache.rocketmq.connect.neo4j.source.Neo4jSourceConnector", + "neo4jHost":"localhost", + "neo4jPort":7687, + "neo4jDataBase":"test", + "neo4jUser":"test", + "neo4jPassword":"root123456", + "labelType":"node", + "topic":"nodeNeo4jTopic", + "labels":"Goods", + "column":[ + { + "name":"goodsId", + "type":"long", + "columnType":"primaryKey", + "valueExtract":"#{goodsId}" + }, + { + "name":"label", + "type":"string", + "columnType":"primaryLabel" + }, + { + "name":"goodsName", + "type":"string", + "columnType":"nodeProperty", + "valueExtract":"#{goodsName}" + } + ], + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +**neo4j-source-connector relationship source** +``` +POST http://${runtime-ip}:${runtime-port}/connectors/neo4jSourceConnector +{ + "connector.class":"org.apache.rocketmq.connect.neo4j.source.Neo4jSourceConnector", + "neo4jHost":"localhost", + "neo4jPort":7687, + "neo4jDataBase":"test", + "neo4jUser":"test", + "neo4jPassword":"root123456", + "labelType":"relationship", + "topic":"edgeNeo4jTopic", + "labels":"order_goods", + "column":[ + { + "name":"orderGoodsId", + "type":"long", + "columnType":"primaryKey", + "valueExtract":"#{orderGoodsId}" + }, + { + "name":"type", + "type":"string", + "columnType":"primaryLabel" + }, + { + "name":"orderId", + "type":"long", + "columnType":"srcPrimaryKey", + "valueExtract":"#{orderId}" + }, + { + "name":"Order", + "type":"string", + "columnType":"srcPrimaryLabel" + }, + { + "name":"goodsId", + "type":"long", + "columnType":"dstPrimaryKey", + "valueExtract":"#{goodsId}" + }, + { + "name":"Goods", + "type":"string", + "columnType":"dstPrimaryLabel" + }, + { + "name":"orderGoodsTitle", + "type":"string", + "columnType":"relationshipProperty", + "valueExtract":"#{orderGoodsTitle}" + } + ], + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +### Neo4jSinkConnector fully-qualified name +org.apache.rocketmq.connect.neo4j.sink.Neo4jSinkConnector + +**neo4j-sink-connector node sink** + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/neo4jSinkConnector +{ + "connector.class":"org.apache.rocketmq.connect.neo4j.sink.Neo4jSinkConnector", + "neo4jHost":"localhost", + "neo4jPort":7687, + "neo4jDataBase":"neo4j", + "neo4jUser":"test", + "neo4jPassword":"root123456", + "labelType":"node", + "connect.topicnames":"nodeNeo4jTopic", + "labels":"Goods", + "column":[ + { + "name":"goodsId", + "type":"long", + "columnType":"primaryKey", + "valueExtract":"#{goodsId}" + }, + { + "name":"Goods", + "type":"string", + "columnType":"primaryLabel", + "valueExtract":"#{label}" + }, + { + "name":"goodsName", + "type":"string", + "columnType":"nodeProperty", + "valueExtract":"#{goodsName}" + } + ], + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +**neo4j-sink-connector relationship sink** + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/neo4jSinkConnector +{ + "connector.class":"org.apache.rocketmq.connect.neo4j.sink.Neo4jSinkConnector", + "neo4jHost":"localhost", + "neo4jPort":7687, + "neo4jDataBase":"neo4j", + "neo4jUser":"test", + "neo4jPassword":"root123456", + "labelType":"relationship", + "connect.topicnames":"edgeNeo4jTopic", + "labels":"order_goods", + "column":[ + { + "name":"orderGoodsId", + "type":"long", + "columnType":"primaryKey", + "valueExtract":"#{orderGoodsId}" + }, + { + "name":"order_goods", + "type":"string", + "columnType":"primaryLabel", + "valueExtract":"#{type}" + }, + { + "name":"orderId", + "type":"long", + "columnType":"srcPrimaryKey", + "valueExtract":"#{orderId}" + }, + { + "name":"Order", + "type":"string", + "columnType":"srcPrimaryLabel", + "valueExtract":"#{Order}" + }, + { + "name":"goodsId", + "type":"long", + "columnType":"dstPrimaryKey", + "valueExtract":"#{goodsId}" + }, + { + "name":"Goods", + "type":"string", + "columnType":"dstPrimaryLabel", + "valueExtract":"#{Goods}" + }, + { + "name":"orderGoodsTitle", + "type":"string", + "columnType":"relationshipProperty", + "valueExtract":"#{orderGoodsTitle}" + } + ], + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### parameter configuration + +| parameter | effect | required | default | +|--------------------|-------------------------------------------------------------------|-------------------|---------| +| neo4jHost | The Host of the neo4j server | yes | null | +| neo4jPort | The Port of the neo4j server | yes | null | +| neo4jDataBase | The database to read or write | yes | null | +| neo4jUser | user | yes | null | +| neo4jPassword | password | yes | null | +| labelType | node ro relationship | yes | null | +| labels | node label ro relationship type, If multiple are separated by ',' | yes | null | +| name | mapper into ConnectRecord field name or from ConnectRecord to neo4j propertyKey | yes | null | +| type | column mapper value type | yes | null | +| columnType | neo4j data type primaryKey,primaryLabel,nodeProperty,nodeJsonProperty,srcPrimaryKey,srcPrimaryLabel,dstPrimaryKey,dstPrimaryLabel,relationshipProperty,relationshipJsonProperty,| yes | null | +| valueExtract | column mapper value extractor,if need extract value should start with'#{' end with '}',else value is fixed value | no | null | +| topic | RocketMQ topic for source connector to write into | yes (source only) | null | +| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null | + + + +参考文档: +https://neo4j.com/docs/cypher-manual/current/clauses +https://github.com/neo4j-contrib/neo4j-streams +https://github.com/alibaba/DataX diff --git a/connectors/rocketmq-connect-neo4j/pom.xml b/connectors/rocketmq-connect-neo4j/pom.xml new file mode 100644 index 00000000..c3233b05 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/pom.xml @@ -0,0 +1,203 @@ + + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-neo4j + 1.0-SNAPSHOT + + connect-neo4j + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + jira + https://issues.apache.org/jira/browse/RocketMQ + + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + org.apache.maven.plugins + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + io.openmessaging.internal + + + + aggregate + + aggregate + + site + + + + + org.apache.maven.plugins + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + README-CN.md + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + 8 + 8 + UTF-8 + + + + io.openmessaging + openmessaging-connector + 0.1.4 + compile + + + + org.lz4 + lz4-java + 1.8.0 + + + + org.neo4j.driver + neo4j-java-driver + 4.4.0 + + + com.alibaba + fastjson + 1.2.83 + compile + + + junit + junit + RELEASE + test + + + org.slf4j + slf4j-api + 1.7.7 + + + + \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jBaseConfig.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jBaseConfig.java new file mode 100644 index 00000000..8eadc765 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jBaseConfig.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.config; + +import java.lang.reflect.Method; + +import io.openmessaging.KeyValue; + +public class Neo4jBaseConfig { + private String neo4jHost; + private String neo4jPort; + private String neo4jUser; + private String neo4jPassword; + private String neo4jDataBase; + private String topic; + private String column; + + public String getNeo4jDataBase() { + return neo4jDataBase; + } + + public void setNeo4jDataBase(String neo4jDataBase) { + this.neo4jDataBase = neo4jDataBase; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getNeo4jUser() { + return neo4jUser; + } + + public void setNeo4jUser(String neo4jUser) { + this.neo4jUser = neo4jUser; + } + + public String getNeo4jPassword() { + return neo4jPassword; + } + + public void setNeo4jPassword(String neo4jPassword) { + this.neo4jPassword = neo4jPassword; + } + + public String getNeo4jHost() { + return neo4jHost; + } + + public void setNeo4jHost(String neo4jHost) { + this.neo4jHost = neo4jHost; + } + + public String getNeo4jPort() { + return neo4jPort; + } + + public void setNeo4jPort(String neo4jPort) { + this.neo4jPort = neo4jPort; + } + + public String getColumn() { + return column; + } + + public void setColumn(String column) { + this.column = column; + } + + public void load(KeyValue props) { + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(3); + String key = new StringBuilder(String.valueOf(tmp.charAt(0)).toLowerCase()).append(tmp.substring(1)) + .toString(); + + String property = p.getString(key); + if (property != null) { + Class[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jConstants.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jConstants.java new file mode 100644 index 00000000..26dc6234 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jConstants.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.neo4j.config; + +public class Neo4jConstants { + public static final String NEO4J_HOST = "neo4jHost"; + + public static final String NEO4J_PORT = "neo4jPort"; + + public static final String NEO4J_USER = "neo4jUser"; + + public static final String NEO4J_PASSWORD = "neo4jPassword"; + + public static final String NEO4J_TOPIC = "topic"; + + public static final String NEO4J_OFFSET = "OFFSET"; + + public static final String NEO4J_PARTITION = "NEO4J_PARTITION"; + + public static final String NEO4J_DB = "neo4jDataBase"; + + public static final String LABEL_TYPE = "labelType"; + + public static final String LABELS = "labels"; + + public static final String LABEL = "label"; + + public static final int timeout_Seconds_Default = 30; + + public static final int MILLI_IN_A_SEC = 1000; + + public static final int retry_Count_Default = 3; + + public static final int MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME = 2000; + + public static final String COLUMN = "column"; + public static final String COLUMN_NAME = "name"; + public static final String VALUE_TYPE = "type"; + public static final String COLUMN_TYPE = "columnType"; + public static final String VALUE_EXTRACT = "valueExtract"; + + public enum ColumnType { + /** + * node or relationship id + */ + primaryKey, + + /** + * node label or relationship type + */ + primaryLabel, + + /** + * node property + */ + nodeProperty, + + /** + * collects all node property to Json list + */ + nodeJsonProperty, + + /** + * start node id of relationship + */ + srcPrimaryKey, + + /** + * start node label of relationship + */ + srcPrimaryLabel, + + /** + * end node id of relationship + */ + dstPrimaryKey, + + /** + * end node label of relationship + */ + dstPrimaryLabel, + + /** + * relationship property + */ + relationshipProperty, + + /** + * collects all relationship property to Json list + */ + relationshipJsonProperty, + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jSinkConfig.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jSinkConfig.java new file mode 100644 index 00000000..74a81c5b --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jSinkConfig.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.config; + +import java.util.HashSet; +import java.util.Set; + +public class Neo4jSinkConfig extends Neo4jBaseConfig { + private String labelType; + + public String getLabelType() { + return labelType; + } + + public void setLabelType(String labelType) { + this.labelType = labelType; + } + + public static final Set SINK_REQUEST_CONFIG = new HashSet() { + { + add(Neo4jConstants.NEO4J_HOST); + add(Neo4jConstants.NEO4J_PORT); + add(Neo4jConstants.NEO4J_USER); + add(Neo4jConstants.NEO4J_PASSWORD); + add(Neo4jConstants.NEO4J_DB); + add(Neo4jConstants.LABEL_TYPE); + } + }; +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jSourceConfig.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jSourceConfig.java new file mode 100644 index 00000000..93a4af74 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/config/Neo4jSourceConfig.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.config; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.rocketmq.connect.neo4j.helper.LabelTypeEnum; + +public class Neo4jSourceConfig extends Neo4jBaseConfig { + private String labelType; + private String label; + + public static final Set REQUEST_CONFIG = new HashSet() { + { + add(Neo4jConstants.NEO4J_HOST); + add(Neo4jConstants.NEO4J_PORT); + add(Neo4jConstants.NEO4J_USER); + add(Neo4jConstants.NEO4J_PASSWORD); + add(Neo4jConstants.NEO4J_TOPIC); + add(Neo4jConstants.NEO4J_DB); + add(Neo4jConstants.LABEL_TYPE); + } + }; + + public String getTaskName() { + if (LabelTypeEnum.node.name().equals(labelType) || LabelTypeEnum.relationship.name().equals(labelType)) { + return getNeo4jDataBase() + "_" + labelType + "_" + label; + } + return getNeo4jDataBase(); + } + + public String getLabelType() { + return labelType; + } + + public void setLabelType(String labelType) { + this.labelType = labelType; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/LabelTypeEnum.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/LabelTypeEnum.java new file mode 100644 index 00000000..b8278d49 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/LabelTypeEnum.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.helper; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum LabelTypeEnum { + node, + relationship, + ; + private static Map map = + Arrays.stream(values()).collect(Collectors.toMap(LabelTypeEnum::name, Function.identity())); + + public static LabelTypeEnum nameOf(String name) { + return map.get(name); + } +} diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/MappingRule.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/MappingRule.java new file mode 100644 index 00000000..fc1d9e53 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/MappingRule.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.helper; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.connect.neo4j.config.Neo4jConstants.*; + +public class MappingRule { + private static final Logger LOGGER = LoggerFactory.getLogger(MappingRule.class); + private boolean hasRelation = false; + private boolean hasProperty = false; + private LabelTypeEnum type = LabelTypeEnum.node; + + /** + * property names for property key-value + */ + private List propertyNames = new ArrayList<>(); + + private List columns = new ArrayList<>(); + + private Set columnTypeSet = new HashSet<>(); + + public void validateConfig() { + if (type == LabelTypeEnum.node) { + if (!columnTypeSet.contains(ColumnType.primaryKey) || !columnTypeSet.contains(ColumnType.primaryLabel)) { + LOGGER.error("node config need ColumnType primaryKey and primaryLabel"); + throw new RuntimeException("node config need ColumnType primaryKey and primaryLabel"); + } + } else { + if (!columnTypeSet.contains(ColumnType.primaryKey) || !columnTypeSet.contains(ColumnType.primaryLabel) + || !columnTypeSet.contains(ColumnType.srcPrimaryKey) || !columnTypeSet + .contains(ColumnType.srcPrimaryLabel) || !columnTypeSet.contains(ColumnType.dstPrimaryKey) + || !columnTypeSet.contains(ColumnType.dstPrimaryLabel)) { + LOGGER.error("relationship config need ColumnType primaryKey and primaryLabel and srcPrimaryKey and " + + "srcPrimaryLabel and dstPrimaryKey and dstPrimaryLabel"); + throw new RuntimeException( + "relationship config need ColumnType primaryKey and primaryLabel and srcPrimaryKey and " + + "srcPrimaryLabel and dstPrimaryKey and dstPrimaryLabel"); + } + } + } + + void addColumn(ColumnType columnType, ValueType type, String name, String valueExtract) { + ColumnMappingRule rule = new ColumnMappingRule(); + rule.setColumnType(columnType); + rule.setName(name); + rule.setValueType(type); + rule.setValueExtract(valueExtract); + columnTypeSet.add(columnType); + + if (columnType == ColumnType.nodeProperty || columnType == Neo4jConstants.ColumnType.relationshipProperty) { + propertyNames.add(name); + hasProperty = true; + } + + boolean hasTo = columnType == ColumnType.dstPrimaryKey || columnType == ColumnType.dstPrimaryLabel; + boolean hasFrom = columnType == ColumnType.srcPrimaryKey || columnType == ColumnType.srcPrimaryLabel; + if (hasTo || hasFrom) { + hasRelation = true; + } + + columns.add(rule); + } + + void addJsonColumn(ColumnType columnType) { + ColumnMappingRule rule = new ColumnMappingRule(); + rule.setColumnType(columnType); + rule.setName("json"); + rule.setValueType(ValueType.STRING); + + if (!propertyNames.isEmpty()) { + throw new RuntimeException("JsonProperties should be only property"); + } + + columns.add(rule); + hasProperty = true; + } + + public boolean isHasRelation() { + return hasRelation; + } + + public void setHasRelation(boolean hasRelation) { + this.hasRelation = hasRelation; + } + + public boolean isHasProperty() { + return hasProperty; + } + + public void setHasProperty(boolean hasProperty) { + this.hasProperty = hasProperty; + } + + public LabelTypeEnum getType() { + return type; + } + + public void setType(LabelTypeEnum type) { + this.type = type; + } + + public List getPropertyNames() { + return propertyNames; + } + + public void setPropertyNames(List propertyNames) { + this.propertyNames = propertyNames; + } + + public List getColumns() { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + public static class ColumnMappingRule { + private String name = null; + + private ValueType valueType = null; + + private ColumnType columnType = null; + + private String valueExtract = null; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public ValueType getValueType() { + return valueType; + } + + public void setValueType(ValueType valueType) { + this.valueType = valueType; + } + + public ColumnType getColumnType() { + return columnType; + } + + public void setColumnType(ColumnType columnType) { + this.columnType = columnType; + } + + public String getValueExtract() { + return valueExtract; + } + + public void setValueExtract(String valueExtract) { + this.valueExtract = valueExtract; + } + } +} diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/MappingRuleFactory.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/MappingRuleFactory.java new file mode 100644 index 00000000..a2e205ac --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/MappingRuleFactory.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.helper; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jBaseConfig; +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants.*; + +public class MappingRuleFactory { + private static final MappingRuleFactory instance = new MappingRuleFactory(); + + public static MappingRuleFactory getInstance() { + return instance; + } + + public MappingRule create(Neo4jBaseConfig config, LabelTypeEnum exportType) { + MappingRule rule = new MappingRule(); + + rule.setType(exportType); + String columnStr = config.getColumn(); + final JSONArray configObject = JSON.parseArray(columnStr); + final int size = configObject.size(); + for (int i = 0; i < size; i++) { + final JSONObject columnValue = configObject.getJSONObject(i); + final String columnName = columnValue.getString(Neo4jConstants.COLUMN_NAME); + String type = columnValue.getString(Neo4jConstants.VALUE_TYPE); + String cType = columnValue.getString(Neo4jConstants.COLUMN_TYPE); + String valueExtract = columnValue.getString(Neo4jConstants.VALUE_EXTRACT); + + ColumnType columnType; + try { + columnType = ColumnType.valueOf(cType); + } catch (NullPointerException | IllegalArgumentException e) { + throw new RuntimeException("columnType config error"); + } + + if (exportType == LabelTypeEnum.node) { + // only id/label/property column allow when node + if (columnType != ColumnType.primaryKey && columnType != ColumnType.primaryLabel + && columnType != ColumnType.nodeProperty && columnType != ColumnType.nodeJsonProperty) { + throw new RuntimeException("only id/label/property column allow when node"); + } + } else if (exportType == LabelTypeEnum.relationship) { + // relationship + if (columnType != ColumnType.primaryKey && columnType != ColumnType.primaryLabel + && columnType != ColumnType.srcPrimaryKey && columnType != ColumnType.srcPrimaryLabel + && columnType != ColumnType.dstPrimaryKey && columnType != ColumnType.dstPrimaryLabel + && columnType != ColumnType.relationshipProperty + && columnType != ColumnType.relationshipJsonProperty) { + throw new RuntimeException("relationship check"); + } + } + if (columnType == ColumnType.relationshipProperty || columnType == ColumnType.nodeProperty + || columnType == ColumnType.primaryKey || columnType == ColumnType.dstPrimaryKey + || columnType == ColumnType.srcPrimaryKey) { + + ValueType propType = ValueType.fromShortName(type); + + if (propType == null) { + throw new RuntimeException("UNSUPPORTED TYPE"); + } + rule.addColumn(columnType, propType, columnName, valueExtract); + } else if (columnType == ColumnType.nodeJsonProperty || columnType == ColumnType.relationshipJsonProperty) { + rule.addColumn(columnType, ValueType.STRING, columnName, valueExtract); + } else { + rule.addColumn(columnType, ValueType.STRING, columnName, valueExtract); + } + } + return rule; + } + + public static boolean isPrimitive(Object value) { + return value == null || value instanceof Boolean || value instanceof Number || value instanceof String; + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/Neo4jClient.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/Neo4jClient.java new file mode 100644 index 00000000..a5a9e4f8 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/Neo4jClient.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.helper; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jBaseConfig; +import org.neo4j.driver.*; +import org.neo4j.driver.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Neo4jClient { + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jClient.class); + + private Neo4jBaseConfig neo4jBaseConfig; + private Driver driver; + + public Neo4jClient(Neo4jBaseConfig config) { + neo4jBaseConfig = config; + createDriver(config); + } + + private Driver createDriver(Neo4jBaseConfig config) { + this.driver = GraphDatabase + .driver("bolt://" + config.getNeo4jHost() + ":" + config.getNeo4jPort() + "/" + config.getNeo4jDataBase(), + AuthTokens.basic(config.getNeo4jUser(), config.getNeo4jPassword())); + return this.driver; + } + + public void insert(String cql) { + try (Session session = driver.session()) { + session.writeTransaction(tx -> { + final Result result = tx.run(cql); + return null; + }); + } catch (Exception e) { + LOGGER.error(cql + " has error", e); + } + } + + public List query(String query) { + try (Session session = driver.session()) { + TransactionConfig txConfig = TransactionConfig.builder().withTimeout(Duration.ofSeconds(5)).build(); + final List recordList = session.readTransaction(tx -> { + List list = new ArrayList<>(); + final Result result = tx.run(query); + while (result.hasNext()) { + final Record record = result.next(); + list.add(record); + } + return list; + }, txConfig); + return recordList; + } catch (Exception e) { + LOGGER.error(query + " has error", e); + } + return Collections.emptyList(); + } + + public boolean ping() { + try (Session session = driver.session()) { + session.readTransaction(tx -> { + final Result result = + tx.run("use " + neo4jBaseConfig.getNeo4jDataBase() + " MATCH (a) RETURN a LIMIT 1"); + final List list = result.list(); + return "ok"; + }); + } catch (Exception e) { + LOGGER.error("unable to ping to neo4j server.", e); + return false; + } catch (Throwable throwable) { + LOGGER.error("unable to ping to neo4j server.", throwable); + return false; + } + return true; + } + + public List getAllLabels() { + try (Session session = driver.session()) { + final List labelList = session.readTransaction(tx -> { + final Result result = tx.run("use " + neo4jBaseConfig.getNeo4jDataBase() + " CALL db.labels()"); + return result.list().stream().map(record -> record.get("label").asString()) + .collect(Collectors.toList()); + }); + return labelList; + } catch (Exception e) { + LOGGER.error("CALL db.labels() has error", e); + } + return Collections.emptyList(); + } + + public List getAllType() { + try (Session session = driver.session()) { + final List labelList = session.readTransaction(tx -> { + final Result result = + tx.run("use " + neo4jBaseConfig.getNeo4jDataBase() + " CALL db.relationshipTypes()"); + return result.list().stream().map(record -> record.get("relationshipType").asString()) + .collect(Collectors.toList()); + }); + return labelList; + } catch (Exception e) { + LOGGER.error("CALL db.relationshipTypes() has error", e); + } + return Collections.emptyList(); + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/Neo4jElement.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/Neo4jElement.java new file mode 100644 index 00000000..0c54bdc7 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/Neo4jElement.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.helper; + +import java.util.HashMap; +import java.util.Map; + +public class Neo4jElement { + String primaryKey = null; + String primaryValue = null; + String label = null; + String toPrimaryKey = null; + String toPrimaryValue = null; + String fromPrimaryKey = null; + String fromPrimaryValue = null; + String toLabel = null; + String fromLabel = null; + + Map properties = new HashMap<>(); + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public Neo4jElement() { + } + + public Neo4jElement(String primaryKey, String primaryValue, String label) { + this.primaryKey = primaryKey; + this.primaryValue = primaryValue; + this.label = label; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public String getToLabel() { + return toLabel; + } + + public void setToLabel(String toLabel) { + this.toLabel = toLabel; + } + + public String getFromLabel() { + return fromLabel; + } + + public void setFromLabel(String fromLabel) { + this.fromLabel = fromLabel; + } + + public String getPrimaryKey() { + return primaryKey; + } + + public void setPrimaryKey(String primaryKey) { + this.primaryKey = primaryKey; + } + + public String getPrimaryValue() { + return primaryValue; + } + + public void setPrimaryValue(String primaryValue) { + this.primaryValue = primaryValue; + } + + public String getToPrimaryKey() { + return toPrimaryKey; + } + + public void setToPrimaryKey(String toPrimaryKey) { + this.toPrimaryKey = toPrimaryKey; + } + + public String getToPrimaryValue() { + return toPrimaryValue; + } + + public void setToPrimaryValue(String toPrimaryValue) { + this.toPrimaryValue = toPrimaryValue; + } + + public String getFromPrimaryKey() { + return fromPrimaryKey; + } + + public void setFromPrimaryKey(String fromPrimaryKey) { + this.fromPrimaryKey = fromPrimaryKey; + } + + public String getFromPrimaryValue() { + return fromPrimaryValue; + } + + public void setFromPrimaryValue(String fromPrimaryValue) { + this.fromPrimaryValue = fromPrimaryValue; + } +} diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/RecordConverter.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/RecordConverter.java new file mode 100644 index 00000000..b0ce049d --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/RecordConverter.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.neo4j.helper; + +import java.lang.reflect.Array; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +import org.neo4j.driver.types.Node; +import org.neo4j.driver.types.Point; +import org.neo4j.driver.types.Relationship; + +import io.openmessaging.connector.api.data.*; + +public class RecordConverter { + + public static Map asNodeMap(Node node) { + final long id = node.id(); + final Iterable labels = node.labels(); + List labelList = new ArrayList<>(); + for (String label : labels) { + labelList.add(label); + } + final Map asMap = node.asMap(); + Map valueMap = new HashMap<>(asMap); + valueMap.put("", id); + valueMap.put("", labelsMerge(labelList)); + return valueMap; + } + + public static Map asRelationshipMap(Relationship relationship) { + final long startNodeId = relationship.startNodeId(); + final long endNodeId = relationship.endNodeId(); + final String type = relationship.type(); + final long id = relationship.id(); + + final Map asMap = relationship.asMap(); + Map valueMap = new HashMap<>(asMap); + valueMap.put("", id); + valueMap.put("", type); + valueMap.put("", startNodeId); + valueMap.put("", endNodeId); + return valueMap; + } + + public static Schema map2StructSchema(Map map) { + final SchemaBuilder structBuilder = SchemaBuilder.struct().optional(); + map.forEach((k, v) -> { + final Schema valueSchema = neo4jValueSchema(v); + if (valueSchema != null) { + structBuilder.field(k, valueSchema); + } + }); + if (structBuilder.fields().isEmpty()) { + return null; + } else { + return structBuilder.build(); + } + } + + public static Struct buildStruct(Schema schema, Map columnMap) { + Struct struct = new Struct(schema); + final List fields = schema.getFields(); + for (Field field : fields) { + struct.put(field, columnMap.get(field.getName())); + } + return struct; + } + + private static Schema neo4jValueSchema(Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + return SchemaBuilder.string().build(); + } + if (value instanceof Integer) { + return SchemaBuilder.int32().build(); + } + if (value instanceof Long) { + return SchemaBuilder.int64().build(); + } + if (value instanceof Float) { + return SchemaBuilder.float32().build(); + } + if (value instanceof Double) { + return SchemaBuilder.float64().build(); + } + if (value instanceof Boolean) { + return SchemaBuilder.bool().build(); + } + if (value instanceof LocalDateTime || value instanceof LocalDate || value instanceof Date) { + return SchemaBuilder.date().build(); + } + if (value instanceof Collection) { + Collection collection = (Collection)value; + if (collection.size() > 0) { + final Object v = collection.stream().findFirst().orElse(null); + Schema valueSchema = neo4jValueSchema(v); + return Optional.ofNullable(valueSchema).map(x -> SchemaBuilder.array(valueSchema).build()).orElse(null); + } else { + return null; + } + } + + if (value instanceof Array) { + Object[] array = (Object[])value; + if (array.length > 0) { + final Object v = array[0]; + Schema valueSchema = neo4jValueSchema(v); + return Optional.ofNullable(valueSchema).map(x -> SchemaBuilder.array(valueSchema).build()).orElse(null); + } else { + return null; + } + } + if (value instanceof Map) { + Map map = (Map)value; + if (map.isEmpty()) { + return SchemaBuilder.map(SchemaBuilder.string().build(), SchemaBuilder.string().optional().build()) + .optional().build(); + } else { + final Set valueTypes = + map.values().stream().map(i -> i.getClass().getName()).collect(Collectors.toSet()); + if (valueTypes.size() == 1) { + final Object firstV = map.values().stream().findFirst().orElse(null); + final Schema valueSchema = neo4jValueSchema(firstV); + return SchemaBuilder.map(SchemaBuilder.string().build(), valueSchema).build(); + } else { + map2StructSchema(map); + } + } + } + if (value instanceof Point) { + final SchemaBuilder structBuilder = SchemaBuilder.struct().optional(); + structBuilder.field("srid", SchemaBuilder.int32().build()); + structBuilder.field("x", SchemaBuilder.float64().build()); + structBuilder.field("y", SchemaBuilder.float64().build()); + structBuilder.field("z", SchemaBuilder.float64().build()); + return structBuilder.build(); + } + if (value instanceof Node) { + final Map nodeMap = asNodeMap((Node)value); + return map2StructSchema(nodeMap); + } + if (value instanceof Relationship) { + final Map relationshipMap = asRelationshipMap((Relationship)value); + return map2StructSchema(relationshipMap); + } else { + return SchemaBuilder.string().build(); + } + } + + private static String labelsMerge(List labelList) { + if (labelList == null || labelList.isEmpty()) { + return ""; + } + StringBuilder stringBuilder = new StringBuilder(); + for (String label : labelList) { + stringBuilder.append(":").append(label); + } + return stringBuilder.substring(1); + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/ValueType.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/ValueType.java new file mode 100644 index 00000000..b4ab5cea --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/helper/ValueType.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.helper; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +import com.alibaba.fastjson.JSONObject; + +public enum ValueType { + /** + * transfer gdb element object value to DataX Column data + *

+ * int, long -> LongColumn + * float, double -> DoubleColumn + * bool -> BooleanColumn + * string -> StringColumn + */ + INT(Integer.class, "int", ValueTypeHolder::longColumnMapper), + INTEGER(Integer.class, "integer", ValueTypeHolder::longColumnMapper), + LONG(Long.class, "long", ValueTypeHolder::longColumnMapper), + DOUBLE(Double.class, "double", ValueTypeHolder::doubleColumnMapper), + FLOAT(Float.class, "float", ValueTypeHolder::doubleColumnMapper), + BOOLEAN(Boolean.class, "boolean", ValueTypeHolder::boolColumnMapper), + STRING(String.class, "string", ValueTypeHolder::stringColumnMapper), + ; + + private Class type = null; + private String shortName = null; + private Function columnFunc = null; + + ValueType(Class type, String name, Function columnFunc) { + this.type = type; + this.shortName = name; + this.columnFunc = columnFunc; + + ValueTypeHolder.shortName2type.put(shortName, this); + } + + public static ValueType fromShortName(String name) { + return ValueTypeHolder.shortName2type.get(name); + } + + public Object applyObject(Object value) { + if (value == null) { + return null; + } + return columnFunc.apply(value); + } + + private static class ValueTypeHolder { + private static Map shortName2type = new HashMap<>(); + + private static Long longColumnMapper(Object o) { + long v; + if (o instanceof Integer) { + v = (int)o; + } else if (o instanceof Long) { + v = (long)o; + } else if (o instanceof String) { + v = Long.valueOf((String)o); + } else { + throw new RuntimeException("Failed to cast " + o.getClass() + " to Long"); + } + + return v; + } + + private static Double doubleColumnMapper(Object o) { + double v; + if (o instanceof Integer) { + v = (double)(int)o; + } else if (o instanceof Long) { + v = (double)(long)o; + } else if (o instanceof Float) { + v = (double)(float)o; + } else if (o instanceof Double) { + v = (double)o; + } else if (o instanceof String) { + v = Double.valueOf((String)o); + } else { + throw new RuntimeException("Failed to cast " + o.getClass() + " to Double"); + } + + return v; + } + + private static Boolean boolColumnMapper(Object o) { + boolean v; + if (o instanceof Integer) { + v = ((int)o != 0); + } else if (o instanceof Long) { + v = ((long)o != 0); + } else if (o instanceof Boolean) { + v = (boolean)o; + } else if (o instanceof String) { + v = Boolean.valueOf((String)o); + } else { + throw new RuntimeException("Failed to cast " + o.getClass() + " to Boolean"); + } + + return v; + } + + private static String stringColumnMapper(Object o) { + if (o instanceof String) { + return (String)o; + } else if (MappingRuleFactory.isPrimitive(o)) { + return String.valueOf(o); + } else { + return JSONObject.toJSONString(o); + } + } + } + +} diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkConnector.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkConnector.java new file mode 100644 index 00000000..ecd9a410 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkConnector.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.sink; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jSinkConfig; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; + +public class Neo4jSinkConnector extends SinkConnector { + private KeyValue keyValue; + + @Override + public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override + public Class taskClass() { + return Neo4jSinkTask.class; + } + + @Override + public void start(KeyValue keyValue) { + for (String requestKey : Neo4jSinkConfig.SINK_REQUEST_CONFIG) { + if (!keyValue.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + this.keyValue = keyValue; + } + + @Override + public void stop() { + this.keyValue = null; + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkTask.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkTask.java new file mode 100644 index 00000000..f41d7fea --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkTask.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.sink; + +import java.util.List; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jSinkConfig; +import org.apache.rocketmq.connect.neo4j.helper.LabelTypeEnum; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; +import org.apache.rocketmq.connect.neo4j.sink.write.NodeWriteProcessor; +import org.apache.rocketmq.connect.neo4j.sink.write.Processor; +import org.apache.rocketmq.connect.neo4j.sink.write.RelationshipWriteProcessor; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; + +public class Neo4jSinkTask extends SinkTask { + private Neo4jSinkConfig config; + private Neo4jClient client; + private Processor processor; + + @Override + public void put(List sinkRecords) throws ConnectException { + if (sinkRecords == null || sinkRecords.size() < 1) { + return; + } + for (ConnectRecord connectRecord : sinkRecords) { + processor.write(connectRecord); + } + } + + @Override + public void start(KeyValue keyValue) { + this.config = new Neo4jSinkConfig(); + this.config.load(keyValue); + LabelTypeEnum labelTypeEnum = LabelTypeEnum.nameOf(config.getLabelType()); + if (labelTypeEnum == null) { + throw new RuntimeException("labelType only support node or relationship"); + } + this.client = new Neo4jClient(this.config); + if (!client.ping()) { + throw new RuntimeException("Cannot connect to neo4j server!"); + } + if (LabelTypeEnum.node == labelTypeEnum) { + processor = new NodeWriteProcessor(config, client); + } else { + processor = new RelationshipWriteProcessor(config, client); + } + } + + @Override + public void stop() { + this.client = null; + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/mapping/WriteMapper.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/mapping/WriteMapper.java new file mode 100644 index 00000000..eaf6b1d4 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/mapping/WriteMapper.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.sink.mapping; + +import java.util.function.BiConsumer; + +import org.apache.rocketmq.connect.neo4j.helper.Neo4jElement; +import org.apache.rocketmq.connect.neo4j.helper.MappingRule; +import org.apache.rocketmq.connect.neo4j.helper.ValueType; + +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.Struct; + +public class WriteMapper { + public static BiConsumer getMapper(MappingRule rule) { + return (record, neo4jElement) -> { + final Schema recordSchema = record.getSchema(); + final Struct struct = (Struct)record.getData(); + + rule.getColumns().forEach(columnMappingRule -> { + Object value = null; + ValueType type = columnMappingRule.getValueType(); + String name = columnMappingRule.getName(); + boolean extractFixedValue = false; + final String valueExtract = columnMappingRule.getValueExtract(); + if (valueExtract.startsWith("#{") && valueExtract.endsWith("}")) { + final String val = valueExtract.substring(2, valueExtract.length() - 1); + final Field field = recordSchema.getField(val); + if (field != null) { + value = struct.get(field); + } + } else { + value = valueExtract; + extractFixedValue = true; + } + switch (columnMappingRule.getColumnType()) { + case primaryKey: + neo4jElement.setPrimaryKey(name); + neo4jElement.setPrimaryValue(String.valueOf(value)); + break; + case primaryLabel: + neo4jElement.setLabel(String.valueOf(value)); + break; + case dstPrimaryKey: + neo4jElement.setToPrimaryKey(name); + neo4jElement.setToPrimaryValue(String.valueOf(value)); + break; + case srcPrimaryKey: + neo4jElement.setFromPrimaryKey(name); + neo4jElement.setFromPrimaryValue(String.valueOf(value)); + break; + case srcPrimaryLabel: + neo4jElement.setFromLabel(String.valueOf(value)); + break; + case dstPrimaryLabel: + neo4jElement.setToLabel(String.valueOf(value)); + break; + case nodeProperty: + case relationshipProperty: + case nodeJsonProperty: + case relationshipJsonProperty: + if (value != null) { + neo4jElement.getProperties().put(name, type.applyObject(value)); + } + default: + break; + } + }); + }; + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/NodeWriteProcessor.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/NodeWriteProcessor.java new file mode 100644 index 00000000..13d7fdec --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/NodeWriteProcessor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.sink.write; + +import java.util.Map; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jSinkConfig; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jElement; + +import io.openmessaging.connector.api.data.*; + +public class NodeWriteProcessor extends WriteProcessor { + private Neo4jSinkConfig neo4jSinkConfig; + + public NodeWriteProcessor(Neo4jSinkConfig config, Neo4jClient neo4jClient) { + super(config, neo4jClient); + this.neo4jSinkConfig = config; + } + + //MERGE (keanu:Person {name: 'Keanu Reeves'}) + //ON CREATE + // SET keanu.created = timestamp() + //ON MATCH + // SET keanu.lastSeen = timestamp() + //RETURN keanu.name, keanu.created, keanu.lastSeen + protected String buildCql(Neo4jElement neo4jElement, Schema schema) { + StringBuilder cql = new StringBuilder(); + cql.append("use ").append(neo4jSinkConfig.getNeo4jDataBase()).append(" "); + cql.append("merge (n:").append(neo4jElement.getLabel()).append("{").append(neo4jElement.getPrimaryKey()) + .append(":").append(neo4jElement.getPrimaryValue()).append("})"); + if (!neo4jElement.getProperties().isEmpty()) { + cql.append(" on create set ").append(setProperty(neo4jElement.getProperties(), schema)); + cql.append(" on match set ").append(setProperty(neo4jElement.getProperties(), schema)); + } + return cql.toString(); + } + + protected String setProperty(Map property, Schema recordSchema) { + StringBuilder cql = new StringBuilder(); + for (String key : property.keySet()) { + final Object o = property.get(key); + final Field field = recordSchema.getField(key); + if (field.getSchema().getFieldType() == FieldType.STRING) { + cql.append("n.").append(key).append(" = ").append(setStringValue((String)o)); + } else { + cql.append("n.").append(key).append(" = ").append(o); + } + cql.append(","); + } + return cql.substring(0, cql.length() - 1); + } + + protected String setStringValue(String v) { + return "'" + v + "'"; + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/Processor.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/Processor.java new file mode 100644 index 00000000..c56c7104 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/Processor.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.neo4j.sink.write; + +import io.openmessaging.connector.api.data.ConnectRecord; + +public interface Processor { + void write(ConnectRecord record); +} diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/RelationshipWriteProcessor.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/RelationshipWriteProcessor.java new file mode 100644 index 00000000..5e6e5b31 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/RelationshipWriteProcessor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.sink.write; + +import java.util.Map; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jSinkConfig; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jElement; + +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.FieldType; +import io.openmessaging.connector.api.data.Schema; + +public class RelationshipWriteProcessor extends WriteProcessor { + private Neo4jSinkConfig neo4jSinkConfig; + + public RelationshipWriteProcessor(Neo4jSinkConfig config, Neo4jClient neo4jClient) { + super(config, neo4jClient); + this.neo4jSinkConfig = config; + } + + protected String buildCql(Neo4jElement neo4jElement, Schema schema) { + StringBuilder cql = new StringBuilder(); + cql.append("use ").append(neo4jSinkConfig.getNeo4jDataBase()).append(" "); + cql.append("merge (source:").append(neo4jElement.getFromLabel()).append("{") + .append(neo4jElement.getFromPrimaryKey()).append(":").append(neo4jElement.getFromPrimaryValue()) + .append("}) "); + cql.append("merge (target:").append(neo4jElement.getToLabel()).append("{") + .append(neo4jElement.getToPrimaryKey()).append(":").append(neo4jElement.getToPrimaryValue()).append("}) "); + cql.append("merge (source)-[:").append(neo4jElement.getLabel()).append("{") + .append(addProperty(schema, neo4jElement)).append("}]").append("->(target)"); + return cql.toString(); + } + + protected String addProperty(Schema schema, Neo4jElement neo4jElement) { + StringBuilder cql = new StringBuilder(); + cql.append(setValue(neo4jElement.getPrimaryKey(), neo4jElement.getPrimaryValue(), schema)).append(","); + final Map properties = neo4jElement.getProperties(); + if (!properties.isEmpty()) { + for (String key : properties.keySet()) { + final Object o = properties.get(key); + cql.append(setValue(key, o, schema)); + cql.append(","); + } + } + return cql.substring(0, cql.length() - 1); + } + + protected String setValue(String key, Object value, Schema schema) { + final Field field = schema.getField(key); + if (field == null) { + return key + ":" + value; + } + if (field.getSchema().getFieldType() == FieldType.STRING) { + return key + ":'" + value + "'"; + } else { + return key + ":" + value; + } + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/WriteProcessor.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/WriteProcessor.java new file mode 100644 index 00000000..1014e4b3 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/sink/write/WriteProcessor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.sink.write; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jSinkConfig; +import org.apache.rocketmq.connect.neo4j.helper.LabelTypeEnum; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jElement; +import org.apache.rocketmq.connect.neo4j.sink.mapping.WriteMapper; +import org.apache.rocketmq.connect.neo4j.helper.MappingRule; +import org.apache.rocketmq.connect.neo4j.helper.MappingRuleFactory; + +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Schema; + +public abstract class WriteProcessor implements Processor { + private Neo4jSinkConfig neo4jSinkConfig; + private Neo4jClient neo4jClient; + private MappingRule mappingRule; + + public WriteProcessor(Neo4jSinkConfig config, Neo4jClient neo4jClient) { + this.neo4jSinkConfig = config; + this.neo4jClient = neo4jClient; + final LabelTypeEnum labelTypeEnum = LabelTypeEnum.nameOf(neo4jSinkConfig.getLabelType()); + if (labelTypeEnum == null) { + throw new RuntimeException("label type only support node or relationship"); + } + mappingRule = MappingRuleFactory.getInstance().create(neo4jSinkConfig, labelTypeEnum); + mappingRule.validateConfig(); + } + + public void write(ConnectRecord record) { + final Schema recordSchema = record.getSchema(); + Neo4jElement neo4jElement = new Neo4jElement(); + WriteMapper.getMapper(mappingRule).accept(record, neo4jElement); + String cql = buildCql(neo4jElement, recordSchema); + neo4jClient.insert(cql); + } + + protected abstract String buildCql(Neo4jElement neo4jElement, Schema schema); + +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceConnector.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceConnector.java new file mode 100644 index 00000000..f0597aa2 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceConnector.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.source; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; +import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; +import org.apache.rocketmq.connect.neo4j.helper.LabelTypeEnum; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.source.SourceConnector; +import io.openmessaging.internal.DefaultKeyValue; + +public class Neo4jSourceConnector extends SourceConnector { + private KeyValue keyValue; + private Neo4jClient neo4jClient; + + @Override + public List taskConfigs(int maxTasks) { + final String type = keyValue.getString(Neo4jConstants.LABEL_TYPE); + final String labels = keyValue.getString(Neo4jConstants.LABELS); + List labelList = null; + if (labels != null && !labels.equals("")) { + final String[] split = labels.split(","); + labelList = Arrays.asList(split); + } + if (labelList == null) { + if (LabelTypeEnum.node.name().equals(type)) { + labelList = neo4jClient.getAllLabels(); + } else { + labelList = neo4jClient.getAllType(); + } + } + // 按标签分配任务 + List configs = new ArrayList<>(); + for (int i = 0; i < labelList.size(); i++) { + final String label = labelList.get(i); + KeyValue config = new DefaultKeyValue(); + for (String key : keyValue.keySet()) { + config.put(key, keyValue.getString(key)); + } + config.put(Neo4jConstants.LABEL, label); + configs.add(config); + } + return configs; + } + + @Override + public Class taskClass() { + return Neo4jSourceTask.class; + } + + @Override + public void start(KeyValue keyValue) { + for (String requestKey : Neo4jSourceConfig.REQUEST_CONFIG) { + if (!keyValue.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + Neo4jSourceConfig config = new Neo4jSourceConfig(); + config.load(keyValue); + final LabelTypeEnum labelTypeEnum = LabelTypeEnum.nameOf(config.getLabelType()); + if (labelTypeEnum == null) { + throw new RuntimeException("labelType is only support node or relationship!"); + } + neo4jClient = new Neo4jClient(config); + if (!neo4jClient.ping()) { + throw new RuntimeException("Cannot connect to neo4j server!"); + } + this.keyValue = keyValue; + } + + @Override + public void stop() { + this.keyValue = null; + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTask.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTask.java new file mode 100644 index 00000000..c5fa6d54 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTask.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.source; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; +import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; +import org.apache.rocketmq.connect.neo4j.source.query.CqlQueryProcessor; +import org.apache.rocketmq.connect.neo4j.source.query.QueryRegistrar; + +import org.neo4j.driver.Record; +import org.neo4j.driver.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.source.SourceTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.internal.DefaultKeyValue; + +public class Neo4jSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(Neo4jSourceTask.class); + + private Neo4jSourceConfig config; + + private Neo4jClient client; + + @Override + public List poll() { + List res = new ArrayList<>(); + long offset = readRecordOffset(); + String cql = buildCql(offset); + try { + final List recordList = client.query(cql); + for (Record record : recordList) { + final ConnectRecord connectRecord = neo4jRecord2ConnectRecord(record); + res.add(connectRecord); + } + } catch (Exception e) { + log.error(String.format("Fail to poll data from neo4j! offset=%d", offset)); + } + return res; + } + + @Override + public void start(KeyValue keyValue) { + this.config = new Neo4jSourceConfig(); + this.config.load(keyValue); + this.client = new Neo4jClient(this.config); + if (!client.ping()) { + throw new RuntimeException("Cannot connect to neo4j server!"); + } + QueryRegistrar.register(config); + } + + @Override + public void stop() { + this.client = null; + } + + // build cypher query + private String buildCql(long offset) { + final String queryType = config.getLabelType(); + final CqlQueryProcessor cqlQueryProcessor = QueryRegistrar.querySqlBuilder(queryType); + return cqlQueryProcessor.buildCql(offset); + } + + private long readRecordOffset() { + final RecordOffset positionInfo = + this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(config.getNeo4jDataBase())); + if (positionInfo == null) { + return 0; + } + Object offset = positionInfo.getOffset().get(config.getTaskName() + "_" + Neo4jConstants.NEO4J_OFFSET); + return offset == null ? 0 : Long.parseLong(offset.toString()); + } + + private ConnectRecord neo4jRecord2ConnectRecord(Record record) { + final String queryType = config.getLabelType(); + final CqlQueryProcessor cqlQueryProcessor = QueryRegistrar.querySqlBuilder(queryType); + final Pair structPair = cqlQueryProcessor.buildStruct(record); + final Long id = structPair.key(); + final Struct struct = structPair.value(); + final ConnectRecord connectRecord = + new ConnectRecord(buildRecordPartition(config.getTaskName()), buildRecordOffset(id), + System.currentTimeMillis(), struct.getSchema(), struct); + + connectRecord.setExtensions(this.buildExtensions()); + return connectRecord; + } + + private RecordPartition buildRecordPartition(String partitionValue) { + Map partitionMap = new HashMap<>(); + partitionMap.put(Neo4jConstants.NEO4J_PARTITION, partitionValue); + return new RecordPartition(partitionMap); + } + + private KeyValue buildExtensions() { + KeyValue keyValue = new DefaultKeyValue(); + String topicName = config.getTopic(); + if (topicName == null || topicName.equals("")) { + String connectorName = this.sourceTaskContext.getConnectorName(); + topicName = config.getTaskName() + "_" + connectorName; + } + keyValue.put(Neo4jConstants.NEO4J_TOPIC, topicName); + return keyValue; + } + + private RecordOffset buildRecordOffset(long offset) { + Map offsetMap = new HashMap<>(); + offsetMap.put(config.getTaskName() + "_" + Neo4jConstants.NEO4J_OFFSET, offset); + return new RecordOffset(offsetMap); + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java new file mode 100644 index 00000000..66d2f26c --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.source; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jBaseConfig; +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; +import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; + +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; + +public class Neo4jSourceTaskTest { + private static final String host= "localhost"; + private static final Integer port = 7687; + private static final String db = "test"; + private static final String user = "test"; + private static final String password = "root123456"; + + public void testClient() { + Neo4jBaseConfig neo4jBaseConfig = new Neo4jSourceConfig(); + KeyValue config = new DefaultKeyValue(); + config.put(Neo4jConstants.NEO4J_HOST, host); + config.put(Neo4jConstants.NEO4J_PORT, port); + config.put(Neo4jConstants.NEO4J_USER, user); + config.put(Neo4jConstants.NEO4J_PASSWORD, password); + config.put(Neo4jConstants.NEO4J_DB, db); + neo4jBaseConfig.load(config); + Neo4jClient client = new Neo4jClient(neo4jBaseConfig); + final boolean ping = client.ping(); + System.out.println(ping); + } + + public static void main(String[] args) { + Neo4jSourceTaskTest neo4jSourceTaskTest = new Neo4jSourceTaskTest(); + neo4jSourceTaskTest.testClient(); + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/mapping/ReadMapper.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/mapping/ReadMapper.java new file mode 100644 index 00000000..f47b0ccc --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/mapping/ReadMapper.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.neo4j.source.mapping; + +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import org.apache.rocketmq.connect.neo4j.helper.LabelTypeEnum; +import org.apache.rocketmq.connect.neo4j.helper.MappingRule; +import org.apache.rocketmq.connect.neo4j.helper.ValueType; + +public class ReadMapper { + + public static BiConsumer, Map> getMapper(MappingRule rule) { + return (neo4jElement, record) -> rule.getColumns().forEach(columnMappingRule -> { + Object value = null; + ValueType type = columnMappingRule.getValueType(); + String name = columnMappingRule.getName(); + final String valueExtract = columnMappingRule.getValueExtract(); + + switch (columnMappingRule.getColumnType()) { + case dstPrimaryKey: + final Map targetMap = (Map)neo4jElement.get(""); + value = extractValue(valueExtract, "", targetMap::get, type); + record.put(name, value); + break; + case srcPrimaryKey: + final Map sourceMap = (Map)neo4jElement.get(""); + value = extractValue(valueExtract, "", sourceMap::get, type); + record.put(name, value); + break; + case primaryKey: + final LabelTypeEnum labelTypeEnum = rule.getType(); + if (LabelTypeEnum.node == labelTypeEnum) { + value = extractValue(valueExtract, "", neo4jElement::get, type); + } else { + value = extractValue(valueExtract, "", neo4jElement::get, type); + } + + record.put(name, value); + break; + case primaryLabel: + final LabelTypeEnum exportType = rule.getType(); + if (LabelTypeEnum.node == exportType) { + value = neo4jElement.get(""); + } else { + value = neo4jElement.get(""); + } + record.put(name, value); + break; + case dstPrimaryLabel: + final Map dstTargetMap = (Map)neo4jElement.get(""); + value = extractValue(valueExtract, "", dstTargetMap::get, type); + record.put(name, value); + break; + case srcPrimaryLabel: + final Map srcSourceMap = (Map)neo4jElement.get(""); + value = extractValue(valueExtract, "", srcSourceMap::get, type); + record.put(name, value); + break; + case nodeProperty: + case relationshipProperty: + value = extractValue(valueExtract, name, neo4jElement::get, type); + record.put(name, value); + break; + case relationshipJsonProperty: + case nodeJsonProperty: + value = extractValue(valueExtract, name, neo4jElement::get, type); + record.put(name, type.applyObject(value)); + break; + default: + break; + } + }); + } + + private static Object extractValue(String valueExtract, String defaultExtract, Function function, + ValueType valueType) { + if (valueExtract == null) { + return function.apply(defaultExtract); + } else { + if (valueExtract.startsWith("#{") && valueExtract.endsWith("}")) { + final String val = valueExtract.substring(2, valueExtract.length() - 1); + return function.apply(val); + } else { + return valueType.applyObject(valueExtract); + } + } + } + +} diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/CqlQueryProcessor.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/CqlQueryProcessor.java new file mode 100644 index 00000000..e2105697 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/CqlQueryProcessor.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.source.query; + +import org.neo4j.driver.Record; +import org.neo4j.driver.util.Pair; + +import io.openmessaging.connector.api.data.Struct; + +public interface CqlQueryProcessor { + String buildCql(long offset); + + Pair buildStruct(Record record); + + String queryType(); +} diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/NodeQueryStrategy.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/NodeQueryStrategy.java new file mode 100644 index 00000000..207d4f39 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/NodeQueryStrategy.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.source.query; + +import java.util.*; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; +import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; +import org.apache.rocketmq.connect.neo4j.helper.LabelTypeEnum; +import org.apache.rocketmq.connect.neo4j.helper.RecordConverter; +import org.apache.rocketmq.connect.neo4j.source.mapping.ReadMapper; +import org.apache.rocketmq.connect.neo4j.helper.MappingRule; +import org.apache.rocketmq.connect.neo4j.helper.MappingRuleFactory; +import org.neo4j.driver.Record; +import org.neo4j.driver.internal.InternalPair; +import org.neo4j.driver.types.Node; +import org.neo4j.driver.util.Pair; + +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.Struct; + +// match (n:Person) where id(n) > 10000 return n order by id(n) limit 1000; +// match (n:Person) return n order by id(n) skip 10000 limit 1000; +// Performance optimization of the above two query statements with the same function + +public class NodeQueryStrategy implements CqlQueryProcessor { + private final Neo4jSourceConfig neo4jSourceConfig; + private final MappingRule mappingRule; + + public NodeQueryStrategy(Neo4jSourceConfig neo4jSourceConfig) { + this.neo4jSourceConfig = neo4jSourceConfig; + final LabelTypeEnum labelTypeEnum = LabelTypeEnum.nameOf(neo4jSourceConfig.getLabelType()); + this.mappingRule = MappingRuleFactory.getInstance().create(neo4jSourceConfig, labelTypeEnum); + mappingRule.validateConfig(); + } + + @Override + public String buildCql(long offset) { + StringBuilder cql = new StringBuilder(); + cql.append("use ").append(neo4jSourceConfig.getNeo4jDataBase()).append(" "); + cql.append("match (n:").append(neo4jSourceConfig.getLabel()).append(")"); + cql.append("where id(n) >=").append(offset); + cql.append(" return n ").append(" limit ").append(Neo4jConstants.MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME); + return cql.toString(); + } + + public Pair buildStruct(Record record) { + final Map nMap = record.asMap(); + final Object value = nMap.get("n"); + if (!(value instanceof Node)) { + throw new RuntimeException("cypher query Inconsistent"); + } + Node node = (Node)value; + final Iterable labels = node.labels(); + final Map nodeMap = RecordConverter.asNodeMap(node); + Map columnMap = new LinkedHashMap<>(); + ReadMapper.getMapper(mappingRule).accept(nodeMap, columnMap); + final Schema valueSchema = RecordConverter.map2StructSchema(columnMap); + StringBuilder labelName = new StringBuilder(); + final Iterator iterator = labels.iterator(); + while (iterator.hasNext()) { + labelName.append("_").append(iterator.next()); + } + valueSchema.setName(labelName.substring(1)); + Struct struct = RecordConverter.buildStruct(valueSchema, columnMap); + return InternalPair.of(node.id(), struct); + } + + @Override + public String queryType() { + return LabelTypeEnum.node.name(); + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/QueryRegistrar.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/QueryRegistrar.java new file mode 100644 index 00000000..30315a0a --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/QueryRegistrar.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.source.query; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; + +public class QueryRegistrar { + private static final Map registrar = new HashMap<>(); + + public static void register(Neo4jSourceConfig neo4jSourceConfig) { + doRegister(new NodeQueryStrategy(neo4jSourceConfig)); + doRegister(new RelationshipQueryStrategy(neo4jSourceConfig)); + } + + static void doRegister(CqlQueryProcessor cqlQueryProcessor) { + registrar.put(cqlQueryProcessor.queryType(), cqlQueryProcessor); + } + + public static CqlQueryProcessor querySqlBuilder(String queryType) { + return registrar.get(queryType); + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/RelationshipQueryStrategy.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/RelationshipQueryStrategy.java new file mode 100644 index 00000000..03f25ac1 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/query/RelationshipQueryStrategy.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.neo4j.source.query; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; +import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; +import org.apache.rocketmq.connect.neo4j.helper.LabelTypeEnum; +import org.apache.rocketmq.connect.neo4j.helper.RecordConverter; +import org.apache.rocketmq.connect.neo4j.source.mapping.ReadMapper; +import org.apache.rocketmq.connect.neo4j.helper.MappingRule; +import org.apache.rocketmq.connect.neo4j.helper.MappingRuleFactory; +import org.neo4j.driver.Record; +import org.neo4j.driver.internal.InternalPair; +import org.neo4j.driver.types.Node; +import org.neo4j.driver.types.Relationship; +import org.neo4j.driver.util.Pair; + +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.Struct; + +public class RelationshipQueryStrategy implements CqlQueryProcessor { + private Neo4jSourceConfig neo4jSourceConfig; + private MappingRule mappingRule; + + public RelationshipQueryStrategy(Neo4jSourceConfig config) { + this.neo4jSourceConfig = config; + final LabelTypeEnum labelTypeEnum = LabelTypeEnum.nameOf(neo4jSourceConfig.getLabelType()); + this.mappingRule = MappingRuleFactory.getInstance().create(neo4jSourceConfig, labelTypeEnum); + mappingRule.validateConfig(); + } + + @Override + public String buildCql(long offset) { + StringBuilder cql = new StringBuilder(); + cql.append("use ").append(neo4jSourceConfig.getNeo4jDataBase()).append(" "); + cql.append("match (source)-[r:").append(neo4jSourceConfig.getLabel()).append("]->(target)"); + cql.append(" where id(r) >=").append(offset); + cql.append(" return source , r, target ").append(" limit ") + .append(Neo4jConstants.MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME); + return cql.toString(); + } + + @Override + public Pair buildStruct(Record record) { + final Map recordMap = record.asMap(); + final Object value = recordMap.get("r"); + if (!(value instanceof Relationship)) { + throw new RuntimeException("cypher query Inconsistent"); + } + final Node source = (Node)recordMap.get("source"); + final Map sourceMap = RecordConverter.asNodeMap(source); + final Node target = (Node)recordMap.get("target"); + final Map targetMap = RecordConverter.asNodeMap(target); + Relationship relationship = (Relationship)value; + final Map relationshipMap = RecordConverter.asRelationshipMap(relationship); + relationshipMap.put("", sourceMap); + relationshipMap.put("", targetMap); + Map columnMap = new HashMap<>(); + ReadMapper.getMapper(mappingRule).accept(relationshipMap, columnMap); + final Schema valueSchema = RecordConverter.map2StructSchema(columnMap); + valueSchema.setName(relationship.type()); + Struct struct = RecordConverter.buildStruct(valueSchema, columnMap); + return InternalPair.of(relationship.id(), struct); + } + + @Override + public String queryType() { + return LabelTypeEnum.relationship.name(); + } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java b/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java new file mode 100644 index 00000000..66d2f26c --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.neo4j.source; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jBaseConfig; +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; +import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; +import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; + +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; + +public class Neo4jSourceTaskTest { + private static final String host= "localhost"; + private static final Integer port = 7687; + private static final String db = "test"; + private static final String user = "test"; + private static final String password = "root123456"; + + public void testClient() { + Neo4jBaseConfig neo4jBaseConfig = new Neo4jSourceConfig(); + KeyValue config = new DefaultKeyValue(); + config.put(Neo4jConstants.NEO4J_HOST, host); + config.put(Neo4jConstants.NEO4J_PORT, port); + config.put(Neo4jConstants.NEO4J_USER, user); + config.put(Neo4jConstants.NEO4J_PASSWORD, password); + config.put(Neo4jConstants.NEO4J_DB, db); + neo4jBaseConfig.load(config); + Neo4jClient client = new Neo4jClient(neo4jBaseConfig); + final boolean ping = client.ping(); + System.out.println(ping); + } + + public static void main(String[] args) { + Neo4jSourceTaskTest neo4jSourceTaskTest = new Neo4jSourceTaskTest(); + neo4jSourceTaskTest.testClient(); + } +} \ No newline at end of file From 31eff5819fb90df76836aab536e9045aecd36d01 Mon Sep 17 00:00:00 2001 From: liuzongliang Date: Thu, 1 Jun 2023 14:13:16 +0800 Subject: [PATCH 2/2] Neo4jSourceConnector and Neo4jSinkConnector impl --- .../connect/neo4j/source/Neo4jSourceTask.java | 3 + .../neo4j/source/Neo4jSourceTaskTest.java | 53 ------------ .../connect/neo4j/sink/Neo4jSinkTaskTest.java | 86 +++++++++++++++++++ .../neo4j/source/Neo4jSourceTaskTest.java | 81 +++++++++++------ 4 files changed, 146 insertions(+), 77 deletions(-) delete mode 100644 connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java create mode 100644 connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkTaskTest.java diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTask.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTask.java index c5fa6d54..f5fc329a 100644 --- a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTask.java +++ b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTask.java @@ -89,6 +89,9 @@ private String buildCql(long offset) { } private long readRecordOffset() { + if (sourceTaskContext == null) { + return 0L; + } final RecordOffset positionInfo = this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(config.getNeo4jDataBase())); if (positionInfo == null) { diff --git a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java b/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java deleted file mode 100644 index 66d2f26c..00000000 --- a/connectors/rocketmq-connect-neo4j/src/main/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.connect.neo4j.source; - -import org.apache.rocketmq.connect.neo4j.config.Neo4jBaseConfig; -import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; -import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; -import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; - -import io.openmessaging.KeyValue; -import io.openmessaging.internal.DefaultKeyValue; - -public class Neo4jSourceTaskTest { - private static final String host= "localhost"; - private static final Integer port = 7687; - private static final String db = "test"; - private static final String user = "test"; - private static final String password = "root123456"; - - public void testClient() { - Neo4jBaseConfig neo4jBaseConfig = new Neo4jSourceConfig(); - KeyValue config = new DefaultKeyValue(); - config.put(Neo4jConstants.NEO4J_HOST, host); - config.put(Neo4jConstants.NEO4J_PORT, port); - config.put(Neo4jConstants.NEO4J_USER, user); - config.put(Neo4jConstants.NEO4J_PASSWORD, password); - config.put(Neo4jConstants.NEO4J_DB, db); - neo4jBaseConfig.load(config); - Neo4jClient client = new Neo4jClient(neo4jBaseConfig); - final boolean ping = client.ping(); - System.out.println(ping); - } - - public static void main(String[] args) { - Neo4jSourceTaskTest neo4jSourceTaskTest = new Neo4jSourceTaskTest(); - neo4jSourceTaskTest.testClient(); - } -} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkTaskTest.java b/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkTaskTest.java new file mode 100644 index 00000000..08ee88e4 --- /dev/null +++ b/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/sink/Neo4jSinkTaskTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.neo4j.sink; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.*; +import io.openmessaging.internal.DefaultKeyValue; + +public class Neo4jSinkTaskTest { + +// private static final String host= "localhost"; +// private static final Integer port = 7687; +// private static final String db = "tmp"; +// private static final String user = "test"; +// private static final String password = "root123456"; +// +// public static void main(String[] args) { +// List records = new ArrayList<>(); +// // build schema +// Schema schema = SchemaBuilder.struct() +// .name("tableName") +// .field("label",SchemaBuilder.string().build()) +// .field("recordId", SchemaBuilder.string().build()) +// .build(); +// +// +// for (int i = 0; i < 4; i++) { +// // build record +// String param0 = "Record"; +// Struct struct= new Struct(schema); +// struct.put("label", param0); +// struct.put("recordId",String.valueOf(i)); +// ConnectRecord record = new ConnectRecord( +// // offset partition +// // offset partition" +// new RecordPartition(new ConcurrentHashMap<>()), +// new RecordOffset(new HashMap<>()), +// System.currentTimeMillis(), +// schema, +// struct +// ); +// records.add(record); +// +// } +// Neo4jSinkTask task = new Neo4jSinkTask(); +// KeyValue config = new DefaultKeyValue(); +// config.put(Neo4jConstants.NEO4J_HOST, host); +// config.put(Neo4jConstants.NEO4J_PORT, port); +// config.put(Neo4jConstants.NEO4J_USER, user); +// config.put(Neo4jConstants.NEO4J_PASSWORD, password); +// config.put(Neo4jConstants.NEO4J_DB, db); +// config.put(Neo4jConstants.LABEL_TYPE, "node"); +// config.put(Neo4jConstants.COLUMN, "[\n" + " {\n" + " \"name\":\"recordId\",\n" +// + " \"type\":\"long\",\n" + " \"columnType\":\"primaryKey\",\n" +// + " \"valueExtract\":\"#{recordId}\"\n" + " },\n" + " {\n" +// + " \"name\":\"Goods\",\n" + " \"type\":\"string\",\n" +// + " \"columnType\":\"primaryLabel\",\n" + " \"valueExtract\":\"#{label}\"\n" +// + " }\n" + " ]"); +// +// task.start(config); +// task.put(records); +// +// } + +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java b/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java index 66d2f26c..f6cb8c00 100644 --- a/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java +++ b/connectors/rocketmq-connect-neo4j/src/test/java/org/apache/rocketmq/connect/neo4j/source/Neo4jSourceTaskTest.java @@ -17,37 +17,70 @@ package org.apache.rocketmq.connect.neo4j.source; +import java.util.List; + import org.apache.rocketmq.connect.neo4j.config.Neo4jBaseConfig; import org.apache.rocketmq.connect.neo4j.config.Neo4jConstants; import org.apache.rocketmq.connect.neo4j.config.Neo4jSourceConfig; import org.apache.rocketmq.connect.neo4j.helper.Neo4jClient; import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.internal.DefaultKeyValue; public class Neo4jSourceTaskTest { - private static final String host= "localhost"; - private static final Integer port = 7687; - private static final String db = "test"; - private static final String user = "test"; - private static final String password = "root123456"; - - public void testClient() { - Neo4jBaseConfig neo4jBaseConfig = new Neo4jSourceConfig(); - KeyValue config = new DefaultKeyValue(); - config.put(Neo4jConstants.NEO4J_HOST, host); - config.put(Neo4jConstants.NEO4J_PORT, port); - config.put(Neo4jConstants.NEO4J_USER, user); - config.put(Neo4jConstants.NEO4J_PASSWORD, password); - config.put(Neo4jConstants.NEO4J_DB, db); - neo4jBaseConfig.load(config); - Neo4jClient client = new Neo4jClient(neo4jBaseConfig); - final boolean ping = client.ping(); - System.out.println(ping); - } - - public static void main(String[] args) { - Neo4jSourceTaskTest neo4jSourceTaskTest = new Neo4jSourceTaskTest(); - neo4jSourceTaskTest.testClient(); - } +// private static final String host= "localhost"; +// private static final Integer port = 7687; +// private static final String db = "test"; +// private static final String user = "test"; +// private static final String password = "root123456"; +// +// public void testClient() { +// Neo4jBaseConfig neo4jBaseConfig = new Neo4jSourceConfig(); +// KeyValue config = new DefaultKeyValue(); +// config.put(Neo4jConstants.NEO4J_HOST, host); +// config.put(Neo4jConstants.NEO4J_PORT, port); +// config.put(Neo4jConstants.NEO4J_USER, user); +// config.put(Neo4jConstants.NEO4J_PASSWORD, password); +// config.put(Neo4jConstants.NEO4J_DB, db); +// neo4jBaseConfig.load(config); +// Neo4jClient client = new Neo4jClient(neo4jBaseConfig); +// final boolean ping = client.ping(); +// System.out.println(ping); +// } +// +// public void testPoll() throws InterruptedException { +// Neo4jSourceTask task = new Neo4jSourceTask(); +// KeyValue config = new DefaultKeyValue(); +// config.put(Neo4jConstants.NEO4J_HOST, host); +// config.put(Neo4jConstants.NEO4J_PORT, port); +// config.put(Neo4jConstants.NEO4J_USER, user); +// config.put(Neo4jConstants.NEO4J_PASSWORD, password); +// config.put(Neo4jConstants.NEO4J_DB, db); +// config.put(Neo4jConstants.NEO4J_TOPIC, "nodeNeo4jTopic"); +// config.put(Neo4jConstants.LABEL_TYPE, "node"); +// config.put(Neo4jConstants.LABELS, "Goods"); +// config.put(Neo4jConstants.COLUMN, "[\n" + " {\n" + " \"name\":\"goodsId\",\n" +// + " \"type\":\"long\",\n" + " \"columnType\":\"primaryKey\",\n" +// + " \"valueExtract\":\"#{goodsId}\"\n" + " },\n" + " {\n" +// + " \"name\":\"label\",\n" + " \"type\":\"string\",\n" +// + " \"columnType\":\"primaryLabel\"\n" + " },\n" + " {\n" +// + " \"name\":\"goodsName\",\n" + " \"type\":\"string\",\n" +// + " \"columnType\":\"nodeProperty\",\n" + " \"valueExtract\":\"#{goodsName}\"\n" +// + " }\n" + " ]"); +// task.start(config); +// while (true) { +// final List connectRecordList = task.poll(); +// for(ConnectRecord record : connectRecordList) { +// System.out.println(record); +// } +// Thread.sleep(3000); +// } +// } +// +// public static void main(String[] args) throws InterruptedException { +// Neo4jSourceTaskTest neo4jSourceTaskTest = new Neo4jSourceTaskTest(); +// neo4jSourceTaskTest.testClient(); +// neo4jSourceTaskTest.testPoll(); +// } } \ No newline at end of file