消息队列

消息队列

1. 为什么会需要消息队列(MQ)
   解耦,冗余,扩展性,灵活性 & 峰值处理能力,可恢复性,顺序保证,缓冲
  • MQ常用的使用场景:
    1.进程间通讯和系统间的消息通知,比如分布式系统.
    2.解耦,如每个团队负责业务的不同模块,各个开发团队可以使用MQ来通信
    3.在一些高并发场景下,使用MQ的异步特性

  • 消息队列的特点
    Message Queue把请求的压力保存一下,逐渐释放出来,让处理者按照自己的节奏来处理。
    Message Queue引入一下新的结点,让系统的可靠性会受Message Queue结点的影响。
    Message Queue是异步单向的消息。发送消息设计成是不需要等待消息处理的完成。

常用的消息队列及使用场景

ActiveMQ

作为一种消息存储和分发组件,涉及到client和broker端数据交互的方方面面,它不仅要担保消息的
存储安全性,还要提供额外的手段确保消息的分发是可靠的.
ActiveMQ消息传送机制
Producer客户端使用来发送消息的, Consumer客户端用来消费消息;它们的协同中心就是
ActiveMQ broker,broker也是让producer和consumer调用过程解耦的工具,最终实现了异
步RPC/数据交换的功能。随着ActiveMQ的不断发展,支持了越来越多的特性,也解决开发者在
各种场景下使用ActiveMQ的需求。比如producer支持异步调用;使用flow control机制让
broker协同consumer的消费速率;consumer端可以使用prefetchACK来最大化消息消费的速率;
提供”重发策略”等来提高消息的安全性等
(插入 消息生命周期图片)
一条消息从producer端发出之后,一旦被broker正确保存,那么它将会被consumer消费,
然后ACK,broker端才会删除;不过当消息过期或者存储设备溢出时,也会终结它。

Jms

JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构
和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提
供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。

JMS支持两种消息传递模型:
    点对点(point-to-point,简称PTP)和发布/订阅(publish/subscribe,简称pub/sub)

    PTP消息传递模型规定了一条消息只能够传递给一个接收方
    Pub/sub消息传递模型允许一条消息传递给多个接收方
点对点模型
通过点对点的消息传递模型,一个应用程序可以向另外一个应用程序发送消息。在此传递模型中,
目标类型是队列。消息首先被传送至队列目标,然后从该队列将消息传送至对此队列进行监听的
某个消费者.
(插入 点对点图片)
一个队列可以关联多个队列发送方和接收方,但一条消息仅传递给一个接收方果多个接收方正在
监听队列上的消息,JMS Provider将根据“先来者优先”的原则确定由哪个接收方接受下一条消
息。如果没有接收方在监听队列,消息将保留在队列中,直至接收方连接到队列为止。这种消息
传递模型是传统意义上的拉模型或轮询模型。在此列模型中,消息不时自动推动给客户端的,而
是要由客户端从队列中请求获得。

######点对点模型的代码(springboot+jms+activemq)

@Service("queueproducer")
public class QueueProducer {
@Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
private JmsMessagingTemplate jmsMessagingTemplate;
//发送消息,destination是发送到的队列,message是待发送的消息
@Scheduled(fixedDelay=3000)//每3s执行1次
public void sendMessage(Destination destination, final String message){
   jmsMessagingTemplate.convertAndSend(destination, message);
}
 @JmsListener(destination="out.queue")
 public void consumerMessage(String text){
   System.out.println("从out.queue队列收到的回复报文为:"+text);
 }
}
Producer的实现
@Component public class QueueConsumer2 { // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
@JmsListener(destination = "mytest.queue") //SendTo 该注解的意思是将return回的值,再发送的"out.queue"队列中
@SendTo("out.queue") public String receiveQueue(String text) {
  System.out.println("QueueConsumer2收到的报文为:"+text);
  return "return message "+text;
 }
}
Consumer的实现
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqQueueTests {
  @Autowired
  private QueueProducer producer;
  @Test
  public void contextLoads() throws InterruptedException {
    Destination destination = new ActiveMQQueue("mytest.queue");
    for(int i=0; i<10; i++){
      producer.sendMessage(destination, "myname is Flytiger" + i);
    }
  }
}
发布/订阅模型
通过发布/订阅消息传递模型,应用程序能够将一条消息发送到多个接收方。在此传送模型中,
目标类型是主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的或送消费者
(插入 发布或订阅图片)
在该模型中,消息会自动广播,消费者无须通过主动请求或轮询主题的方法来获得新的消息。

发布/订阅模型的代码(springboot+jms+activemq)实现如下:

@Service("topicproducer")
public class TopicProducer {
    @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
    private JmsMessagingTemplate jmsMessagingTemplate;
    // 发送消息,destination是发送到的队列,message是待发送的消息
    @Scheduled(fixedDelay=3000)//每3s执行1次
    public void sendMessage(Destination destination, final String message){
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
}

Producer的实现

@Component
public class TopicConsumer2 {
    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.topic")
    public void receiveTopic(String text) {
        System.out.println("TopicConsumer2收到的topic报文为:"+text);
    }
}

Consumer的实现

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqTopicTests {
   @Autowired
   private TopicProducer producer;
   @Test
   public void contextLoads() throws InterruptedException {
      Destination destination = new ActiveMQTopic("mytest.topic");
      for(int i=0; i<3; i++){
         producer.sendMessage(destination, "myname is TopicFlytiger" + i);
      }
   }
}
ActiveMQ优缺点
优点:是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,
持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java
虚拟机。消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠
保存,多个消息也可以组成原子事务。
缺点:ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提
供管理工具;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档
只有片段,用户很难由浅入深进行了解,二、文档整体的专业性太强。在研究阶段可以通过查
maillist、看Javadoc、分析源代码来了解。

RabbitMQ

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,
而不是每个消费者都收到所有的消息并处理。这种分发方式叫做round-robin(循环的方式)。
当publisher将消息发给queue的过程中,publisher会指明routing key。Direct模式中,
Direct Exchange 根据 Routing Key 进行精确匹配,只有对应的 Message Queue 会接受
到消息。Topic模式中Exchange会根据routing key和bindkey进行模式匹配,决定将消息发
送到哪个queue中。


topic和direct只是publisher用来选择发到不同的queue,不是consumer接收消息。一个队
列一个消息只能发送给一个消费者,不然消费者的ack也会有很多,RabbitMQ Server也不好处理
RabbitMQ的消息确认
默认情况下,如果Message 已经被某个Consumer正确的接收到了,那么该Message就会被从
queue中移除。当然也可以让同一个Message发送到很多的Consumer。
如果一个queue没被任何的Consumer Subscribe(订阅),那么,如果这个queue有数据到达,
那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个
Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除。
RabbitMQ高可用方案
  • 普通模式(默认)

  • 镜像模式
    普通模式的基础上,把需要的队列做成镜像队列,存在于多个节点来实现高可用(HA)

RabbitMQ功能测试

本次测试依然是RabbitMQ+springboot,首先需要application.properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

这里的端口是5672,,15672时管理端的端口。
pom要添加依赖:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Direct模型

Sender的实现:

@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        this.rabbitTemplate.convertAndSend("tiger", msg);
    }
}

Listener和listener2的实现均如下:

@Configuration
@RabbitListener(queues = "tiger")
public class Listener {
    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);
    @Bean
    public Queue fooQueue() {
        return new Queue("tiger");
    }
    @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener: " + foo);
    }
}

此时多次发送消息时,listener和listener2会按顺序分别收到消息。Listener收到的消息如下:

com.example.rabbitmq.Listener            : Listener: this is a test
com.example.rabbitmq.Listener2           : Listener2: this is a test
com.example.rabbitmq.Listener            : Listener: this is a test
com.example.rabbitmq.Listener2           : Listener2: this is a test
com.example.rabbitmq.Listener            : Listener: this is a test
com.example.rabbitmq.Listener2           : Listener2: this is a test
com.example.rabbitmq.Listener            : Listener: this is a test
com.example.rabbitmq.Listener2           : Listener2: this is a test

Topic模型

Sender的实现:

@Component
public class SenderTopic {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /*queue的key,用于和routing key 根据binding模式匹配*/
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    /*设置binding key,此时所有发送到这个exchange的消息,
      exchange都会根据routing key将消息与@Qualifier定义的queue进行匹配*/
    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
    }

    public void send(String routingKey, String msg) {
        this.rabbitTemplate.convertAndSend("exchange",routingKey, msg);
    }
}

Listener的实现如下:

@Configuration
@RabbitListener(queues = "topic.message")//监听器监听指定的Queue
public class ListenerTopic {

    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerTopic.class);

    @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener: " + foo);
    }
}

listener2的实现如下

@Configuration
@RabbitListener(queues = "topic.messages")
public class ListenerTopic2 {

    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerTopic2.class);

    @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener2: " + foo);
    }
}

发送topic.message会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送topic.messages(或者top、topic等)只有topic.#可以匹配所有只有Receiver2监听到消息。
Fanout模型

@Configuration
public class SenderFanout {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /*queue的key,用于和routing key 根据binding模式匹配*/
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");//配置广播路由器
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    public void send(String msg) {
        this.rabbitTemplate.convertAndSend("fanoutExchange","", msg);
    }
}

@Configuration
@RabbitListener(queues = "fanout.A")//监听器监听指定的Queue
public class ListenerFanout {

    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerFanout.class);

    @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener: " + foo);
    }
}

Kafka

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
同时支持离线数据处理和实时数据处理。
Scale out:支持在线水平扩展。

(插入 Kafka图片)

一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器
日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集
群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管
理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使
用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Kafka代理
与其它消息系统不同,Kafka代理是无状态的。这意味着消费者必须维护已消费的状态信息。这
些信息由消费者自己维护,代理完全不管。这种设计非常微妙,它本身包含了创新。

从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka创新性地
解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过一定时
间后,将会被自动删除。
这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列
的常见约定,但被证明是许多消费者的基本特征。
从社区活跃度
RabbitMQ 、activeM 、ZeroMQ 三者中,综合来看,RabbitMQ 是首选。
持久化消息比较
ZeroMq 不支持,ActiveMq 和RabbitMq 都支持。持久化消息主要是指我们机器在不可抗力
因素等情况下宕机了,消息不会丢失的机制。
综合技术实现
可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、
插件系统等等。
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。当然ZeroMq 也可以做到,不过
自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实
和高可用性。
高并发
RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言
RabbitMQ 和 Kafka
RabbitMq 比Kafka 成熟,在可用性上,稳定性上,可靠性上, RabbitMq 胜于 Kafka
(理论上)。RabbitMQ使用ProtoBuf序列化消息。极大的方便了Consumer的数据高效处理,
与XML相比,ProtoBuf有以下优势:
1.简单
2.size小了3-10倍
3.速度快了20-100倍
4.易于编程
5.减少了语义的歧义.

使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

消息队列应用场景

异步处理,应用解耦,流量削锋和消息通讯四个场景

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 消息队列
    1. 1.0.1. 常用的消息队列及使用场景
      1. 1.0.1.1. ActiveMQ
        1. 1.0.1.1.0.1. ActiveMQ消息传送机制
        2. 1.0.1.1.0.2. (插入 消息生命周期图片)
    2. 1.0.1.2. Jms
      1. 1.0.1.2.0.1. 点对点模型
      2. 1.0.1.2.0.2. (插入 点对点图片)
      3. 1.0.1.2.0.3. Producer的实现
      4. 1.0.1.2.0.4. Consumer的实现
    3. 1.0.1.2.1. 发布/订阅模型
    4. 1.0.1.2.2. (插入 发布或订阅图片)
    5. 1.0.1.2.3. ActiveMQ优缺点
  • 1.0.2. RabbitMQ
    1. 1.0.2.0.1. RabbitMQ的消息确认
    2. 1.0.2.0.2. RabbitMQ高可用方案
  • 1.0.3. Direct模型
  • 1.0.4. Kafka
    1. 1.0.4.1. (插入 Kafka图片)
      1. 1.0.4.1.0.1. Kafka代理
      2. 1.0.4.1.0.2. 从社区活跃度
      3. 1.0.4.1.0.3. 持久化消息比较
      4. 1.0.4.1.0.4. 综合技术实现
      5. 1.0.4.1.0.5. 高并发
      6. 1.0.4.1.0.6. RabbitMQ 和 Kafka
  • 1.0.4.2. 消息队列应用场景
  • 本站总访问量: , 本页阅读量: