Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

通过内部测试,使用超低延时服务通信这一块的特色,已经自行添加了服务调用模式 #73

Open
pc859107393 opened this issue Jul 12, 2020 · 19 comments
Labels
enhancement New feature or request

Comments

@pc859107393
Copy link

pc859107393 commented Jul 12, 2020

因为项目的特殊要求,对服务通信这一块要求很高,前面springcloud已经挂掉了。因为累积技术栈的原因,选型了这个项目。
在这个项目中,我们自行添加了粗粒度的服务调用模式,举例来说,我们项目中要求一些特殊数据分片多线程上传,但是在微服务场景,会导致数据分发到不同的微服务,但是因为需要实时处理的缘故,再次聚合数据会导致处理时间跟不上,所以添加了调用控制,当某个服务被标注为固定模式的时候,其他服务调用的时候会选择某个节点,当再一定时间(三分钟) 不使用又会释放掉该节点同时绑定了服务节点的上下线通知。还有更多的一些需求没完善。需要提交pr吗?

@linux-china
Copy link
Collaborator

linux-china commented Jul 15, 2020

非常欢迎啊 ;) 这个特性非常好,非常有用。我讲述一下RSocket的对这个实现的原理:

  • RSocket Channel: Broker会在requester和provider之间创建一个channel,接下来发送的payload不会到其他service provider上,都是点对点的。 但是如果是长时间的操作,可能有问题,主要是service provider可能下线。 如果客户端多线程上传,如果还用到channel的话,就是requestChannel(Publisher payloads) 这个payloads调整为Processor就可以,但是要注意放入Payload的顺序。
  • RoutingMetadata: 包括endpoint支持 https://github.com/alibaba/alibaba-rsocket-broker/wiki/RSocket-Routing 目前的问题是routing是有cache的,可以考虑根据Reactor Context中的变量信息动态创建路由,然后加上endpoint。 endpoint之前目的是给调试用的,当然做请求定向也没有问题,可能需要做一些代码调整。
  • 另外就是添加一个metadata,标明需要路由到特定service provider上,这个可能涉及到一定的匹配算法。

不知道你是如何实现,这个特性还是非常有用的,非常欢迎PR。 这两天在写RSocket PHP和Dart SDK,回复有点慢啦 :)

@linux-china linux-china added the enhancement New feature or request label Jul 16, 2020
@pc859107393
Copy link
Author

我这边主要是在服务注解上面添加了标示固定,然后在broker提供服务id这里添加对应的id缓存,同时适配了服务上下线这一块的处理。前面两个方法我也考虑过,但是主要还是倾向于在ServiceRoutingSelector对应的实现类实现这一块的功能,本来想在BinaryRoutingMetadata中再添加请求固定标示,这样就能精准控制服务的使用。

@linux-china
Copy link
Collaborator

linux-china commented Jul 16, 2020

BinaryRoutingMetadata,这个可以添加的,就是多个字符串,你可以选择一个固定tag表示这个含义。

@linux-china
Copy link
Collaborator

linux-china commented Jul 16, 2020

@pc859107393 哪里能看到你调整后的代码? 有对应的git repo吗? 谢谢 :) 我已经在 GSVRoutingMetadata 和 ServiceMapping 添加了sticky 字段,表明这个服务调用会采用 Sticky Session机制。 具体的实现原理目前考虑如下:

  • 当sticky service发起调用时,routing metadata中会包含sticky=1
  • RSocket Broker在进行路由时,会判断sticky标识。 如果为sticky,则会在connection attributes中查找该service绑定service provider id,如果没有找到,则会进行路由查找,然后查找的provider id回存到connection attributes提供给下次使用。

当然这里还需要处理一下service provider下线的情况,如果对应的serivce provider id没有找到,会执行重新查找路由和回存的过程。

@pc859107393
Copy link
Author

pc859107393 commented Jul 16, 2020

https://gitee.com/pc_859107393/alibaba-rsocket-broker 我已经fork到gitee了,在dev_base_cheng这个分支上面,还在整理,想调整一下BinaryRoutingMetadata在上面添加一个服务标识之类的东西。目前这个是只在ServiceRoutingSelector上面做了固定检测。另外,我们这边私有maven仓库,把原项目的版本提升了,希望理解,哈哈。

@linux-china
Copy link
Collaborator

linux-china commented Jul 16, 2020

收到,没有关系。 你要的这一特性非常不错,应该是sticky session。 我增加了Sticky session实现, 对应的文档这里 https://github.com/alibaba/alibaba-rsocket-broker/wiki/RSocket-Routing 如果有不对之处,欢迎指正 :)

这一特性和之前版本有兼容问题,请使用最新的客户端 + broker,不然可能导致broker解析元信息错误。

@pc859107393
Copy link
Author

ok,我这边合并一下 看看这个特性

@pc859107393
Copy link
Author

我看了一下你那边的特性,我前面提到的需求都实现了,然后你那边貌似没有做长时间不使用的服务释放。我是直接使用了loadingCache设置了超时时间三分钟。

@linux-china
Copy link
Collaborator

@pc859107393 刚好问你一问题,我也考虑这种绑定关系释放问题,如果一直都需要sticky session特性,而且一直都有请求发送,重新绑定会涉及一定的消息不一致,这个你目前如何考虑的? 我还没有好的解决方法。

@pc859107393
Copy link
Author

我也一直在想这个问题,因为目前最困难的是我们没办法让框架自动感知到某次请求要需要命中哪一个服务,而且切换服务这一个是和具体的业务场景深度耦合的,特别是在分片数据实时处理的情况下(比如说我这个文件数据我需要对内容进行整理,那肯定是需要在在这中间一直命中同一个服务),所以最大的问题还是回到了原点,我们无法让程序感知到下次是否需要切换。所以目前我也没有好的办法。 我也在想有没有某种数据特征采样的可以实现但是没找到合适的办法。或者说定义一个需要自定义的适配器,然后让需要使用这个高级切换的自行实现对应的适配器。

@pc859107393
Copy link
Author

就是说,我有一个适配器,你需要固定调用的时候还要做到灵活切换,则必须实现这个适配器并回馈一个采样值。在需要固定的区间内,我们一直使用这个相同的采样值,当采样值不同的时候我们再去重新实现服务切换。

@pc859107393
Copy link
Author

我仔细的想了一下,我们在服务消费者注册的时候已经添加了sticky标志,那么我们再弄一个全局开关控制sticky的高级模式,然后将生产者的参数实现实现某个接口(返回一个固定链路的唯一标志),然后我们在服务调用链路中把消费者的id改成固定标志,然后当固定标志发生变化的时候我们再去重新分配一个目标服务就行了。

@linux-china
Copy link
Collaborator

linux-china commented Jul 18, 2020

目前我想的策略是: 如果你要做sticky session,你需要将响应的API合并到一个interface中,这个和CRQS读写分离原则差不多,99%的interface都是非session保持的。 如果有一个session interface,包含一个函数方法,如

interface FileStorageService {
   void saveFile(String fileName, int offset, byte[] content); 
}

关键是不知道何时断开FileStorageService和某一Service Provider绑定,请求一直在进行,Broker也不知道一个整体请求的终点在哪里,关键是如果非常多的请求都在进行,也没法断开,断开就会有数据不一致的问题。

@pc859107393
Copy link
Author

我想的是在我们的Service中需要调用的接口参数必须是一个实体对象,然后这个对象中必须实现某个接口返回操作标记(这个标记每个完整的文件上传都是同一个唯一标志),然后我们在消费者注册的时候必须添加粘性标志以及开启高级粘性特性。然后这时候我们就可以在服务调用的时候获取这个唯一标志,然后通过这个唯一标志跟远程服务id绑定就可以实现调用了。 这样做引入了强耦合同时每一个完整的文件上传就算是某个消费者分段传输的只要唯一标志一定,就不会出现服务调用错误的问题。

@pc859107393
Copy link
Author

pc859107393 commented Jul 24, 2020

我已经完成这一块的代码了,已经经过测试在同一组请求中可以实现服务的固定调用,可以达到对应的效果,同时这一组数据传输完成后,下一次不同的一组请求还是能够进行负载均衡的。另外我看了一下我还需要设计固定服务后,固定服务的限流这一块的东西。就是说我指定某个服务支持每秒并发多少组有效请求,在一些特殊场景下这些东西还是免不了,这周我先整理一下然后先提一个pr上来,后面的我再想想看看有没有好的解决办法。我那边的代码也是同步到我在gitee上面的仓库中的。

@pc859107393
Copy link
Author

pc859107393 commented Jul 29, 2020

目前我这边我看了一下,我们的设计思路还是有一些出入。 我这边服务黏性调用,我设计的是由服务提供方来配置,所以我有一个ServiceType的枚举加在RSocketService这个注解上面。然后在开发服务提供方的时候,如果没有在 @RSocketService 这个注解里面配置服务模式为ServiceType.FIXED,那么黏性调用无效,如果仅仅在服务提供方配置了FIXED,却没有在消费者创建处添加FIXED,那么服务不会进行负载均衡,会一直固定调用同一个服务。 只有在满足消费者和服务提供者两处都配置了 FIXED 才会实现自动负载均衡。

大概代码如下:

先是提供服务类型:

/**
 * 服务调用模式,固定或者自动切换
 */
public enum ServiceType {

    /**
     * 固定服务调用
     */
    FIXED(1),
    /**
     * 动态服务调用
     */
    CHANGE(2);

    private final int code;

    ServiceType(int code) {
        this.code = code;
    }

    public static ServiceType getType(int code) {
        return code == 1 ? FIXED : CHANGE;
    }

    public int getCode() {
        return code;
    }
}

紧接着在RSocketService中添加服务调用模式,默认是自动调用(不需要黏性),如下:

/**
 * rsocket Service annotation
 *
 * @author leijuan
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RSocketService {
    /**
     * 省略部分代码
     */


    /**
     * 服务调用模式
     *
     * @return
     */
    ServiceType type() default ServiceType.CHANGE;
}

紧接着在 ServiceRoutingSelector 以及实现类上面添加了根据请求缓存查找目标服务,此处会缓存来源服务的id和目标服务的id(当且仅当被调用服务标注了FIXED的时候)。 其他的一些操作和你那边的修改基本是一致的。

到上面这一步的时候已经实现了基本的服务黏性调用。

但是要实现消费者还能负载均衡,首先我们需要知道一个调用方的一个类似操作标志的东西,所以我创建了一个抽象类用来返回一个请求的唯一标志,被调用方的服务方法中传递参数必须包含这个实体。

/**
 * 高级用法的实现接口
 *
 * @author cheng
 */
public abstract class UniqueModel {

    /**
     * 抽象函数,子类必须实现
     *
     * @return
     */
    public abstract String getUniqueTag();

}

紧接着,我在RSocketRequesterRpcProxy中,实现了对唯一标志的编排,当服务是为黏性调用的时候,锁定被调用方服务方法和调用方提供的唯一ID,大概代码片段如下:

/**
 * RSocket requester RPC proxy for remote service
 *
 * @author leijuan
 */
public class RSocketRequesterRpcProxy implements InvocationHandler {

    private ServiceType type = ServiceType.CHANGE;

    protected LoadingCache<String, ReactiveMethodMetadata> methodMetadataCache = CacheBuilder.newBuilder()
            .expireAfterAccess(3, TimeUnit.MINUTES)
            .initialCapacity(4096)
            .build(new CacheLoader<String, ReactiveMethodMetadata>() {
                @Override
                public ReactiveMethodMetadata load(String key) throws Exception {
                    return null;
                }
            });


    @Override
    @RuntimeType
    public Object invoke(@This Object proxy, @Origin Method method, @AllArguments Object[] allArguments) throws Throwable {
        MutableContext mutableContext = new MutableContext();
        ReactiveMethodMetadata methodMetadata = null;
        String tag = null;
        if (allArguments.length > 0 && type.equals(ServiceType.FIXED)) {
            for (Object argument : allArguments) {
                if (argument instanceof UniqueModel) {
                    tag = ((UniqueModel) argument).getUniqueTag();
                    if (StringUtils.isNotBlank(tag))
                        tag = String.format("%s$%s", method.toString(), tag);
                    methodMetadata = methodMetadataCache.getIfPresent(tag);
                    log.info("第一次缓存:{}", tag);
                    break;
                }
            }
        }

        if (null == methodMetadata) {
            if (null != tag && type.equals(ServiceType.FIXED)) {
                methodMetadataCache.put(tag, new ReactiveMethodMetadata(group, service, version,
                        method, encodingType, this.acceptEncodingTypes, endpoint, sourceUri, tag, type));
                log.info("生成缓存标识并获取请求标识:{}", uuid);
                methodMetadata = methodMetadataCache.get(tag);
            } else {
                if (!methodMetadataMap.containsKey(method)) {
                    methodMetadataMap.put(method, new ReactiveMethodMetadata(group, service, version,
                            method, encodingType, this.acceptEncodingTypes, endpoint, sourceUri, null == tag ? uuid : tag, type));
                    log.info("获取固定请求标识:{}", uuid);
                }
                methodMetadata = methodMetadataMap.get(method);
            }
        }
        mutableContext.put(ReactiveMethodMetadata.class, methodMetadata);
        /*剩下代码基本无变化,当然ReactiveMethodMetadata也是做了对应的一点点微调,将handlerId的值设置为唯一标识计算后的值*/
   }
}

这样改动后基本上是满足业务需求的,还有其他的关联的东西,目前没检查。

后续我这边还需要再定制服务限流的一些东西,主要来说我们的一些设计思路不太一样,你看看有没有更好的实现或者说我这样的变更里面会不会有其他的一些问题。

@linux-china
Copy link
Collaborator

@pc859107393 我先看一下你的思路,然后整理一下 :)

@tingyanshen
Copy link

目前我想的策略是: 如果你要做sticky session,你需要将响应的API合并到一个interface中,这个和CRQS读写分离原则差不多,99%的interface都是非session保持的。 如果有一个session interface,包含一个函数方法,如

interface FileStorageService {
   void saveFile(String fileName, int offset, byte[] content); 
}

关键是不知道何时断开FileStorageService和某一Service Provider绑定,请求一直在进行,Broker也不知道一个整体请求的终点在哪里,关键是如果非常多的请求都在进行,也没法断开,断开就会有数据不一致的问题。

建议 是否需要在这里考虑添加 conversation 的机制。conversation 的开始和结束状态由请求方控制,以满足开发者不同的需求。
在 broker 扩展 metadata 路由协议即可支持。

至于你们在讨论的 broker 对绑定服务的状态处理和分配,应当都是由于使用 java interface 实现 rpc 导致的问题。

@tingyanshen
Copy link

conversation 可以是 broker 的扩展服务, 可以由简单的 broker 默认处理,也可以由开发者提供服务的 conversation-alloc(分配管理服务)处理。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants