Skip to content

Latest commit

 

History

History
268 lines (170 loc) · 9.13 KB

案例-计数器.md

File metadata and controls

268 lines (170 loc) · 9.13 KB

计数器


业务场景:

社区作为一个轻互动论坛,必不可少会涉及很多计数统计,比如个人主页要展示用户相关一些计数(他的粉丝数、发贴数),feed信息流会展示每个贴子查看数、点赞数和评论数。如果每个计数都作为一个独立的个体,独立管理,每次用的时候就要从不同地方去取,维护成本肯定很高。

为了更好的管理,从页面功能角度对这些计数做了分类,分为用户维度、贴子维度、小组维度,而每个维度下又有各自的子项计数。

  • 用户维度(关注数、粉丝数、发贴数、评论数、铜钱数、订阅小组数)
  • 贴子维度(查看数、评论数、点赞数、收藏数)
  • 小组相关(订阅数、贴子数、精华数)
  • 消息相关(回复与&、赞、新粉丝数、私信往来的用户)

以用户维度的计数为例:

数据最终是持久到mysql存储,为了提升性能,中间会有一层redis,使用Hash数据结构

1)写操作

任何一个子项动作都会触发缓存的写操作,比如A关注了B,对于A用户来讲,A的关注数要增加1,其它子项计数不变;对于B用户来讲,B的粉丝数要增加1,其它子项计数不变。

 public void updateMemberCount(MemberEditCountParam memberEditCountParam) {
        MemberCountUpdateParam updateParam = new MemberCountUpdateParam();
        updateParam.setUid(memberEditCountParam.getUid());
        updateParam.setCopperCount(memberEditCountParam.getCopperCount());
        updateParam.setPostCount(memberEditCountParam.getPostCount());
        updateParam.setFollowCount(memberEditCountParam.getFollowCount());
        updateParam.setFansCount(memberEditCountParam.getFansCount());
        updateParam.setReplyCount(memberEditCountParam.getReplyCount());
        updateParam.setTags(memberEditCountParam.getTagCount());
        memberCountDao.updateMemberCount(updateParam);
        memberCacheManager.editMemberCount(updateParam);
    }

先写DB,再写cache。对于DB,通过mysql自身的行锁机制解决数据并发问题,‘posts = posts + #{postCount}’。但对于cache的维护,比较麻烦一些

常规思路:


value =  hincrBy(String key, String field, long value) ;
   	|
	|
	|
	|
if value ==1 , del cache

  • 当缓存为空时,hincrBy会返回1;
  • 如果用户某一个子项的计数为0,即使预热到cache,hincrBy 也会返回1。
  • 所以根据是否等于1决定del cache,在用户量很大的情况下,cache的效率会比较差(只有所有的子项都大于0,且已预热到cache里才能避开这种情况)。另外如果 del cache 操作完成之前有读操作,返回的可能是脏数据

解决思路:

引入一个较大阈值,区分cache为空还是已经预热但值为0的情况

protected long incrCount(String hashName, String member, long by) {
        long newValue = 0;
        try {
        // 线程安全
            newValue = getRedisClient().hincrBy(hashName, member, by);
        } catch (RedisException e) {
            logger.error("[AbstractCacheManager.incrCount.hincrBy] invoke error!", e);
        }

//COUNT_OFFSET为一个并发数,可以设定一个较大值,为了解决初始时cache为空,预热cache时又有其它
并发请求干扰带来的脏数据
        if (newValue >= COUNT_OFFSET) {
            return newValue - COUNT_OFFSET;
        } else {
            try {
                getRedisClient().del(hashName);
            } catch (RedisException e) {
                logger.error("[AbstractCacheManager.incrCount.del] invoke error!", e);
            }
            return Long.MIN_VALUE;
        }
    }

注意:

  • 删除或者初始化cache,将key作为一个整体(非单个子项)来操作,减少复杂性
  • 为了便于维护,cache的预热放在查询阶段,如果cache为空,预热到cache中。

2)读操作

批量查多个用户的用户维度计数。

    public List<SimpleMemberCountModel> batchGetSimpleMemberCountModel(List<Long> uidList) {
        if (CollectionUtils.isEmpty(uidList)) {
            return Collections.emptyList();
        }

        List<String> keyList = uidList.stream()
                .map(this::getKeyMemberCount).collect(Collectors.toList());

	// key:用户,value:不同子项的计数
        Map<String, Map<String, Long>> countMaps = super.batchGetCountMap(keyList);
        List<SimpleMemberCountModel> countModelList = Lists.newArrayListWithCapacity(countMaps.size());
        for (int i = 0; i < uidList.size(); ++i) {
            Map<String, Long> countMap = countMaps.get(keyList.get(i));
            if (countMap != null) {
                final Long fansCount = countMap.get(FIELD_FANS_COUNT);
                final Long postCount = countMap.get(FIELD_POST_COUNT);
                final Long followCount = countMap.get(FIELD_FOLLOW_COUNT);
                final Long replyCount = countMap.get(FIELD_REPLY_COUNT);
                final Long tags = countMap.get(FIELD_TAGS_COUNT);
                if (fansCount != null && postCount != null && followCount != null && replyCount != null && tags != null) {
                    SimpleMemberCountModel countModel = new SimpleMemberCountModel();
                    Long uid = uidList.get(i);
                    countModel.setUid(uid);
                    countModel.setFansCount(fansCount);
                    countModel.setPostcount(postCount);
                    countModel.setFollowCount(followCount);
                    countModel.setReplyCount(replyCount);
                    countModel.setTagCount(tags);
                    countModelList.add(countModel);
                }
            }
        }

        return countModelList;
    }

如果缓存为空,从DB查询数据并预热到缓存中。

public void batchSaveSimpleMemberCountModel(List<SimpleMemberCountModel> countModelList) {

        Map<String , Map<String, Long>> batchCountMap = Maps.newHashMapWithExpectedSize(countModelList.size());
        countModelList.forEach(countModel -> {
            String hash = getKeyMemberCount(countModel.getUid());
            Map<String, Long> countMap = Maps.newHashMapWithExpectedSize(6);
            countMap.put(FIELD_UID, countModel.getUid());
            countMap.put(FIELD_FANS_COUNT, countModel.getFansCount());
            countMap.put(FIELD_POST_COUNT, countModel.getPostcount());
            countMap.put(FIELD_FOLLOW_COUNT, countModel.getFollowCount());
            countMap.put(FIELD_REPLY_COUNT, countModel.getReplyCount());
            countMap.put(FIELD_TAGS_COUNT, countModel.getTagCount());
            batchCountMap.put(hash, countMap);
        });
        // 缓存有效期12个小时
        super.batchSaveCountHash(batchCountMap, HALF_DAY_EXPIRE);
    }
    //初始化计数器hash
    protected void batchSaveCountHash(Map<String, Map<String, Long> >batchCountMap, int ttl) {
        if (batchCountMap == null || batchCountMap.size() == 0) {
            return;
        }

        Map<Node, List<String>> nodeMap = getNodeMap(batchCountMap.keySet());

        Map<String, Map<String, String>> batchStrMap = Maps.newHashMapWithExpectedSize(batchCountMap.size());
        batchCountMap.forEach((key, countMap) -> {

            //组装map
            Map<String, String> strMap = Maps.newHashMap();
            //缓存里存的计数是实际计数+计数器基数
            countMap.forEach((field, count) -> strMap.put(field, String.valueOf(count + COUNT_OFFSET)));
            batchStrMap.put(key, strMap);
        });

        //对于同一个节点上的map, 批量执行
        nodeMap.forEach((node, keyList) -> {
            try {
                getRedisClient().pipelined(new PipelineBlock() {
                    @Override
                    public void execute() {

                        keyList.forEach(key -> {
                            Map<String, String> strMap = batchStrMap.get(key);
                            //先删除key, 再设置每一个field, 最后设置超时时间
                            try {
                                del(key);
                                hmset(key, strMap);
                                expire(key, ttl);
                            } catch (RedisException e) {
                                logger.error("[AbstractCacheManager.batchSaveCountHash.execute] invoke error!", e);

                            }
                        });
                    }

                    @Override
                    public Node getTargetNode() {
                        return node;
                    }
                });
            } catch (RedisException e) {
                logger.error("[AbstractCacheManager.batchSaveCountHash] invoke error!", e);
            }
        });
    }

注意:时间久了,缓存中的数据可能会存在与DB不一致情况,目前设置的缓存有效期是12个小时,然后缓存会失效,需再次从数据库同步数据

3)计数的准确性

时间久了,数据库表里的计数值也未必准确,比如关注数,每天凌晨会有定时任务,借助关注表计算准确的数字更新到数据库和缓存中。