您的位置 首页 > 数码极客

mq中msgid是如何生成的

MQ 全称为 Message Queue,即消息队列。MQ 是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。我们如何使用它在项目一般作用与那些场景,本篇 Chat 会带你对于 MQ 概念及使用从无到有的过程。

常见的消息队列有哪些?该如何选择适合自己项目的消息队列?

什么是 RabbitMQ?

消息队列(MQ)是一种应用程序对应用程序的通信方法。RabbitMQ 是使用 Erlang 语言开发的消息中间件,协议采用的是 AMQP 协议(AMQP 协议在后面会细讲)。AMQP 协议对数据对数据一致性、稳定性和可靠性有很好的支持,相对的是牺牲了其性能和吞吐量。

常见的队列?

消息队列是分布式应用间交换信息的重要组件,通过队列各个服务之间在彼此互不干涉的情况下独立处理消息。一般我们可以使用队列解决应用解耦、流量削峰、异步消息等等。目前常见的主流队列有 ActiveMQ、RabbitMQ、Kafka、RocketMQ 等。

Kafka 在于分布式架构,RabbitMQ 基于 AMQP 协议来实现,RocketMQ 采用主从结构,在事务性可靠性方面有一定保障。对于事务有要求的考虑 RabbitMQ 或者 RocketMQ,对性能要求高的可考虑 Kafka。

RabbitMQ 安装(采用 Docker 安装)

安装方式我们采用 Docker 方式,如果有对 Docker 不了解的同学强烈建议赶紧去学习下,现在在 Docker 和 K8s 这些容器技术越来越受欢迎。无论是面试和日常工作都是我们必备开发技能之一。

安装 RabbitMQ:

docker search rabbitmq

启动 RabbitMQ:

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

访问地址 http://linuxip:15672,这里的用户名和密码默认都是 guest。

RabbitMQ 使用及原理

可能看到这里有些同学对于 RabbitMQ 还是一知半解,这篇 git 就从无到有带你学习 RabbitMQ。这部分内容大家可以多看几遍,帮助大家更好的理解并使用 RabbitMQ。接下来带大家理解队列中的一些专业术语。

  • Broker:消息队列服务器实体。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:它的作用就是把 exchange 和 queue 按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange 根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个 broker 里可以开设多个 vhost,用作不同用户的权限分离。
  • producer:消息生产者,就是投递消息的程序。
  • consumer:消息消费者,就是接受消息的程序。
  • channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel 代表一个会话任务。

RabbitMQ 执行流程图(大家熟悉原理的时候可以结合看)

  • 1:生产者将消息发送给 Exchange 会指定 Routing Key 指定消息的流向。
  • 2:使用 Binding 将 Exchange 与 Queue 关联起来,生产者就可以在发送消息给 Exchange 时,通过指定 routing key 来决定消息流向哪个 Queue(Routing Key 需要与 Exchange Type 联合使用)。
  • 3:消费者通过 ba 命令(订阅)或者 ba 命令(主动获取)获取消息然后进行消费。
  • 4:消费者在消费完消息后可以通过 Ack 或者 Reject 通知服务器消息消费的结果。ack 和 Reject 不同的是 ack 通知服务器可以安全的删除该消息。ba 命令拒绝某一个消息,可以设置一个 requeue 的属性,如果为 true,则消息服务器会重传该消息给下一个订阅者;如果为 false,则会直接删除该消息。

对于其执行原理相信大家也有了一定的理解,可能会有同学现在已经有了一些问题比如:ack 还没来的及返回消息服务器就挂了怎么办、消息的重复消费等等问题。大家的这些疑问我会在最后总结部分给大家一一解答。

RabbitMQ 常用的 Exchange Type 除了执行流程图上的三种,还有 headers、system、自定义等基本不用这里不做过多介绍。

  • fanout:fanout 类型的 Exchange 路由规则非常简单,把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中。
  • direct:会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中。
  • topic :direct 类型的 Exchange 路由规则是完全匹配 binding key 与 routing key。topic 与其相似,不过它在匹配规则上进行了扩展。topic 的 binding key 中可以存在两种特殊字符 * 与 #,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配多个单词(可以是零个)。
  • headers:根据发送的消息内容中的 headers 属性进行匹配。

RabbitMQ 实际代码使用

1. 引入依赖

<!--RabbitMQ --> <dependency> <groupId>org.;/groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2. 添加配置(博主这里使用的 Nacos,所以直接可以在管理界面配置)

spring: application: name: aegis-payments rabbitmq: # RabbitMQ 的主机地址(默认为:localhost) addresses: localhost # amqp 协议端口号:5672 port: 5672 username: guest password: guest ### 消费端配置 listener: simple: retry: # false 时,没有收到消费者的 ACK,会无限投递。为 true 时,默认投递次数为 3 次 enabled: true # 设置向消费者投递消息的最大次数 max-attempts: 3 #消费端手动 ack acknowledge-mode: manual #确保消息成功发送到交换器 publisher-confirms: true #确保消息在未被队列接收时返回 publisher-returns: true

3. 配置 RabbitMQ 配置类(可以根据业务实际配置,这个只是 demo 参考)

/** * @ClassName RabbitMqConfig * @Description rabbitMq 配置类 * @Author 一叶知秋 * @Date 2019-07-27 15:05 * @Version 1.0 **/ @Slf4j @Configuration public class RabbitConfig { // 交换机 public static final String EXCHANGE_A = "exchange_topic_a"; public static final String EXCHANGE_B = "exchange_topic_b"; // 消息队列 public static final String QUEUE_TOPIC_A = ";; public static final String QUEUE_TOPIC_B = ";; // 路由(topic 规则) public static final String ROUTINGKEY_TOPIC_A = ";; public static final String ROUTINGKEY_TOPIC_B = "topic.#"; /** * 设置交换机类型 FanoutExchange: 广播,将消息分发到所有的绑定队列 HeadersExchange :通过添加键值对 key-value 匹配 DirectExchange:按照路由 Routingkey 分发指定队列 TopicExchange:topic 主题模式,多关键字匹配 */ @Bean("exchangeA") public TopicExchange exchangeA() { return new TopicExchange(EXCHANGE_A); } @Bean("exchangeB") public TopicExchange exchangeB() { return new TopicExchange(EXCHANGE_B); } /** * 获取队列 A * @return */ @Bean("queueMessageA") public Queue queueMessageA() { return new Queue(QUEUE_TOPIC_A, true); //true,队列持久 } /** * 获取队列 B * @return */ @Bean("queueMessageB") public Queue queueMessageB() { return new Queue(QUEUE_TOPIC_B, true); //true,队列持久 } /** * 消息队列 A)绑定到交换机上, 路由规则是 * 一个交换机可以绑定多个消息队列,消息通过一个交换机,可以分发到不同的队列当中去。 * @return */ @Bean Binding bindingExchangeMessageA(@Qualifier("queueMessageA") Queue queueMessage, @Qualifier("exchangeA") TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(ROUTINGKEY_TOPIC_A); } /** * 消息队列 Bs)绑定到交换机上, 路由规则是 topic.# * 一个交换机可以绑定多个消息队列,消息通过一个交换机,可以分发到不同的队列当中去。 * @return */ @Bean Binding bindingExchangeMessageB(@Qualifier("queueMessageB")Queue queueMessage, @Qualifier("exchangeB") TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(ROUTINGKEY_TOPIC_B); } }

4. 生产者

/** * @ProjectName: demo * @Package: com.exam * @ClassName: Product1 * @Author: 一叶知秋 * @Description:消息队列生产者测试 * @Date: 2021/7/27 14:29 * @Version: 1.0 */ @Slf4j @Component @RestController public class Product1 { @Resource RabbitTemplate rabbitTemplate; /** * 发布 匹配的消息 */ @GetMapping("/sendMessage") public void sendMsgByTopics(){ try { UserInfo info = new UserInfo(){{ setUserName("一叶知秋");setAge(18); }}; log.info("发送消息:{}", in()); rabbi, info); } catch (Exception e) { e.printStackTrace(); log.info("发送消息列队失败"); } } /** * 发布 topic.#匹配的消息 */ @GetMapping("/sendMessages") public void sendMsg2() { try { UserInfo info = new UserInfo(){{ setUserName("一叶知秋");setAge(20); }}; log.info("发送消息:{}", in()); rabbi); } catch (Exception e) { e.printStackTrace(); log.info("发送消息列队失败"); } } }

5. 消费者

/** * @ProjectName: demo * @Package: com.exam * @ClassName: Consum1 * @Author: 一叶知秋 * @Description: 模拟消费者 MQReceiverA * @Date: 2021/7/24 16:41 * @Version: 1.0 */ @Component @RabbitListener(queues = Rabbi) public class MQReceiverA { @RabbitHandler public void process(String userInfo, Channel channel, Message message) throws IOException { Sy("接收处理队列 A 当中的消息: " + userInfo); c().getDeliveryTag(), false); } } /** * @ProjectName: demo * @Package: com.exam * @ClassName: Consum1 * @Author: 一叶知秋 * @Description: MQReceiverB * @Date: 2021/7/24 16:41 * @Version: 1.0 */ @Component @RabbitListener(queues = Rabbi) public class MQReceiverB { @RabbitHandler public void process(String userInfo, Channel channel, Message message) throws IOException { Sy("接收处理队列 B 当中的消息: " + userInfo); c().getDeliveryTag(), false); } }

总结

RabbitMQ 的应用场景

服务间异步通信、顺序消费、流量削峰、定时任务。

为什么需要消息队列?

从传统的单体项目到微服务架构,各个服务之间的相互调用及依赖,消息队列的出现就是为了解决服务之间的耦合关系,控制流量资源缓存流量高峰。简单来说就是:异步处理,流量削峰及服务解耦。

如何保证生产者消息发送至 RabbitMQ 服务器? 如何保证消费者消费了消息?

发送者将信道设置为确认模式,消息在被发送到信道上的同时会包含一个唯一 id,消费者在消费消息后会携带消息 id 回调 MQ 服务器确认消息。然后 MQ 会将该条消息删除。

消费者接受消息后还会对每一条消息进行确认,只有 MQ 收到消费者确认消息请求会才会删除服务器对应的消息,当然它也会携带唯一的消息 id。

如何避免消息重复投递或重复消费?

MQ 在消息发布的同时会对每一条消息生成一个 inner-msg-id,作为去重的依据避免消息被重复投递,消息消费时一般在业务生成一条唯一 id(订单号、交易流水号等等)作为消息接收去重的依据避免消息的重复消费。

如何实现消息不丢失?(MQ 的持久化)

RabbitMQ 实现持久话的实现方式就是将消息队列等信息写到其自身的持久化日志文件中,RabbitMQ 在发布一条持久化消息后,就会对应的提交到持久化日志文件中。当消费者消费该消息后,RabbitMQ 会对该条消息标记为等待垃圾回收处理。如果 RabbitMQ 在消息消费之前重启或者宕机,当 RabbitMQ 重启后会自动重新构建队列及交换器,最后通过持久化日志文件重新发布消息到对应的队列。

责任编辑: 鲁达

1.内容基于多重复合算法人工智能语言模型创作,旨在以深度学习研究为目的传播信息知识,内容观点与本网站无关,反馈举报请
2.仅供读者参考,本网站未对该内容进行证实,对其原创性、真实性、完整性、及时性不作任何保证;
3.本站属于非营利性站点无毒无广告,请读者放心使用!

“mq中msgid是如何生成的”边界阅读