这是一份关于 “短链接访问统计系统”(基于 RocketMQ)的笔记,整合了我们之前讨论的所有核心知识点、代码逻辑、设计思想和技术细节,方便你系统复习和查阅。
短链接访问统计系统(基于 RocketMQ)笔记
一、系统核心目标
- 核心功能:记录短链接的每一次访问,并进行多维度的统计分析(PV/UV/UIP、地域、设备、浏览器、操作系统等)。
- 核心挑战:
- 高性能:短链接跳转是核心入口,必须保证用户访问速度快。
- 高并发:可能面临瞬间大量的访问请求(如热点链接)。
- 数据可靠:每一次访问的统计数据都不能丢失,也不能重复计算。
- 系统解耦:统计功能不能影响核心的跳转功能。
二、技术架构与核心组件
整个系统采用生产者 - 消费者(Producer-Consumer)模式,核心组件如下:
生产者(Producer):
ShortLinkStatsSaveProducer- 角色:在短链接被访问时,负责收集访问数据并发送到消息队列。
- 核心任务:将同步的统计入库操作,转化为异步的消息发送。
消息队列(Message Queue):RocketMQ
- 角色:作为生产者和消费者之间的 “桥梁”,存储和转发消息。
- 核心任务:解耦、削峰填谷、保证消息可靠传输。
消费者(Consumer):
ShortLinkStatsSaveConsumer- 角色:监听消息队列,消费统计消息,并将数据持久化到数据库。
- 核心任务:执行耗时的统计入库操作,保证数据最终一致性。
幂等处理器(Idempotent Handler):
MessageQueueIdempotentHandler- 角色:基于 Redis 实现,防止同一条消息被重复消费,导致统计数据重复。
- 核心任务:保证消费的幂等性。
分布式锁(Distributed Lock):Redisson RReadWriteLock
- 角色:在消费者入库时,保证并发场景下数据的一致性。
- 核心任务:防止在统计过程中,短链接的 GID 被修改,导致数据归属错误。
三、核心流程详解
1. 生产者流程 (ShortLinkStatsSaveProducer)
java
运行
public void send(Map<String, String> producerMap) { // 1. 生成唯一的消息Key(UUID),用于幂等性保证 String keys = UUID.randomUUID().toString(); producerMap.put("keys", keys); // 2. 构建RocketMQ消息,设置消息体和消息头 Message<Map<String, String>> build = MessageBuilder .withPayload(producerMap) // 消息体:包含统计数据 .setHeader(MessageConst.PROPERTY_KEYS, keys) // 消息头:设置消息Key .build(); try { // 3. 同步发送消息到指定的Topic SendResult sendResult = rocketMQTemplate.syncSend(statsSaveTopic, build, 2000L); log.info("消息发送成功,ID: {}, Keys: {}", sendResult.getMsgId(), keys); } catch (Throwable ex) { log.error("消息发送失败", ex); // 可扩展:发送失败后的重试或告警逻辑 } }- 关键操作:
- 生成消息 Key:使用
UUID.randomUUID(),确保每条消息的唯一性,是实现幂等的基础。 - 同步发送 (
syncSend):最可靠的发送方式。生产者会等待 Broker 返回发送结果,确保消息至少被 Broker 接收一次。 - 设置超时时间:
2000L(2 秒),防止生产者无限期阻塞。
- 生成消息 Key:使用
2. 消费者流程 (ShortLinkStatsSaveConsumer)
java
运行
@Override public void onMessage(Map<String, String> producerMap) { String keys = producerMap.get("keys"); // ========== 核心步骤1:幂等校验 ========== if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) { if (messageQueueIdempotentHandler.isAccomplish(keys)) { return; // 消息已处理完成,直接返回 } throw new ServiceException("消息处理中,需要重试"); // 触发MQ重试 } try { String fullShortUrl = producerMap.get("fullShortUrl"); if (StrUtil.isNotBlank(fullShortUrl)) { String gid = producerMap.get("gid"); ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class); // ========== 核心步骤2:执行业务逻辑 ========== actualSaveShortLinkStats(fullShortUrl, gid, statsRecord); } } catch (Throwable ex) { log.error("消费异常", ex); try { // ========== 核心步骤3:异常处理 ========== messageQueueIdempotentHandler.delMessageProcessed(keys); // 删除幂等标识,允许重试 } catch (Throwable remoteEx) { log.error("删除幂等标识失败", remoteEx); } throw ex; // 抛出异常,触发RocketMQ重试 } // ========== 核心步骤4:标记完成 ========== messageQueueIdempotentHandler.setAccomplish(keys); // 标记消息处理完成 }- 关键操作:
- 幂等校验:通过
MessageQueueIdempotentHandler确保消息只被处理一次。 - 执行业务逻辑:调用
actualSaveShortLinkStats方法,将统计数据入库。 - 异常处理:
- 消费失败时,必须删除幂等标识,否则消息将无法被重试。
- 抛出异常,触发 RocketMQ 的重试机制。
- 标记完成:消费成功后,标记消息为 “已完成”,防止后续重复消费。
- 幂等校验:通过
3. 实际入库逻辑 (actualSaveShortLinkStats)
java
运行
public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) { // ========== 核心步骤1:加分布式读锁 ========== RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl)); RLock rLock = readWriteLock.readLock(); rLock.lock(); // 加锁 try { // 1. 补全GID(如果生产者未传入) if (StrUtil.isBlank(gid)) { ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(Wrappers.lambdaQuery(ShortLinkGotoDO.class) .eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl)); gid = shortLinkGotoDO.getGid(); } // 2. 解析时间维度(小时、星期) int hour = DateUtil.hour(new Date(), true); int weekValue = DateUtil.dayOfWeekEnum(new Date()).getIso8601Value(); // ========== 核心步骤2:多维度统计入库 ========== // a. PV/UV/UIP统计 LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder()...build(); linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO); // 自定义的增量更新方法 // b. 地域统计(调用高德API) Map<String, Object> localeParamMap = new HashMap<>(); localeParamMap.put("key", statsLocaleAmapKey); localeParamMap.put("ip", statsRecord.getRemoteAddr()); String localeResultStr = HttpUtil.get(AMAP_REMOTE_URL, localeParamMap); // ... 解析结果并入库 ... // c. 操作系统、浏览器、设备、网络等统计(类似) LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder()...build(); linkOsStatsMapper.shortLinkOsState(linkOsStatsDO); // ... // d. 原始访问日志 LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder()...build(); linkAccessLogsMapper.insert(linkAccessLogsDO); // e. 更新短链接核心表的总统计 shortLinkMapper.incrementStats(gid, fullShortUrl, 1, ...); // f. 今日统计 LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder()...build(); linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO); } catch (Throwable ex) { log.error("统计入库异常", ex); } finally { // ========== 核心步骤3:释放锁 ========== rLock.unlock(); // 最终释放锁,避免死锁 } }- 关键操作:
- 加分布式读锁:
- 锁的 Key:
LOCK_GID_UPDATE_KEY + fullShortUrl,保证锁的粒度是单个短链接,避免全局锁。 - 读锁 (Read Lock):
- 允许多个读操作并发执行(多个统计线程可以同时处理同一个短链接)。
- 阻塞写操作(修改 GID 的操作),保证在统计过程中 GID 不会被修改。
- 锁的 Key:
- 多维度统计入库:
- 增量更新:大部分统计表(如
link_access_stats)使用自定义的shortLinkStats方法,实现 “不存在则插入,存在则更新(累加)” 的逻辑,避免全量更新的性能问题。 - 原始日志:
link_access_logs表直接插入原始访问记录,用于后续的明细查询和数据分析。
- 增量更新:大部分统计表(如
- 释放锁:在
finally块中释放锁,确保无论代码是否异常,锁都能被释放,防止死锁。
- 加分布式读锁:
4. 幂等处理器 (MessageQueueIdempotentHandler)
基于 Redis 的SETNX(setIfAbsent)命令实现,保证分布式环境下的原子性。
| 方法名 | 作用 | Redis Key | Value | 核心逻辑 |
|---|---|---|---|---|
isMessageProcessed | 判断消息是否可被处理 | short-link:idempotent:{keys} | 0 | 使用setIfAbsent尝试设置 Key。-true:Key 不存在,消息可处理,设置 Value 为0(处理中)。-false:Key 已存在,消息不可处理。 |
isAccomplish | 判断消息是否处理完成 | short-link:idempotent:{keys} | 1 | 检查 Key 对应的 Value 是否为1。 |
setAccomplish | 标记消息处理完成 | short-link:idempotent:{keys} | 1 | 将 Value 设置为1,并设置过期时间。 |
delMessageProcessed | 删除幂等标识 | short-link:idempotent:{keys} | - | 删除 Key,允许消息被重试。 |
- 核心思想:先占坑,后处理。
- 处理前,用
SETNX占坑(Value=0)。 - 处理中,其他线程看到坑被占,要么等待要么拒绝。
- 处理成功,将坑标记为完成(
Value=1)。 - 处理失败,把坑让出来(删除 Key)。
- 处理前,用
四、为什么选择 RocketMQ?
- 异步解耦:将耗时的统计入库操作从同步的跳转流程中剥离出来,极大提升了核心接口的响应速度。
- 削峰填谷:面对突发的高并发访问,RocketMQ 可以缓冲大量消息,避免直接冲击数据库,保证系统稳定。
- 消息可靠性:
- 生产者同步发送:确保消息至少被 Broker 接收一次。
- Broker 持久化:消息存储在磁盘,即使 Broker 宕机,消息也不会丢失。
- 消费者重试机制:消费失败时,RocketMQ 会自动重试,保证消息最终被处理。
- 负载均衡:RocketMQ 的消费者组(Consumer Group)机制,可以轻松实现多个消费者实例共同消费一个 Topic 的消息,提高处理能力。
- 可扩展性:
- 水平扩展:可以通过增加 Broker 节点和消费者实例来提升系统的吞吐量。
- 功能扩展:RocketMQ 支持定时消息、事务消息等高级特性,便于未来功能扩展。
五、核心技术亮点与设计模式
- 读写锁分离:
- 使用 Redisson 的
RReadWriteLock,统计入库时加读锁,修改 GID 时加写锁。 - 好处:允许多个统计操作并发执行,同时保证 GID 不被并发修改,兼顾了性能和数据一致性。
- 使用 Redisson 的
- 幂等性设计:
- 基于 Redis 的
SETNX命令,是分布式系统中实现幂等的经典方案。 - 好处:有效防止了因网络抖动或 MQ 重试导致的重复消费问题,保证了统计数据的准确性。
- 基于 Redis 的
- 增量更新:
- 统计表的
INSERT OR UPDATE操作(如linkAccessStatsMapper.shortLinkStats)。 - 好处:相比先查询后更新,减少了一次数据库交互,提升了入库性能。
- 统计表的
- 最小锁粒度:
- 锁的 Key 是
fullShortUrl,而不是全局锁。 - 好处:只对同一个短链接的操作进行同步,不同短链接的操作互不影响,最大化了并发性能。
- 锁的 Key 是
- 异常处理与重试:
- 消费失败时,删除幂等标识并抛出异常,触发 MQ 重试。
- 好处:保证了消息的最终一致性,即使中间环节出错,数据也不会丢失。
六、总结
这套短链接统计系统是一个高性能、高可用、高并发的分布式系统设计典范。它巧妙地运用了 RocketMQ 实现异步解耦和削峰填谷,通过 Redis 实现了分布式锁和幂等性保证,最终达到了 “用户访问快、统计数据准、系统运行稳” 的目标。
核心设计思想可以概括为:将同步操作异步化,将串行操作并行化,在性能和数据一致性之间找到最佳平衡点。