浙江省网站建设_网站建设公司_Logo设计_seo优化
2026/1/13 15:24:34 网站建设 项目流程

文章目录

    • 一、Work Queue(工作队列模式)
      • 引入依赖
      • 添加配置
      • 编写生产者代码
      • 编写消费者代码
      • 运行结果
    • 二、Publish/Subscribe(发布/订阅模式)
      • 编写生产者代码
      • 编写消费者代码
    • 消费者另一种写法
    • 三、Routing(路由模式)
      • 编写生产者代码
      • 编写消费者代码
    • 四、Topics(通配符模式)
      • 编写生产者代码
      • 编写消费者代码


Spring 官方: Spring AMQP
RabbitMQ 官方: RabbitMQ tutorial - “Hello World!” | RabbitMQ

一、Work Queue(工作队列模式)

步骤:(后面其它模式也是如此)

  1. 引入依赖

  2. 编写 yml 配置文件,基本信息配置

  3. 编写生产者代码

  4. 编写消费者代码

    1. 定义监听类,使用@RabbitListener注解完成队列监听
  5. 运行观察结果

引入依赖

<!--Spring MVC相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--RabbitMQ相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

添加配置

# 配置RabbitMQ的基本信息spring:rabbitmq:host:127.0.0.1port:5672username:lirenpassword:123123virtual-host:lirendada

编写生产者代码

常量类:

publicclassConstants{publicstaticfinalStringWORK_QUEUE="work_queue";}

然后在 config 包中声明队列:(注意包要导对~)

importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.QueueBuilder;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("workQueue")publicQueueWorkQueue(){returnQueueBuilder.durable(Constants.WORK_QUEUE).build();}}

最后在需要发送消息的地方调用RabbitTemplate发送消息:

@RequestMapping("/producer")@RestControllerpublicclassProducerController{@AutowiredprivateRabbitTemplaterabbitTemplate;@RequestMapping("/work")publicStringwork(){for(inti=0;i<10;i++){// 使用内置交换机发送消息, routingKey和队列名称保持一致rabbitTemplate.convertAndSend("",Constants.WORK_QUEUE,"hello spring amqp: work...");}return"发送成功";}}

编写消费者代码

定义监听类,用于消费队列中的消息:(注意包要导对~)

importcom.liren.springbootrabbitmq.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassWorkListener{@RabbitListener(queues=Constants.WORK_QUEUE)publicvoidworkqueue1(Messagemessage){System.out.println("workqueue1 ["+Constants.WORK_QUEUE+"]收到消息:"+message);}@RabbitListener(queues=Constants.WORK_QUEUE)publicvoidworkqueue2(Stringmessage){System.out.println("workqueue2 ["+Constants.WORK_QUEUE+"]收到消息:"+message);}}

@RabbitListener是 Spring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息。该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息。

以下是一些常用的参数类型:

  1. String:返回消息的内容
  2. Message(org.springframework.amqp.core.Message):Spring AMQP 的Message类,返回原始的消息体以及消息的属性,如消息ID、内容、队列信息等。
  3. Channel(com.rabbitmq.client.Channel):RabbitMQ 的通道对象,可以用于进行更高级的操作,如手动确认消息。

运行结果

运行程序,然后发起请求,会有三个队列接收消息,如下所示:

管理页面中可以看到三个消费者以及一个生产者通道:

二、Publish/Subscribe(发布/订阅模式)

RabbitMQ 交换机常见三种类型:fanoutdirecttopic,不同类型有着不同的路由策略。

  1. Fanout广播策略,将消息交给所有绑定到该交换机的队列(Publish/Subscribe 模式
  2. Direct定向策略,把消息交给符合指定routing key的队列(Routing 模式
  3. Topic通配符策略,把消息交给符合routing pattern的队列(Topics 模式

编写生产者代码

常量类:

// 发布订阅模式publicstaticfinalStringFANOUT_QUEUE1="fanout.queue1";publicstaticfinalStringFANOUT_QUEUE2="fanout.queue2";publicstaticfinalStringFANOUT_EXCHANGE="fanout.exchange";

然后在 config 包中声明队列:(注意包要导对~)

// 发布订阅模式@Bean("publishConfirmQueue1")publicQueuepublishConfirmQueue1(){returnQueueBuilder.durable(Constants.FANOUT_QUEUE1).build();// 声明队列}@Bean("publishConfirmQueue2")publicQueuepublishConfirmQueue2(){returnQueueBuilder.durable(Constants.FANOUT_QUEUE2).build();// 声明队列}@Bean("fanoutExchange")publicFanoutExchangefanoutExchange(){returnExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).build();// 声明交换机}@Bean("fanoutBinding1")publicBindingfanoutBinding1(@Qualifier("publishConfirmQueue1")Queuequeue,@Qualifier("fanoutExchange")FanoutExchangeexchange){returnBindingBuilder.bind(queue).to(exchange);// 绑定交换机和队列}@Bean("fanoutBinding2")publicBindingfanoutBinding2(@Qualifier("publishConfirmQueue2")Queuequeue,@Qualifier("fanoutExchange")FanoutExchangeexchange){returnBindingBuilder.bind(queue).to(exchange);// 绑定交换机和队列}

使用接口发送消息

@RequestMapping("/fanout")publicStringfanout(){rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp: fanout...");return"发送成功!";}

编写消费者代码

@ComponentpublicclassFanoutListener{@RabbitListener(queues=Constants.FANOUT_QUEUE1)publicvoidfanoutQueue1(Stringmessage){System.out.println("fanoutQueue1 ["+Constants.FANOUT_QUEUE1+"]收到消息:"+message);}@RabbitListener(queues=Constants.FANOUT_QUEUE2)publicvoidfanoutQueue2(Stringmessage){System.out.println("fanoutQueue2 ["+Constants.FANOUT_QUEUE2+"]收到消息:"+message);}}

消费者另一种写法

@RabbitListener是一个功能强大的注解。这个注解里面可以配置@QueueBinding@Queue@Exchange,直接通过这个组合注解一次性搞定多个交换机、绑定、路由、并且配置监听功能等

@Slf4j@ComponentpublicclassUserRegisterListener{@RabbitListener(bindings=@QueueBinding(value=@Queue(value=Constants.USER_QUEUE_NANE,// 队列名durable="true"// 是否持久化),exchange=@Exchange(value=Constants.USER_EXCHANGE_NAME,// 交换机名type=ExchangeTypes.FANOUT// fanout 交换机)// fanout 不需要 routingKey))publicvoidMailListenerQueue(Messagemessage,Channelchannel)throwsIOException{longdeliveryTag=message.getMessageProperties().getDeliveryTag();try{// 处理用户注册消息Stringbody=newString(message.getBody());log.info("用户注册消息处理成功,deliveryTag={}, message={}",deliveryTag,body);// 发送邮件TODO// 确认消息channel.basicAck(deliveryTag,true);}catch(Exceptione){// 异常拒绝消息,进行重发channel.basicNack(deliveryTag,true,true);log.error("用户注册消息处理失败,拒绝消息,deliveryTag={}",deliveryTag,e);}}}

启动时 Spring AMQP 会做的事情,顺序大致是:

  1. QueueDeclare
    1. 声明一个 durable 队列
  2. ExchangeDeclare
    1. 声明一个 fanout 交换机
  3. QueueBind
    1. 把队列绑定到交换机

三、Routing(路由模式)

RabbitMQ 交换机常见三种类型:fanoutdirecttopic,不同类型有着不同的路由策略。

  1. Fanout广播策略,将消息交给所有绑定到该交换机的队列(Publish/Subscribe 模式
  2. Direct定向策略,把消息交给符合指定routing key的队列(Routing 模式
  3. Topic通配符策略,把消息交给符合routing pattern的队列(Topics 模式

路由模式采用的是 RabbitMQ 中的Direct定向策略,生产者发送消息的时候,交换机需要根据消息中的Routing Key将消息发送给指定的队列,而不是发给每一个队列了!

此时,队列和交换机的绑定,不能是任意的绑定了,而是要指定一个Binding Key

只有队列绑定时的Binding Key和消息中的Routing Key完全一致,队列才会接收到消息

编写生产者代码

常量类:

// 路由模式publicstaticfinalStringDIRECT_EXCHANGE="direct.exchange";publicstaticfinalStringDIRECT_QUEUE1="direct.queue1";publicstaticfinalStringDIRECT_QUEUE2="direct.queue2";

和发布订阅模式的区别是:交换机类型不同、绑定队列的Binding Key不同。

// 路由模式(direct模式)@Bean("directQueue1")publicQueuedirectQueue1(){returnQueueBuilder.durable(Constants.DIRECT_QUEUE1).build();// 声明队列}@Bean("directQueue2")publicQueuedirectQueue2(){returnQueueBuilder.durable(Constants.DIRECT_QUEUE2).build();// 声明队列}@Bean("directExchange")publicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).build();// 声明交换机}// 队列1绑定orange@Bean("directBinding1")publicBindingdirectBinding1(@Qualifier("directQueue1")Queuequeue,@Qualifier("directExchange")DirectExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("orange");// 绑定交换机和队列,以及bindingKey}// 队列2绑定green、black@Bean("directBinding2")publicBindingdirectBinding2(@Qualifier("directQueue2")Queuequeue,@Qualifier("directExchange")DirectExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("green");// 绑定交换机和队列,以及bindingKey}@Bean("directBinding3")publicBindingdirectBinding3(@Qualifier("directQueue2")Queuequeue,@Qualifier("directExchange")DirectExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("black");// 绑定交换机和队列,以及bindingKey}

使用接口发送消息:

@RequestMapping("/direct/{routing_key}")publicStringdirct(@PathVariable("routing_key")Stringrouting_key){rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routing_key,"hello spring amqp: direct..."+routing_key);return"发送成功!";}

编写消费者代码

@ComponentpublicclassDirectListener{@RabbitListener(queues=Constants.DIRECT_QUEUE1)publicvoiddirectQueue1(Stringmessage){System.out.println("directQueue1 ["+Constants.DIRECT_QUEUE1+"]收到消息:"+message);}@RabbitListener(queues=Constants.DIRECT_QUEUE2)publicvoiddirectQueue2(Stringmessage){System.out.println("directQueue2 ["+Constants.DIRECT_QUEUE2+"]收到消息:"+message);}}

分别请求三个不同的 routingkey,结果如下所示:

四、Topics(通配符模式)

Topics 和 Routing 模式的区别是:

  1. 交换机类型不同:Topics 模式使用的交换机类型为topic;Routing 模式使用的交换机类型为direct
  2. 匹配规则不同:topic类型的交换机在匹配规则上进行了扩展,Binding Key支持通配符匹配;direct类型的交换机路由规则是Binding KeyRouting Key完全匹配。

匹配规则有如下要求:

  1. Routing Key是一系列由点.分隔的单词,比如 “stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  2. Binding KeyRouting Key一样,也是点.分割的字符串
  3. Binding Key中可以存在两种特殊字符串,用于模糊匹配
    1. \*:表示一个单词
    2. #:表示多个单词(0-N个)

编写生产者代码

常量类:

// 通配符模式publicstaticfinalStringTOPIC_EXCHANGE="topic.exchange";publicstaticfinalStringTOPIC_QUEUE1="topic.queue1";publicstaticfinalStringTOPIC_QUEUE2="topic.queue2";

生产者代码如下所示:

// 通配符模式(topics模式)@Bean("topicQueue1")publicQueuetopicQueue1(){returnQueueBuilder.durable(Constants.TOPIC_QUEUE1).build();// 声明队列}@Bean("topicQueue2")publicQueuetopicQueue2(){returnQueueBuilder.durable(Constants.TOPIC_QUEUE2).build();// 声明队列}@Bean("topicExchange")publicTopicExchangetopicExchange(){returnExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).build();// 声明交换机}// 队列1绑定error, 仅接收error信息@Bean("topicBinding1")publicBindingtopicBinding1(@Qualifier("topicQueue1")Queuequeue,@Qualifier("topicExchange")TopicExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("*.error");// 绑定交换机和队列,以及bindingKey}// 队列2绑定info, error: error,info信息都接收@Bean("topicBinding2")publicBindingtopicBinding2(@Qualifier("topicQueue2")Queuequeue,@Qualifier("topicExchange")TopicExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("*.error");// 绑定交换机和队列,以及bindingKey}@Bean("topicBinding3")publicBindingtopicBinding3(@Qualifier("topicQueue2")Queuequeue,@Qualifier("topicExchange")TopicExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("#.info");// 绑定交换机和队列,以及bindingKey}

使用接口发送消息:

@RequestMapping("/topics/{routing_key}")publicStringtopics(@PathVariable("routing_key")Stringrouting_key){rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routing_key,"hello spring amqp: topics..."+routing_key);return"发送成功!";}

编写消费者代码

@ComponentpublicclassTopicListener{@RabbitListener(queues=Constants.TOPIC_QUEUE1)publicvoidtopicQueue1(Stringmessage){System.out.println("topicQueue1 ["+Constants.TOPIC_QUEUE1+"]收到消息:"+message);}@RabbitListener(queues=Constants.TOPIC_QUEUE2)publicvoidtopicQueue2(Stringmessage){System.out.println("topicQueue2 ["+Constants.TOPIC_QUEUE2+"]收到消息:"+message);}}

分别请求两个不同的请求以及参数之后,运行结果如下:

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询