mqtt-plus 架构解析(三):Payload 序列化与反序列化,为什么要拆成两条链

张开发
2026/4/13 5:07:15 15 分钟阅读

分享文章

mqtt-plus 架构解析(三):Payload 序列化与反序列化,为什么要拆成两条链
mqtt-plus 架构解析三Payload 序列化与反序列化为什么要拆成两条链摘要在 MQTT 场景里入站和出站看起来都在做“payload 转换”但它们其实不是同一个问题。本文围绕PayloadConverter、PayloadSerializer、DefaultMqttTemplate和 starter 中的内置实现解释 mqtt-plus 为什么把反序列化和序列化拆成两条独立链以及这种拆分对扩展性、默认行为和工程边界到底意味着什么。项目地址项目地址https://github.com/mqttplus/mqtt-plus配套的示例工程https://github.com/mqttplus/mqtt-plus-examples如果你对这个方向感兴趣欢迎关注、试用也欢迎一起交流 issue 和 PR。如果这篇文章对你有帮助欢迎点赞、收藏也欢迎给项目一个 Star。只要开始在业务里发和收结构化消息payload 转换几乎一定会变成一个框架问题。收消息时你关心的是 bytes 怎么变成String、byte[]或某个业务对象发消息时你关心的是String、byte[]或某个 Java 对象怎么变成最终上网线的 bytes。表面上看这像是同一个问题的两个方向但一旦进入框架设计差别会非常明显入站是按 listener 的payloadType决定怎么转出站是按当前 publish 的对象类型决定怎么序列化入站发生在路由链内部出站发生在MqttTemplate往 adapter 下发之前也正因为如此mqtt-plus 最后没有做“一个通用 payload 接口包打天下”而是走向了两条链。一、这篇文章到底想回答什么这一篇只回答三个问题为什么PayloadConverter和PayloadSerializer不能合成一个接口mqtt-plus 的入站链和出站链分别发生在什么位置、解决什么问题为什么 Jackson 这类具体序列化能力放在 starter 层而不是 core 层如果只先记住一句话可以记这句在 mqtt-plus 里“bytes - 目标参数类型”和“对象 - byte[]”不是同一种抽象它们触发的时机、依赖的上下文和扩展方式都不一样所以应该拆成两条链而不是勉强统一成一个接口。这条边界一旦接受后面很多设计就会顺下来包括为什么DefaultMqttTemplate要接管出站序列化以及为什么 core 层可以始终不直接依赖 JSON 实现。二、为什么不是一个接口搞定所有 payload 转换如果只看名字PayloadConverter和PayloadSerializer很像但从职责角度看它们处理的是两个完全不同的方向PayloadConverter解决的是入站反序列化byte[] - targetTypePayloadSerializer解决的是出站序列化sourceType - byte[]这两个方向的最大区别不在“一个进来一个出去”而在它们依赖的信息不同。入站反序列化时框架已经通过路由找到了某个MqttListenerDefinition因此知道这个 listener 希望收到的payloadType是什么。换句话说入站链是“目标类型驱动”的。出站序列化时框架根本没有 listener 上下文它只拿到了MqttTemplate.publish(brokerId, topic, payload)里的当前对象因此只能根据payload.getClass()去决定怎么序列化。也就是说出站链是“源类型驱动”的。出站序列化链Java Object / String / byte[]PayloadSerializer.supports(sourceType)?serialize(payload)byte[]Adapter.publish(...)入站反序列化链MQTT payload bytesPayloadConverter.supports(targetType)?convert(payload, targetType)Listener Method Argument这张图最想表达的其实是入站和出站并不共享同一份上下文。入站知道“我要变成谁”出站只知道“我现在是谁”一旦把这两件事硬塞进一个接口里接口签名会开始别扭调用时机也会混乱。与其造一个看似统一、实际语义很杂的抽象不如从一开始就承认它们是两类问题。三、入站链PayloadConverter 是怎么工作的PayloadConverter的接口很简单publicinterfacePayloadConverter{booleansupports(Class?targetType);Objectconvert(byte[]payload,Class?targetType);}这个接口的关键不是方法少而是它把入站链的判断方式钉得很死按目标类型匹配。也就是说框架先知道某个 listener 期望的payloadType然后遍历 converter 链找第一个supports(targetType)的实现来完成转换。这个顺序在第 2 篇里其实已经埋了伏笔DefaultMqttMessageRouter是先 resolve 到 listener再在循环中执行convertPayload(payload, definition.getPayloadType())。当前 starter 里默认提供了 3 类入站转换实现ByteArrayPayloadConverter目标类型是byte[]时直接返回 cloneStringPayloadConverter目标类型是String时按字符串解码JacksonPayloadConverter除String和byte[]外的其他类型交给ObjectMapper.readValue(...)从这里可以看出入站链本质上是一个“按 targetType 逐个试”的责任链。它不是拿到 bytes 后先猜一种格式去解再想办法塞给所有 listener而是每个 listener 都带着自己的payloadType进入转换链。这也解释了为什么在第 2 篇里同一条消息可以同时被String、byte[]和 POJO listener 消费。因为入站链从一开始就不是“消息级别统一反序列化”而是“listener 级别独立反序列化”。四、出站链PayloadSerializer 为什么要放到 DefaultMqttTemplate 里v1.1.0 之前mqtt-plus 的 publish 侧本质上只把对象往 adapter 层传adapter 再决定怎么处理。这样做的一个直接问题是如果 payload 是一个 POJO底层很容易退化成String.valueOf(payload)也就是发出去的不是结构化 JSON而是对象的toString()。PayloadSerializer的引入就是为了把这件事往上提一层统一在DefaultMqttTemplate里解决。接口本身也很简单publicinterfacePayloadSerializer{booleansupports(Class?sourceType);byte[]serialize(Objectpayload);}它和PayloadConverter的不同点非常明显判断条件不再是targetType而是sourceType。因为 publish 侧没有 listener 上下文框架只能根据当前对象的类型去选序列化器。真正的关键逻辑在DefaultMqttTemplate.serializePayload(...)里顺序大致是这样的payload null发出 UTF-8 bytes 的nullpayload instanceof byte[]直接原样返回payload instanceof String按 UTF-8 编码遍历PayloadSerializer链找第一个supports(payload.getClass())的实现如果都不支持最后退回String.valueOf(payload).getBytes(...)YesNoYesNoYesNoYesNoOutgoing payloadpayload null ?UTF-8 bytes ofull\byte[] ?Return as-isString ?UTF-8 encodeFirst PayloadSerializer supports(sourceType)?serializer.serialize(payload)Fallback to String.valueOf(...)这张图想强调的是出站链不是 adapter 的工作而是 template 的工作。这样做的好处很直接所有 adapter 看到的都是统一的byte[]Paho 和 Spring Integration 不需要各自维护一套 publish 侧序列化逻辑扩展 JSON、Protobuf、Avro 这类格式时扩展点稳定且一致设计决策mqtt-plus 把 publish 侧序列化放在DefaultMqttTemplate而不是下沉到 adapter 层。这样做的核心目的是让所有 adapter 都工作在统一的byte[]语义上避免“不同 adapter 各自决定对象怎么变 bytes”的分裂行为。五、starter 为什么内置 3 组实现而且 Jackson 是条件装配的看 starter 的默认实现其实很能说明这套设计的边界感。默认内置的 3 组实现分别是方向类名supports 逻辑处理方式入站ByteArrayPayloadConverterbyte[].class返回 clone入站StringPayloadConverterString.classbytes 转字符串入站JacksonPayloadConverter非String/ 非byte[]ObjectMapper.readValue(...)出站ByteArrayPayloadSerializerbyte[].class原样返回出站StringPayloadSerializerString.classUTF-8 编码出站JacksonPayloadSerializer非byte[]/ 非StringObjectMapper.writeValueAsBytes(...)这里最值得注意的不是“默认支持了 JSON”而是Jackson 的支持没有进入 core而是停留在 starter 的条件装配里。在MqttPlusAutoConfiguration里payloadConverters()会先注册 byte[] 和 String converter如果 classpath 里存在 Jackson再动态补JacksonPayloadConverterpayloadSerializerChain()也是同样的思路byte[]、String 一定有Jackson 序列化器只有在存在时才加入链路这意味着 mqtt-plus 的立场非常明确core 层定义的是“可以扩展 payload 转换”这个能力starter 层才决定“在 Spring Boot 默认环境下给你哪些内置实现”这样做的一个非常现实的好处是core 不必因为“大家都爱 JSON”而直接引入 JSON 依赖。框架对 JSON 友好但不被 JSON 绑死。设计决策Jackson 这类具体序列化能力放在 starter 层做条件装配而不是直接放进 core。这样 core 才能保持稳定、最小依赖而 Spring Boot 用户又能在默认情况下拿到足够实用的 JSON 体验。六、这套双链设计给扩展带来了什么一旦入站链和出站链分开扩展方式就会清晰很多。如果你想支持一种新的消息格式比如 Protobuf 或 Avro本质上有三种可能只关心入站那就实现一个PayloadConverter只关心出站那就实现一个PayloadSerializer两边都要那就各实现一个分别接到两条链上这比“写一个万能转换器”更可控因为你不会被迫同时处理自己并不需要的方向。很多业务场景里本来就只需要其中一边。另外starter 在构造mqttPlusPayloadSerializerChain时还把内置和自定义实现区分开了用户自定义 serializer 先加入链内置 serializer 再按固定顺序补到后面Jackson 仍然保持条件存在这会让扩展语义非常稳定如果你明确注册了自己的 serializer框架会优先尊重你而不是突然被内置 Jackson 抢走控制权。从框架维护角度看这也是双链设计的另一个价值每条链的默认行为、优先级和扩展点都可以单独演进而不用担心“动一个方向另一个方向一起被带歪”。七、小结这一篇真正想说明的不是“mqtt-plus 多了一个PayloadSerializer接口”而是payload 转换在框架里本来就应该分成两种问题。可以把结论压缩成这几条PayloadConverter解决的是入站byte[] - targetType它依赖 listener 的payloadType。PayloadSerializer解决的是出站sourceType - byte[]它依赖当前 publish 对象的类型。DefaultMqttTemplate接管 publish 侧序列化后adapter 层终于可以统一只处理byte[]语义。starter 提供 byte[]、String、Jackson 三组默认实现但把 Jackson 约束在条件装配层不让 core 被具体格式依赖拖住。这种双链设计看起来比“一个接口通吃”更啰嗦但它换来了更清晰的责任边界和更稳定的扩展模型。下一篇会继续沿着这条链往外扩进入另一个很适合做框架扩展、但也最容易做乱的点MqttMessageInterceptor到底应该如何设计成一条稳定的拦截器链。系列导航本文是mqtt-plus 架构解析系列的第 3/10 篇。#主题链接1总览分层架构与设计哲学链接2消息路由一条 MQTT 消息如何到达你的MqttListener链接3Payload 序列化与反序列化双链设计的取舍本文4拦截器链MqttMessageInterceptor的扩展点设计链接5错误处理ErrorAction聚合策略的设计逻辑链接6多 Broker 管理如何让一个应用同时连接多个 MQTT 服务链接7动态订阅与重连恢复Reconciler的协调机制链接8Spring Boot 自动装配零件是怎么被粘合起来的链接9测试体系MqttTestTemplate与EmbeddedBroker的设计链接10从内部项目到开源框架mqtt-plus 的抽取过程与决策链接上一篇消息路由一条 MQTT 消息如何到达你的MqttListener下一篇拦截器链MqttMessageInterceptor的扩展点设计

更多文章