Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #528] Add support for authentication, proxy, custom headers, query parameters, and request body to HTTP Connector #529

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 59 additions & 14 deletions connectors/rocketmq-connect-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,48 @@
Be responsible for consuming messages from producer and writing data to another web service system.
```

## rocketmq-connect-http 打包
```
mvn clean install -Dmaven.test.skip=true
```
## rocketmq-connect-http使用方法

1. 进入想要使用的connectors目录下(以rocketmq-connect-http目录为例),使用以下指令将插件进行打包
```shell
mvn clean package -Dmaven.test.skip=true
```
2. 打包好的插件以jar包的模式出现在`rocketmq-connect-http/target/`目录下

3. 在`distribution/conf`目录下找的对应的配置文件进行更新,对于standalone的启动方式,更新`connect-standalone.conf`文件中的`pluginPaths`变量

```
pluginPaths=(you plugin path)
```

相应的,使用distributed启动方式,则更新`connect-distributed.conf`中的变量
4. 创建并启动对应的`SourceConnector`以及`SinkConnector`


## rocketmq-connect-http 启动

* **http-sink-connector** 启动

```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "${connect-topicname}","url":"${url}"}
POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name}
{
"connector.class":"org.apache.rocketmq.connect.http.HttpSinkTask",
"url":"${url}",
"method":"${method}",
"connect.topicnames":"${connect.topicnames}"
}
```

例子
例子
```
http://localhost:8081/connectors/httpSinkConnector?config={"connector-class":"org.apache.rocketmq.connect.http.HttpSinkTask","connect-topicname" : "http-topic","url":"192.168.1.2"}
```
http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "http-topic","url":"192.168.1.2"}
```在请求中定义http header、query、body参数
http://127.0.0.1:8082/connectors/httpSinkConnector?config={"connector.class":"org.apache.rocketmq.connect.http.HttpSinkConnector","url":"http://localhost:8080/api","timeout":"6000","connect.topicnames":"fileTopic","headerParameters":"{\"header1k\":\"header1v\"}","method":"POST","queryParameters":"{\"queryk1\":\"queryv1\"}"}
```

更多参数见[rocketmq-connect-http 参数说明](#rocketmq-connect-http-参数说明)

>**注:** `rocketmq-http-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中

## rocketmq-connect-http 停止
Expand All @@ -35,8 +57,31 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/
## rocketmq-connect-http 参数说明
* **http-sink-connector 参数说明**

| KEY | TYPE | Must be filled | Description | Example
|-----|---------|----------------|-------------|------------------|
| url | String | YES | sink端 域名地址 | http://127.0.0.1 |
|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |

| KEY | TYPE | Must be filled | Description | Example |
|-----------------------|--------|----------------|------------------------------------------------|---------------------------|
| connect-topicname | String | YES | sink需要处理数据消息topic | fileTopic |
| url | String | YES | 目标端url地址 | http://localhost:8080/api |
| method | String | YES | http请求方法 | POST |
| body | String | No | http请求body字段,不填时默认使用事件的Data字段 | POST |
| headerParameters | String | NO | http请求header map动态参数Json字符串 | {"key1":"value1"} |
| fixedHeaderParameters | String | NO | http请求header map静态参数Json字符串 | {"key1":"value1"} |
| queryParameters | String | NO | http请求query map动态参数Json字符串 | {"key1":"value1"} |
| fixedQueryParameters | String | NO | http请求query map静态参数Json字符串 | {"key1":"value1"} |
| socks5UserName | String | NO | sock5代理用户名 | ***** |
| socks5Password | String | NO | sock5代理密码 | ***** |
| socks5Endpoint | String | NO | sock5代理地址 | http://localhost:7000 |
| timeout | String | NO | http请求超时时间(毫秒) | 3000 |
| concurrency | String | NO | http请求并发数 | 1 |
| authType | String | NO | 认证方式 (BASIC_AUTH/OAUTH_AUTH/API_KEY_AUTH/NONE) | BASIC_AUTH |
| basicUsername | String | NO | basic auth username | ***** |
| basicPassword | String | NO | basic auth password | ***** |
| apiKeyUsername | String | NO | api key auth username | ***** |
| apiKeyPassword | String | NO | api key auth password | ***** |
| oAuthEndpoint | String | NO | oauth 地址 | http://localhost:7000 |
| oAuthHttpMethod | String | NO | oauth http请求方法 | GET |
| oAuthClientId | String | NO | oauth client id | xxxx |
| oAuthClientSecret | String | NO | oauth client secret | xxxx |
| oAuthHeaderParameters | String | NO | oauth header map参数Json字符串 | {"key1":"value1"} |
| oAuthQueryParameters | String | NO | oauth query map参数Json字符串 | {"key1":"value1"} |
| oAuthBody | String | NO | oauth body参数 | bodyData |
| token | String | NO | http请求token,如果非空,会添加到http请求的header中,key为token | xxxx |
78 changes: 78 additions & 0 deletions connectors/rocketmq-connect-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,49 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<id>verify</id>
<phase>verify</phase>
<configuration>
<configLocation>../../style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
<includeTestResources>false</includeTestResources>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<id>verify</id>
<phase>verify</phase>
<configuration>
<configLocation>../../style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
<includeTestResources>false</includeTestResources>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
Expand Down Expand Up @@ -197,6 +240,41 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.4.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>compile</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.http;

import org.apache.rocketmq.connect.http.constant.HttpConstant;

import java.util.HashSet;
import java.util.Set;

public class HttpConfig {
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
add(HttpConstant.URL);
add(HttpConstant.METHOD);
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.http;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.connector.api.errors.ConnectException;

import java.util.ArrayList;
import java.util.List;

/**
* http sink connector
*/
public class HttpSinkConnector extends SinkConnector {

@Override
public Class<? extends Task> taskClass() {
return HttpSinkTask.class;
}

private KeyValue config;

@Override public void validate(KeyValue config) {
for (String requestKey : HttpConfig.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
throw new ConnectException("Request config key: " + requestKey);
}
}
}

@Override public void start(KeyValue config) {
this.config = config;
}

@Override public void stop() {
this.config = null;
}


@Override public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> config = new ArrayList<>();
config.add(this.config);
return config;
}
}
Loading
Loading