以前看过的关于RabbitMQ核心消息模式的文章都是基于Java API的,最近看了下官方文档,发现这些核心消息模式都可以通过Spring AMQP来实现。于是总结了下RabbitMQ的实用技巧,包括RabbitMQ在Windows和Linux下的安装、5种核心消息模式的Spring AMQP实现,相信对于想要学习和回顾RabbitMQ的朋友都会有所帮助。
- 安装Erlang,下载地址:
- 安装RabbitMQ,下载地址:
- 安装完成后,进入RabbitMQ安装目录下的sbin目录;
- 在地址栏输入cmd并回车启动命令行,然后输入以下命令启动管理功能。
rabbitmq-plugins enable rabbitmq_management
- 下载rabbitmq 3.7.15的Docker镜像;
docker pull rabbitmq:3.7.15
- 使用Docker命令启动服务;
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq \ -d rabbitmq:3.7.15
- 进入容器并开启管理功能;
docker exec -it rabbitmq /bin/bash rabbitmq-plugins enable rabbitmq_management
- 开启防火墙便于外网访问。
firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --reload
- 访问RabbitMQ管理页面地址,查看是否安装成功(Linux下使用服务器IP访问即可):http://localhost:15672/
- 输入账号密码并登录,这里使用默认账号密码登录:guest guest
- 创建帐号并设置其角色为管理员:mall mall
- 创建一个新的虚拟host为:/mall
- 点击mall用户进入用户配置页面;
- 给mall用户配置该虚拟host的权限;
- 至此,RabbitMQ的配置完成。
这5种消息模式是构建基于RabbitMQ的消息应用的基础,一定要牢牢掌握它们。学过RabbitMQ的朋友应该了解过这些消息模式的Java实现,这里我们使用Spring AMQP的形式来实现它们。
Spring AMQP实现
- 首先需要在中添加Spring AMQP的相关依赖;
<!--Spring AMQP依赖--> <dependency> <groupId>org.;/groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
- 然后修改a,添加RabbitMQ的相关配置;
spring: rabbitmq: host: localhost port: 5672 virtual-host: /mall username: mall password: mall publisher-confirms: true #消息发送到交换器确认 publisher-returns: true #消息发送到队列确认
- 添加简单模式相关Java配置,创建一个名为的队列、一个生产者和一个消费者;
/** * Created by macro on 2020/5/19. */ @Configuration public class SimpleRabbitConfig { @Bean public Queue hello() { return new Queue(""); } @Bean public SimpleSender simpleSender(){ return new SimpleSender(); } @Bean public SimpleReceiver simpleReceiver(){ return new SimpleReceiver(); } }
- 生产者通过send方法向队列中发送消息;
/** * Created by macro on 2020/5/19. */ public class SimpleSender { private static final Logger LOGGER = LoggerFac); @Autowired private RabbitTemplate template; private static final String queueName=""; public void send() { String message = "Hello World!"; (queueName, message); LOGGER.info(" [x] Sent '{}'", message); } }
- 消费者从队列中获取消息;
/** * Created by macro on 2020/5/19. */ @RabbitListener(queues = "") public class SimpleReceiver { private static final Logger LOGGER = LoggerFac); @RabbitHandler public void receive(String in) { LOGGER.info(" [x] Received '{}'", in); } }
- 在controller中添加测试接口,调用该接口开始发送消息;
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能测试") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private SimpleSender simpleSender; @ApiOperation("简单模式") @RequestMapping(value = "/simple", method = Reque) @ResponseBody public CommonResult simpleTest() { for(int i=0;i<10;i++){ (); T(1000); } return CommonRe(null); } }
- 运行后结果如下,可以发现生产者往队列中发送消息,消费者从队列中获取消息并消费。
Spring AMQP实现
- 添加工作模式相关Java配置,创建一个名为work.hello的队列、一个生产者和两个消费者;
/** * Created by macro on 2020/5/19. */ @Configuration public class WorkRabbitConfig { @Bean public Queue workQueue() { return new Queue("work.hello"); } @Bean public WorkReceiver workReceiver1() { return new WorkReceiver(1); } @Bean public WorkReceiver workReceiver2() { return new WorkReceiver(2); } @Bean public WorkSender workSender() { return new WorkSender(); } }
- 生产者通过send方法向队列work.hello中发送消息,消息中包含一定数量的.号;
/** * Created by macro on 2020/5/19. */ public class WorkSender { private static final Logger LOGGER = LoggerFac); @Autowired private RabbitTemplate template; private static final String queueName = "work.hello"; public void send(int index) { StringBuilder builder = new StringBuilder("Hello"); int limitIndex = index % 3+1; for (int i = 0; i < limitIndex; i++) { builder.append('.'); } builder.append(index+1); String message = builder.toString(); (queueName, message); LOGGER.info(" [x] Sent '{}'", message); } }
- 两个消费者从队列work.hello中获取消息,名称分别为instance 1和instance 2,消息中包含.号越多,耗时越长;
/** * Created by macro on 2020/5/19. */ @RabbitListener(queues = "work.hello") public class WorkReceiver { private static final Logger LOGGER = LoggerFac); private final int instance; public WorkReceiver(int i) { = i; } @RabbitHandler public void receive(String in) { StopWatch watch = new StopWatch(); wa(); LOGGER.info("instance {} [x] Received '{}'", , in); doWork(in); wa(); LOGGER.info("instance {} [x] Done in {}s", , wa()); } private void doWork(String in) { for (char ch : in.toCharArray()) { if (ch == '.') { T(1000); } } } }
- 在controller中添加测试接口,调用该接口开始发送消息;
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能测试") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private WorkSender workSender; @ApiOperation("工作模式") @RequestMapping(value = "/work", method = Reque) @ResponseBody public CommonResult workTest() { for(int i=0;i<10;i++){ workSender.send(i); T(1000); } return CommonRe(null); } }
- 运行后结果如下,可以发现生产者往队列中发送包含不同数量.号的消息,instance 1和instance 2消费者互相竞争,分别消费了一部分消息。
Spring AMQP实现
- 添加发布/订阅模式相关Java配置,创建一个名为exc的交换机、一个生产者、两个消费者和两个匿名队列,将两个匿名队列都绑定到交换机;
/** * Created by macro on 2020/5/19. */ @Configuration public class FanoutRabbitConfig { @Bean public FanoutExchange fanout() { return new FanoutExchange("exc"); } @Bean public Queue fanoutQueue1() { return new AnonymousQueue(); } @Bean public Queue fanoutQueue2() { return new AnonymousQueue(); } @Bean public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) { return BindingBuilder.bind(fanoutQueue1).to(fanout); } @Bean public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) { return BindingBuilder.bind(fanoutQueue2).to(fanout); } @Bean public FanoutReceiver fanoutReceiver() { return new FanoutReceiver(); } @Bean public FanoutSender fanoutSender() { return new FanoutSender(); } }
- 生产者通过send方法向交换机exc中发送消息,消息中包含一定数量的.号;
/** * Created by macro on 2020/5/19. */ public class FanoutSender { private static final Logger LOGGER = LoggerFac); @Autowired private RabbitTemplate template; private static final String exchangeName = "exc"; public void send(int index) { StringBuilder builder = new StringBuilder("Hello"); int limitIndex = index % 3 + 1; for (int i = 0; i < limitIndex; i++) { builder.append('.'); } builder.append(index + 1); String message = builder.toString(); (exchangeName, "", message); LOGGER.info(" [x] Sent '{}'", message); } }
- 消费者从绑定的匿名队列中获取消息,消息中包含.号越多,耗时越长,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1和instance 2;
/** * Created by macro on 2020/5/19. */ public class FanoutReceiver { private static final Logger LOGGER = LoggerFac); @RabbitListener(queues = "#{}") public void receive1(String in) { receive(in, 1); } @RabbitListener(queues = "#{}") public void receive2(String in) { receive(in, 2); } private void receive(String in, int receiver) { StopWatch watch = new StopWatch(); wa(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); wa(); LOGGER.info("instance {} [x] Done in {}s", receiver, wa()); } private void doWork(String in) { for (char ch : in.toCharArray()) { if (ch == '.') { T(1000); } } } }
- 在controller中添加测试接口,调用该接口开始发送消息;
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能测试") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private FanoutSender fanoutSender; @ApiOperation("发布/订阅模式") @RequestMapping(value = "/fanout", method = Reque) @ResponseBody public CommonResult fanoutTest() { for(int i=0;i<10;i++){ (i); T(1000); } return CommonRe(null); } }
- 运行后结果如下,可以发现生产者往队列中发送包含不同数量.号的消息,instance 1和instance 2同时获取并消费了消息。
Spring AMQP实现
- 添加路由模式相关Java配置,创建一个名为exc的交换机、一个生产者、两个消费者和两个匿名队列,队列通过路由键都绑定到交换机,队列1的路由键为orange和black,队列2的路由键为green和black;
/** * Created by macro on 2020/5/19. */ @Configuration public class DirectRabbitConfig { @Bean public DirectExchange direct() { return new DirectExchange("exc"); } @Bean public Queue directQueue1() { return new AnonymousQueue(); } @Bean public Queue directQueue2() { return new AnonymousQueue(); } @Bean public Binding directBinding1a(DirectExchange direct, Queue directQueue1) { return BindingBuilder.bind(directQueue1).to(direct).with("orange"); } @Bean public Binding directBinding1b(DirectExchange direct, Queue directQueue1) { return BindingBuilder.bind(directQueue1).to(direct).with("black"); } @Bean public Binding directBinding2a(DirectExchange direct, Queue directQueue2) { return BindingBuilder.bind(directQueue2).to(direct).with("green"); } @Bean public Binding directBinding2b(DirectExchange direct, Queue directQueue2) { return BindingBuilder.bind(directQueue2).to(direct).with("black"); } @Bean public DirectReceiver receiver() { return new DirectReceiver(); } @Bean public DirectSender directSender() { return new DirectSender(); } }
- 生产者通过send方法向交换机exc中发送消息,发送时使用不同的路由键,根据路由键会被转发到不同的队列;
/** * Created by macro on 2020/5/19. */ public class DirectSender { @Autowired private RabbitTemplate template; private static final String exchangeName = "exc"; private final String[] keys = {"orange", "black", "green"}; private static final Logger LOGGER = LoggerFac); public void send(int index) { StringBuilder builder = new StringBuilder("Hello to "); int limitIndex = index % 3; String key = keys[limitIndex]; builder.append(key).append(' '); builder.append(index+1); String message = builder.toString(); (exchangeName, key, message); LOGGER.info(" [x] Sent '{}'", message); } }
- 消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1和instance 2;
/** * Created by macro on 2020/5/19. */ public class DirectReceiver { private static final Logger LOGGER = LoggerFac); @RabbitListener(queues = "#{direc}") public void receive1(String in){ receive(in, 1); } @RabbitListener(queues = "#{direc}") public void receive2(String in){ receive(in, 2); } private void receive(String in, int receiver){ StopWatch watch = new StopWatch(); wa(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); wa(); LOGGER.info("instance {} [x] Done in {}s", receiver, wa()); } private void doWork(String in){ for (char ch : in.toCharArray()) { if (ch == '.') { T(1000); } } } }
- 在controller中添加测试接口,调用该接口开始发送消息;
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能测试") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private DirectSender directSender; @ApiOperation("路由模式") @RequestMapping(value = "/direct", method = Reque) @ResponseBody public CommonResult directTest() { for(int i=0;i<10;i++){ direc(i); T(1000); } return CommonRe(null); } }
- 运行后结果如下,可以发现生产者往队列中发送包含不同路由键的消息,instance 1获取到了orange和black消息,instance 2获取到了green和black消息。
- *:只能匹配一个单词;
- #:可以匹配零个或多个单词。
Spring AMQP实现
- 添加通配符模式相关Java配置,创建一个名为exc的交换机、一个生产者、两个消费者和两个匿名队列,匹配*.orange.*和*.*.rabbit发送到队列1,匹配lazy.#发送到队列2;
/** * Created by macro on 2020/5/19. */ @Configuration public class TopicRabbitConfig { @Bean public TopicExchange topic() { return new TopicExchange("exc"); } @Bean public Queue topicQueue1() { return new AnonymousQueue(); } @Bean public Queue topicQueue2() { return new AnonymousQueue(); } @Bean public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) { return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*"); } @Bean public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) { return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit"); } @Bean public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) { return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#"); } @Bean public TopicReceiver topicReceiver() { return new TopicReceiver(); } @Bean public TopicSender topicSender() { return new TopicSender(); } }
- 生产者通过send方法向交换机exc中发送消息,消息中包含不同的路由键;
/** * Created by macro on 2020/5/19. */ public class TopicSender { @Autowired private RabbitTemplate template; private static final String exchangeName = "exc"; private static final Logger LOGGER = LoggerFac); private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "lazy.;, "quick.brown.fox"}; public void send(int index) { StringBuilder builder = new StringBuilder("Hello to "); int limitIndex = index%keys.length; String key = keys[limitIndex]; builder.append(key).append(' '); builder.append(index+1); String message = builder.toString(); (exchangeName, key, message); LOGGER.info(" [x] Sent '{}'",message); Sy(" [x] Sent '" + message + "'"); } }
- 消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1和instance 2;
/** * Created by macro on 2020/5/19. */ public class TopicReceiver { private static final Logger LOGGER = LoggerFac); @RabbitListener(queues = "#{}") public void receive1(String in){ receive(in, 1); } @RabbitListener(queues = "#{}") public void receive2(String in){ receive(in, 2); } public void receive(String in, int receiver){ StopWatch watch = new StopWatch(); wa(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); wa(); LOGGER.info("instance {} [x] Done in {}s", receiver, wa()); } private void doWork(String in){ for (char ch : in.toCharArray()) { if (ch == '.') { T(1000); } } } }
- 在controller中添加测试接口,调用该接口开始发送消息;
/** * Created by macro on 2020/5/19. */ @Api(tags = "RabbitController", description = "RabbitMQ功能测试") @Controller @RequestMapping("/rabbit") public class RabbitController { @Autowired private TopicSender topicSender; @ApiOperation("通配符模式") @RequestMapping(value = "/topic", method = Reque) @ResponseBody public CommonResult topicTest() { for(int i=0;i<10;i++){ (i); T(1000); } return CommonRe(null); } }
- 运行后结果如下,可以发现生产者往队列中发送包含不同路由键的消息,instance 1和instance 2分别获取到了匹配的消息。
作 者:macrozheng