定州市网站建设_网站建设公司_RESTful_seo优化
2025/12/29 1:55:09 网站建设 项目流程

这是一份关于 “短链接访问统计系统”(基于 RocketMQ)的笔记,整合了我们之前讨论的所有核心知识点、代码逻辑、设计思想和技术细节,方便你系统复习和查阅。


短链接访问统计系统(基于 RocketMQ)笔记

一、系统核心目标
  • 核心功能:记录短链接的每一次访问,并进行多维度的统计分析(PV/UV/UIP、地域、设备、浏览器、操作系统等)。
  • 核心挑战
    1. 高性能:短链接跳转是核心入口,必须保证用户访问速度快。
    2. 高并发:可能面临瞬间大量的访问请求(如热点链接)。
    3. 数据可靠:每一次访问的统计数据都不能丢失,也不能重复计算。
    4. 系统解耦:统计功能不能影响核心的跳转功能。
二、技术架构与核心组件

整个系统采用生产者 - 消费者(Producer-Consumer)模式,核心组件如下:

  1. 生产者(Producer)ShortLinkStatsSaveProducer

    • 角色:在短链接被访问时,负责收集访问数据并发送到消息队列。
    • 核心任务:将同步的统计入库操作,转化为异步的消息发送。
  2. 消息队列(Message Queue):RocketMQ

    • 角色:作为生产者和消费者之间的 “桥梁”,存储和转发消息。
    • 核心任务:解耦、削峰填谷、保证消息可靠传输。
  3. 消费者(Consumer)ShortLinkStatsSaveConsumer

    • 角色:监听消息队列,消费统计消息,并将数据持久化到数据库。
    • 核心任务:执行耗时的统计入库操作,保证数据最终一致性。
  4. 幂等处理器(Idempotent Handler)MessageQueueIdempotentHandler

    • 角色:基于 Redis 实现,防止同一条消息被重复消费,导致统计数据重复。
    • 核心任务:保证消费的幂等性。
  5. 分布式锁(Distributed Lock):Redisson RReadWriteLock

    • 角色:在消费者入库时,保证并发场景下数据的一致性。
    • 核心任务:防止在统计过程中,短链接的 GID 被修改,导致数据归属错误。
三、核心流程详解
1. 生产者流程 (ShortLinkStatsSaveProducer)

java

运行

public void send(Map<String, String> producerMap) { // 1. 生成唯一的消息Key(UUID),用于幂等性保证 String keys = UUID.randomUUID().toString(); producerMap.put("keys", keys); // 2. 构建RocketMQ消息,设置消息体和消息头 Message<Map<String, String>> build = MessageBuilder .withPayload(producerMap) // 消息体:包含统计数据 .setHeader(MessageConst.PROPERTY_KEYS, keys) // 消息头:设置消息Key .build(); try { // 3. 同步发送消息到指定的Topic SendResult sendResult = rocketMQTemplate.syncSend(statsSaveTopic, build, 2000L); log.info("消息发送成功,ID: {}, Keys: {}", sendResult.getMsgId(), keys); } catch (Throwable ex) { log.error("消息发送失败", ex); // 可扩展:发送失败后的重试或告警逻辑 } }
  • 关键操作
    • 生成消息 Key:使用UUID.randomUUID(),确保每条消息的唯一性,是实现幂等的基础。
    • 同步发送 (syncSend):最可靠的发送方式。生产者会等待 Broker 返回发送结果,确保消息至少被 Broker 接收一次。
    • 设置超时时间2000L(2 秒),防止生产者无限期阻塞。
2. 消费者流程 (ShortLinkStatsSaveConsumer)

java

运行

@Override public void onMessage(Map<String, String> producerMap) { String keys = producerMap.get("keys"); // ========== 核心步骤1:幂等校验 ========== if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) { if (messageQueueIdempotentHandler.isAccomplish(keys)) { return; // 消息已处理完成,直接返回 } throw new ServiceException("消息处理中,需要重试"); // 触发MQ重试 } try { String fullShortUrl = producerMap.get("fullShortUrl"); if (StrUtil.isNotBlank(fullShortUrl)) { String gid = producerMap.get("gid"); ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class); // ========== 核心步骤2:执行业务逻辑 ========== actualSaveShortLinkStats(fullShortUrl, gid, statsRecord); } } catch (Throwable ex) { log.error("消费异常", ex); try { // ========== 核心步骤3:异常处理 ========== messageQueueIdempotentHandler.delMessageProcessed(keys); // 删除幂等标识,允许重试 } catch (Throwable remoteEx) { log.error("删除幂等标识失败", remoteEx); } throw ex; // 抛出异常,触发RocketMQ重试 } // ========== 核心步骤4:标记完成 ========== messageQueueIdempotentHandler.setAccomplish(keys); // 标记消息处理完成 }
  • 关键操作
    • 幂等校验:通过MessageQueueIdempotentHandler确保消息只被处理一次。
    • 执行业务逻辑:调用actualSaveShortLinkStats方法,将统计数据入库。
    • 异常处理
      • 消费失败时,必须删除幂等标识,否则消息将无法被重试。
      • 抛出异常,触发 RocketMQ 的重试机制。
    • 标记完成:消费成功后,标记消息为 “已完成”,防止后续重复消费。
3. 实际入库逻辑 (actualSaveShortLinkStats)

java

运行

public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) { // ========== 核心步骤1:加分布式读锁 ========== RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl)); RLock rLock = readWriteLock.readLock(); rLock.lock(); // 加锁 try { // 1. 补全GID(如果生产者未传入) if (StrUtil.isBlank(gid)) { ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(Wrappers.lambdaQuery(ShortLinkGotoDO.class) .eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl)); gid = shortLinkGotoDO.getGid(); } // 2. 解析时间维度(小时、星期) int hour = DateUtil.hour(new Date(), true); int weekValue = DateUtil.dayOfWeekEnum(new Date()).getIso8601Value(); // ========== 核心步骤2:多维度统计入库 ========== // a. PV/UV/UIP统计 LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder()...build(); linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO); // 自定义的增量更新方法 // b. 地域统计(调用高德API) Map<String, Object> localeParamMap = new HashMap<>(); localeParamMap.put("key", statsLocaleAmapKey); localeParamMap.put("ip", statsRecord.getRemoteAddr()); String localeResultStr = HttpUtil.get(AMAP_REMOTE_URL, localeParamMap); // ... 解析结果并入库 ... // c. 操作系统、浏览器、设备、网络等统计(类似) LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder()...build(); linkOsStatsMapper.shortLinkOsState(linkOsStatsDO); // ... // d. 原始访问日志 LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder()...build(); linkAccessLogsMapper.insert(linkAccessLogsDO); // e. 更新短链接核心表的总统计 shortLinkMapper.incrementStats(gid, fullShortUrl, 1, ...); // f. 今日统计 LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder()...build(); linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO); } catch (Throwable ex) { log.error("统计入库异常", ex); } finally { // ========== 核心步骤3:释放锁 ========== rLock.unlock(); // 最终释放锁,避免死锁 } }
  • 关键操作
    • 加分布式读锁
      • 锁的 KeyLOCK_GID_UPDATE_KEY + fullShortUrl,保证锁的粒度是单个短链接,避免全局锁。
      • 读锁 (Read Lock)
        • 允许多个读操作并发执行(多个统计线程可以同时处理同一个短链接)。
        • 阻塞写操作(修改 GID 的操作),保证在统计过程中 GID 不会被修改。
    • 多维度统计入库
      • 增量更新:大部分统计表(如link_access_stats)使用自定义的shortLinkStats方法,实现 “不存在则插入,存在则更新(累加)” 的逻辑,避免全量更新的性能问题。
      • 原始日志link_access_logs表直接插入原始访问记录,用于后续的明细查询和数据分析。
    • 释放锁:在finally块中释放锁,确保无论代码是否异常,锁都能被释放,防止死锁。
4. 幂等处理器 (MessageQueueIdempotentHandler)

基于 Redis 的SETNXsetIfAbsent)命令实现,保证分布式环境下的原子性。

方法名作用Redis KeyValue核心逻辑
isMessageProcessed判断消息是否可被处理short-link:idempotent:{keys}0使用setIfAbsent尝试设置 Key。-true:Key 不存在,消息可处理,设置 Value 为0(处理中)。-false:Key 已存在,消息不可处理。
isAccomplish判断消息是否处理完成short-link:idempotent:{keys}1检查 Key 对应的 Value 是否为1
setAccomplish标记消息处理完成short-link:idempotent:{keys}1将 Value 设置为1,并设置过期时间。
delMessageProcessed删除幂等标识short-link:idempotent:{keys}-删除 Key,允许消息被重试。
  • 核心思想先占坑,后处理
    1. 处理前,用SETNX占坑(Value=0)。
    2. 处理中,其他线程看到坑被占,要么等待要么拒绝。
    3. 处理成功,将坑标记为完成(Value=1)。
    4. 处理失败,把坑让出来(删除 Key)。
四、为什么选择 RocketMQ?
  1. 异步解耦:将耗时的统计入库操作从同步的跳转流程中剥离出来,极大提升了核心接口的响应速度。
  2. 削峰填谷:面对突发的高并发访问,RocketMQ 可以缓冲大量消息,避免直接冲击数据库,保证系统稳定。
  3. 消息可靠性
    • 生产者同步发送:确保消息至少被 Broker 接收一次。
    • Broker 持久化:消息存储在磁盘,即使 Broker 宕机,消息也不会丢失。
    • 消费者重试机制:消费失败时,RocketMQ 会自动重试,保证消息最终被处理。
  4. 负载均衡:RocketMQ 的消费者组(Consumer Group)机制,可以轻松实现多个消费者实例共同消费一个 Topic 的消息,提高处理能力。
  5. 可扩展性
    • 水平扩展:可以通过增加 Broker 节点和消费者实例来提升系统的吞吐量。
    • 功能扩展:RocketMQ 支持定时消息、事务消息等高级特性,便于未来功能扩展。
五、核心技术亮点与设计模式
  1. 读写锁分离
    • 使用 Redisson 的RReadWriteLock,统计入库时加读锁,修改 GID 时加写锁
    • 好处:允许多个统计操作并发执行,同时保证 GID 不被并发修改,兼顾了性能和数据一致性。
  2. 幂等性设计
    • 基于 Redis 的SETNX命令,是分布式系统中实现幂等的经典方案。
    • 好处:有效防止了因网络抖动或 MQ 重试导致的重复消费问题,保证了统计数据的准确性。
  3. 增量更新
    • 统计表的INSERT OR UPDATE操作(如linkAccessStatsMapper.shortLinkStats)。
    • 好处:相比先查询后更新,减少了一次数据库交互,提升了入库性能。
  4. 最小锁粒度
    • 锁的 Key 是fullShortUrl,而不是全局锁。
    • 好处:只对同一个短链接的操作进行同步,不同短链接的操作互不影响,最大化了并发性能。
  5. 异常处理与重试
    • 消费失败时,删除幂等标识并抛出异常,触发 MQ 重试。
    • 好处:保证了消息的最终一致性,即使中间环节出错,数据也不会丢失。
六、总结

这套短链接统计系统是一个高性能、高可用、高并发的分布式系统设计典范。它巧妙地运用了 RocketMQ 实现异步解耦和削峰填谷,通过 Redis 实现了分布式锁和幂等性保证,最终达到了 “用户访问快、统计数据准、系统运行稳” 的目标。

核心设计思想可以概括为:将同步操作异步化,将串行操作并行化,在性能和数据一致性之间找到最佳平衡点。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询