Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support TLS Grpc communication between clusters. #11549

Merged
merged 50 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
9ecd699
Fix exception code error.(#10925)
stone-98 Aug 7, 2023
dfd6139
Merge branch 'develop' of https://github.com/alibaba/nacos into develop
stone-98 Aug 7, 2023
c4a030d
[ISSUE #11456]Add RpcClusterClientTlsConfig.java.
stone-98 Dec 20, 2023
284c315
[ISSUE #11456]Add cluster rpc tls config.
stone-98 Dec 20, 2023
7ded5ba
[ISSUE #11456]Add RpcClusterClientTlsConfig UT.
stone-98 Dec 20, 2023
57f9d40
[ISSUE #11456]Add cluster server tls.
stone-98 Dec 23, 2023
0eb0159
[ISSUE #11456]Remove supportCommunicationTypes.
stone-98 Dec 24, 2023
e1a755d
[ISSUE #11456]Fix unit testing and indentation handling
stone-98 Dec 24, 2023
2498821
[ISSUE #11456]Indentation handling
stone-98 Dec 24, 2023
153ad0a
[ISSUE #11456]Fix unit test and rpc constants.
stone-98 Dec 24, 2023
3851794
[ISSUE #11456]Fix unit test.
stone-98 Dec 24, 2023
51801ee
[ISSUE #11456]Optimize code.
stone-98 Dec 25, 2023
b189613
[ISSUE #11456]Fix check style.
stone-98 Dec 25, 2023
0e2532b
[ISSUE #11456]Add unit test.
stone-98 Dec 25, 2023
76b48ff
[ISSUE #11456]Fix check style.
stone-98 Dec 25, 2023
898cb40
[ISSUE #11456]Update unit test.
stone-98 Dec 25, 2023
be40ccd
[ISSUE #11456]Fix unit test.
stone-98 Dec 30, 2023
6ff0122
[ISSUE #11456]Add License.
stone-98 Dec 30, 2023
7329b6c
[ISSUE #11456]Fix unit test.
stone-98 Dec 30, 2023
4a31a7a
[ISSUE #11456]Fix unit test.
stone-98 Dec 30, 2023
24a43f6
[ISSUE #11456]Rename class.
stone-98 Dec 30, 2023
911f81e
[ISSUE #11456]Optimize code.
stone-98 Jan 7, 2024
14134c1
[ISSUE #11456]Handling indentation issues.
stone-98 Jan 8, 2024
962ad7d
[ISSUE #11456]Handling indentation issues.
stone-98 Jan 8, 2024
fac7255
[ISSUE #11456]Handling indentation issues.
stone-98 Jan 8, 2024
f26156f
[ISSUE #11456]Optimize code.
stone-98 Jan 10, 2024
2c74341
[ISSUE #11456]Fix unit test.
stone-98 Jan 10, 2024
162abb7
[ISSUE #11456]Fix unit testing and compatibility handling.
stone-98 Jan 10, 2024
664f9e0
[ISSUE #11456]Support TLS GRPC communication between clusters.
stone-98 Dec 20, 2023
bec9c2f
Merge branch 'develop-#issue-11456' of https://github.com/stone-98/na…
stone-98 Jan 22, 2024
34d63bd
[ISSUE #11456] Fix bugs.
stone-98 Jan 22, 2024
c84a635
[ISSUE #11456]Fix bugs.
stone-98 Jan 22, 2024
ae57a37
Merge branch 'develop-#issue-11456' of https://github.com/stone-98/na…
stone-98 Jan 24, 2024
f64503a
[ISSUE #11456]Adjusting parameter names (compatibility considerations).
stone-98 Feb 2, 2024
3aafce3
Merge branch 'develop' of https://github.com/stone-98/nacos into deve…
stone-98 Feb 2, 2024
66dd055
Merge branch 'develop' into develop-#issue-11456
stone-98 Feb 8, 2024
9f02f4e
[ISSUE #11456]Resolve conflict.
stone-98 Feb 8, 2024
b5971a7
[ISSUE #11456]Remove ProtocolNegotiatorBuilderManager and abstract Pr…
stone-98 Feb 21, 2024
5a0c9d7
[ISSUE #11456]Remove CommunicationType.java.
stone-98 Mar 12, 2024
6f71327
Merge branch 'develop' into develop-#issue-11456
stone-98 Mar 12, 2024
ae72fbc
[ISSUE #11456]Optimize code.
stone-98 Mar 12, 2024
072f350
[ISSUE #11456]Revert author.
stone-98 Mar 12, 2024
9b0bec3
Splitting RpcTlsConfigFactory.
stone-98 Apr 8, 2024
12f1652
Split RpcConstants.
stone-98 Apr 8, 2024
0fa28f0
Divided RpcTlsConfigFactory, adjusted cluster parameters to "nacos.re…
stone-98 Apr 11, 2024
13d4409
Merge branch 'develop' into develop-#issue-11456
stone-98 Apr 23, 2024
e2ac17d
check style.
stone-98 Apr 23, 2024
fa689c0
check style.
stone-98 Apr 25, 2024
2165ee1
Merge branch 'develop' of https://github.com/alibaba/nacos into devel…
stone-98 Apr 25, 2024
82b0510
unit test.
stone-98 Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,29 @@

import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RpcScheduledExecutorTest {

private static final String NAME = "test.rpc.thread";

Map<String, String> threadNameMap = new HashMap<>();
Map<String, String> threadNameMap = new ConcurrentHashMap<>();

@Test
public void testRpcScheduledExecutor() throws InterruptedException {
RpcScheduledExecutor executor = new RpcScheduledExecutor(2, NAME);
CountDownLatch latch = new CountDownLatch(2);
executor.submit(new TestRunner(1, latch));
executor.submit(new TestRunner(2, latch));
latch.await(1, TimeUnit.SECONDS);
boolean await = latch.await(1, TimeUnit.SECONDS);
assertTrue(await);
assertEquals(2, threadNameMap.size());
assertEquals(NAME + ".0", threadNameMap.get("1"));
assertEquals(NAME + ".1", threadNameMap.get("2"));
}

private class TestRunner implements Runnable {
Expand All @@ -56,13 +56,8 @@ public TestRunner(int id, CountDownLatch latch) {

@Override
public void run() {
try {
threadNameMap.put(String.valueOf(id), Thread.currentThread().getName());
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ignored) {
} finally {
latch.countDown();
}
threadNameMap.put(String.valueOf(id), Thread.currentThread().getName());
latch.countDown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfigFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.ConnLabelsUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
Expand Down Expand Up @@ -128,6 +129,8 @@ public class ClientWorker implements Closeable {
*/
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<>(new HashMap<>());

private final DefaultLabelsCollectorManager defaultLabelsCollectorManager = new DefaultLabelsCollectorManager();

private Map<String, String> appLables = new HashMap<>();

private final ConfigFilterChainManager configFilterChainManager;
Expand Down Expand Up @@ -579,8 +582,6 @@ public boolean isHealthServer() {
return agent.isHealthServer();
}

private static DefaultLabelsCollectorManager defaultLabelsCollectorManager = new DefaultLabelsCollectorManager();

public class ConfigRpcTransportClient extends ConfigTransportClient {

Map<String, ExecutorService> multiTaskExecutor = new HashMap<>();
Expand Down Expand Up @@ -1088,18 +1089,19 @@ private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) t

private RpcClient ensureRpcClient(String taskId) throws NacosException {
synchronized (ClientWorker.this) {

Map<String, String> labels = getLabels();
Map<String, String> newLabels = new HashMap<>(labels);
newLabels.put("taskId", taskId);
RpcClientTlsConfig clientTlsConfig = RpcClientTlsConfigFactory.getInstance()
.createSdkConfig(properties);
RpcClient rpcClient = RpcClientFactory.createClient(uuid + "_config-" + taskId, getConnectionType(),
newLabels, this.properties, RpcClientTlsConfig.properties(this.properties));
newLabels, clientTlsConfig);
if (rpcClient.isWaitInitiated()) {
initRpcClientHandler(rpcClient);
rpcClient.setTenant(getTenant());
rpcClient.start();
}

return rpcClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfigFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
Expand Down Expand Up @@ -104,7 +104,7 @@ public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, Se
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
labels.put(Constants.APPNAME, AppNameUtils.getAppName());
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
RpcClientTlsConfig.properties(properties.asProperties()));
RpcClientTlsConfigFactory.getInstance().createSdkConfig(properties.asProperties()));
this.redoService = new NamingGrpcRedoService(this, properties);
NAMING_LOGGER.info("Create naming rpc client for uuid->{}", uuid);
start(serverListFactory, serviceInfoHolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void before() {
any(RpcClientTlsConfig.class))).thenReturn(rpcClient);
rpcClientFactoryMockedStatic.when(
() -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class),
any(Properties.class), any(RpcClientTlsConfig.class))).thenReturn(rpcClient);
any(RpcClientTlsConfig.class))).thenReturn(rpcClient);
localConfigInfoProcessorMockedStatic = Mockito.mockStatic(LocalConfigInfoProcessor.class);
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, TEST_NAMESPACE);
Expand Down Expand Up @@ -149,8 +149,8 @@ public void testAddListenerWithoutTenant() throws NacosException {
public void receiveConfigInfo(String configInfo) {
}
};
clientWorker.addListeners(dataId, group, Arrays.asList(listener));

clientWorker.addListeners(dataId, group, Collections.singletonList(listener));
List<Listener> listeners = clientWorker.getCache(dataId, group).getListeners();
Assert.assertEquals(1, listeners.size());
Assert.assertEquals(listener, listeners.get(0));
Expand Down Expand Up @@ -180,8 +180,8 @@ public void receiveConfigInfo(String configInfo) {

String dataId = "a";
String group = "b";
clientWorker.addTenantListeners(dataId, group, Arrays.asList(listener));

clientWorker.addTenantListeners(dataId, group, Collections.singletonList(listener));
List<Listener> listeners = clientWorker.getCache(dataId, group).getListeners();
Assert.assertEquals(1, listeners.size());
Assert.assertEquals(listener, listeners.get(0));
Expand All @@ -191,7 +191,7 @@ public void receiveConfigInfo(String configInfo) {
Assert.assertEquals(0, listeners.size());

String content = "d";
clientWorker.addTenantListenersWithContent(dataId, group, content, null, Arrays.asList(listener));
clientWorker.addTenantListenersWithContent(dataId, group, content, null, Collections.singletonList(listener));
listeners = clientWorker.getCache(dataId, group).getListeners();
Assert.assertEquals(1, listeners.size());
Assert.assertEquals(listener, listeners.get(0));
Expand Down Expand Up @@ -418,10 +418,10 @@ public void testHandleClientMetricsReqeust() throws Exception {
String metricValues = jsonNode.get("metricValues")
.get(ClientConfigMetricRequest.MetricsKey.build(ClientConfigMetricRequest.MetricsKey.CACHE_DATA,
GroupKey.getKeyTenant(dataId, group, tenant)).toString()).textValue();
int colonIndex = metricValues.toString().lastIndexOf(":");

int colonIndex = metricValues.lastIndexOf(":");
Assert.assertEquals(content, metricValues.substring(0, colonIndex));
Assert.assertEquals(md5, metricValues.substring(colonIndex + 1, metricValues.toString().length()));
Assert.assertEquals(md5, metricValues.substring(colonIndex + 1, metricValues.length()));

}

Expand All @@ -441,7 +441,7 @@ public void testGeConfigConfigNotFound() throws NacosException {
Mockito.when(rpcClient.request(any(ConfigQueryRequest.class), anyLong())).thenReturn(configQueryResponse);

ConfigResponse configResponse = clientWorker.getServerConfig(dataId, group, tenant, 100, true);
Assert.assertEquals(null, configResponse.getContent());
Assert.assertNull(configResponse.getContent());
localConfigInfoProcessorMockedStatic.verify(
() -> LocalConfigInfoProcessor.saveSnapshot(eq(clientWorker.getAgentName()), eq(dataId), eq(group),
eq(tenant), eq(null)), times(1));
Expand Down Expand Up @@ -476,7 +476,7 @@ public void testShutdown() throws NacosException, NoSuchFieldException, IllegalA
Properties prop = new Properties();
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
ServerListManager agent = Mockito.mock(ServerListManager.class);

final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
clientWorker.shutdown();
Expand All @@ -485,8 +485,8 @@ public void testShutdown() throws NacosException, NoSuchFieldException, IllegalA
ConfigTransportClient o = (ConfigTransportClient) agent1.get(clientWorker);
Assert.assertTrue(o.executor.isShutdown());
agent1.setAccessible(false);
Assert.assertEquals(null, clientWorker.getAgentName());

Assert.assertNull(clientWorker.getAgentName());
}

@Test
Expand Down Expand Up @@ -552,13 +552,13 @@ public void receiveConfigInfo(String configInfo) {
configContext.setGroup(group);
configContext.setTenant(tenant);
ConfigChangeBatchListenResponse response = new ConfigChangeBatchListenResponse();
response.setChangedConfigs(Arrays.asList(configContext));
response.setChangedConfigs(Collections.singletonList(configContext));

RpcClient rpcClientInner = Mockito.mock(RpcClient.class);
Mockito.when(rpcClientInner.isWaitInitiated()).thenReturn(true, false);
rpcClientFactoryMockedStatic.when(
() -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class),
any(Properties.class), any(RpcClientTlsConfig.class))).thenReturn(rpcClientInner);
any(RpcClientTlsConfig.class))).thenReturn(rpcClientInner);
// mock listen and remove listen request
Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class), anyLong()))
.thenReturn(response, response);
Expand Down Expand Up @@ -620,20 +620,20 @@ public void testIsHealthServer() throws NacosException, NoSuchFieldException, Il
Properties prop = new Properties();
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
ServerListManager agent = Mockito.mock(ServerListManager.class);

final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
ClientWorker.ConfigRpcTransportClient client = Mockito.mock(ClientWorker.ConfigRpcTransportClient.class);
Mockito.when(client.isHealthServer()).thenReturn(Boolean.TRUE);

Field declaredField = ClientWorker.class.getDeclaredField("agent");
declaredField.setAccessible(true);
declaredField.set(clientWorker, client);
Assert.assertEquals(true, clientWorker.isHealthServer());

Assert.assertTrue(clientWorker.isHealthServer());

Mockito.when(client.isHealthServer()).thenReturn(Boolean.FALSE);
Assert.assertEquals(false, clientWorker.isHealthServer());
assertFalse(clientWorker.isHealthServer());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,30 +121,41 @@ public static RpcClient createClient(String clientName, ConnectionType connectio
}

/**
* create a rpc client.
* Creates an RPC client for cluster communication with default thread pool settings.
*
* @param clientName client name.
* @param connectionType client type.
* @return rpc client.
* @param clientName The name of the client.
* @param connectionType The type of client connection.
* @param labels Additional labels for RPC-related attributes.
* @return An RPC client for cluster communication.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Map<String, String> labels) {
return createClusterClient(clientName, connectionType, null, null, labels);
}

/**
* Creates an RPC client for cluster communication with TLS configuration.
*
* @param clientName The name of the client.
* @param connectionType The type of client connection.
* @param labels Additional labels for RPC-related attributes.
* @param tlsConfig TLS configuration for secure communication.
* @return An RPC client for cluster communication with TLS configuration.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
return createClusterClient(clientName, connectionType, null, null, labels, tlsConfig);
}

/**
* create a rpc client.
* Creates an RPC client for cluster communication with custom thread pool settings.
*
* @param clientName client name.
* @param connectionType client type.
* @param threadPoolCoreSize grpc thread pool core size
* @param threadPoolMaxSize grpc thread pool max size
* @return rpc client.
* @param clientName The name of the client.
* @param connectionType The type of client connection.
* @param threadPoolCoreSize The core size of the gRPC thread pool.
* @param threadPoolMaxSize The maximum size of the gRPC thread pool.
* @param labels Additional labels for RPC-related attributes.
* @return An RPC client for cluster communication with custom thread pool settings.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
Expand All @@ -162,7 +173,6 @@ public static RpcClient createClusterClient(String clientName, ConnectionType co
* @param tlsConfig tlsConfig.
* @return
*/

public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels,
RpcClientTlsConfig tlsConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,68 +18,10 @@

import com.alibaba.nacos.common.remote.TlsConfig;

import java.util.Properties;

/**
* gRPC config for sdk.
*
* @author githubcheng2978
*/
public class RpcClientTlsConfig extends TlsConfig {

/**
* get tls config from properties.
* @param properties Properties.
* @return tls of config.
*/
public static RpcClientTlsConfig properties(Properties properties) {
RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig();
if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_ENABLE)) {
tlsConfig.setEnableTls(Boolean.parseBoolean(
properties.getProperty(RpcConstants.RPC_CLIENT_TLS_ENABLE)));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_PROVIDER)) {
tlsConfig.setSslProvider(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_PROVIDER));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_MUTUAL_AUTH)) {
tlsConfig.setMutualAuthEnable(Boolean.parseBoolean(
properties.getProperty(RpcConstants.RPC_CLIENT_MUTUAL_AUTH)));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_PROTOCOLS)) {
tlsConfig.setProtocols(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_PROTOCOLS));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_CIPHERS)) {
tlsConfig.setCiphers(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_CIPHERS));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_TRUST_COLLECTION_CHAIN_PATH)) {
tlsConfig.setTrustCollectionCertFile(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_COLLECTION_CHAIN_PATH));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_CERT_CHAIN_PATH)) {
tlsConfig.setCertChainFile(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_CERT_CHAIN_PATH));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_CERT_KEY)) {
tlsConfig.setCertPrivateKey(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_CERT_KEY));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_TRUST_ALL)) {
tlsConfig.setTrustAll(Boolean.parseBoolean(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_ALL)));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_TRUST_PWD)) {
tlsConfig.setCertPrivateKeyPassword(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_PWD));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_PROVIDER)) {
tlsConfig.setSslProvider(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_PROVIDER));
}
return tlsConfig;
}

}