当前位置:首页阅读

RabbitMQ

RabbitMQ

消息中间件

RabbitMQ

####基本概念以及AMQP协议

RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议实现跨平台,跨语言共享数据,使用Erlang语言开发,基于AMQP协议。

**Erlang 有着和原生Socket一样的延迟**

AMQP 是具有特征的二进制协议,一个提供统一消息的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

* server(broker) 接受客户端连接,实现amqp实体服务

* connection,连接,应用程序与broker的网络连接

* channel 网络信道 几乎所有操作都在channel中进行,是进行消息读写的通道,客户端可建立多个channel,每个channel代表一个会话任务。

* message 服务器和应用程序之间传送的数据,properties和body组成,properties对消息进行修饰,比如消息的优先级,延迟等高级特性,body是消息体的内容。

* virtual host (虚拟地址) 用于进行逻辑隔离,是最上层的消息路由,一个virtual host 里面可以有若干个exchange和queue,同一个virtual host里面不能有相同名称的exchange或queue

* exchange (交换机) 接收消息,根据路由键转发消息到绑定的队列

* binding exchange和queue之间的虚拟连接 binding中包含routing key

* routing key 一个路由规则 虚拟机可用它来确定如何路由一个特定消息

* queue (message queue 消息队列)保存消息并将它转发给消费者

####Exchange

(交换机) 接收消息,根据路由键转发消息到绑定的队列

* name :交换机名称

* type :交换机类型 direct,topic,fanout,headers

* durability :是否持久化,ture

* Auto Delete :当最后一个绑定的exchange上的队列删除后,是否自动删除exchange

* internal :当前exchange是否用于mq内部使用,默认为false

* Arguments :扩展参数,用于扩展amqp协议自定制使用

* Direct Exchange

* 所有发送到direct exchange的消息被转发到route key中指定的queue ps : 默认routeingKey和queueName 相同

* Topic Exchange

* 所有发送到**topic exchange**的消息被转发到所有关心route key中指定**topic**的queue

* exchange将ruoteKey和某个topic进行模糊匹配 此时队列需要绑定一个topic

* 可以使用通配符进行匹配

* # 匹配一个或者多个词

* * 匹配不多不少一个词

* Fanout Exchange

* 不处理任何的路由键,只需要简单的将队列绑定到交换机上

* 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上

* fanout exchange换发消息是最快的

##### Binding绑定

* exchange和exchange,queue之间的连接关系

* Binding 中可以包含RoutingKey或者参数

##### Queue 消息队列

* 消息队列 实际存贮消息数据

* Durability :是否持久化 Durable : 是 Transient :否

* Aoto delete : 如选yes 代表当最后一个监听被移除之后该queue会被自动删除

##### Message 消息

* 服务器和应用程序之间传送的数据

* 本质上就是一段数据,由properties和payload(body)组成

* 常用属性 : delivery mode, headers(自定义属性)

* content_type,content_encoding,priority

* correlation_id,reply_to,expiration,message_id

##### Virtual host 虚拟主机

* 虚拟地址用于进行逻辑隔离,是最上层的消息路由,

* 一个virtual host 里面可以有若干个exchange和queue,

* 同一个virtual host里面不能有相同名称的exchange或queue

#### 高级特性

* 如何保障消息的100%投递成功

* 保障消息成功发出

* 保障mq节点成功接收

* 发送端收到mq节点(Broker) 确认应答

* 完善消息补偿机制

* 可靠性投递解决方案

* 消息信心落库,对消息状态进行打标 (图1)

* 消息的延迟投递,做二次确认,回调检查 (图2)

图一

RabbitMQ_WWW.XUNWANGBA.COM

生产端-可靠性投递

图二

RabbitMQ_WWW.XUNWANGBA.COM

生产端-可靠性投递

*幂等性

并发场景下多次操作结果一致。

* 幂等性的解决方案

* 唯一id+指纹码机制,利用数据库主键去重

* 利用redis的原子性去实现

*Confirm 确认消息

* 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。

* 生产者进行接收应答,用来确认消息是否正常发送到Broker,做可靠性投递保障。

* 实现Confirm确认消息

* 在channel上开启确认模式 **channel.confrimSelect()**

* 在channel上添加监听,**addConfrimListener** 监听处理结果

* Return Listener 机制

处理生产者投递消息不可达机制

* Mandatory默认为false 如果是默认值 broker自动删除该消息

* 自定义消费端监听

extends DefaultConsumer

* 消费端限流

* broker堆积大量未处理数据,消费端消费能力有限。

* rabbitmq 提供一种qos(服务质量保证),在非确认消息的前提下,如果一定数目的消息(基于consumer或者channel设置qos值)未被确认前,不进行消费新的消息。

* void BasicQos(prefetchSize,prefetchCount,global)

* prefetchSize :消息限制大小 0 表示不限制

* prefetchCount :一次最多推送的消息个数

* 在no_ack=false 生效

* global :限流策略级别(T : channel F :consumer)

* Ack与重回队列

* 手动ack和Nack

* 消费端进行消费的时候,如果由于业务异常我们可以进行体制记录,然后进行补偿。

* 宕机等问题,进行手动Ack保障

---------------------------------------------------------------

* 消费端重回队列是为了对处理异常消息,重新投递

* 一般应用建议关闭重回队列

* TTL

* time to live 消息过期时间

* 消息过期

* 队列过期

* 死信队列(DLX)

DLX是一个正常的exchange,可以在任何队列上被指定,实际就是设置某个队列的属性,当队列中有死信时,mq会自动将这个消息重新发布到exchange上去,进而被路由到另一个队。

* dead-letter-exchange

* 消息被拒绝(basic.reject/basic.nack)并且requeue=false

* 消息TTL过期

* 队列达到最大长度

* 队列上添加参数 : arguments.put(x-dead-letter-exchange,dlx.exchange);

##### 整合

* spring

* RabbitaAdmin

* RabbitTemplate

* SimpleMessageLIstenerContainer(动态修改配置)

* 设置事务特性,事务管理器,事务属性,事务容量,是否开启事务,回滚消息等

* 设置消费者数量,最小最大数量,批量消费

* MessageListenerAdapter (消息监听适配器)

* DefaultListenerMethod : 设置监听方法名称

* Delegate 委托对象:实际真实的委托对象,用于处理消息

* QueueOrTagToMethodName 队列标识与方法名称组成匹配

* MessageConverter 消息转换器

默认二进制数据格式传输,可以自定义数据格式。 (可转格式 json,jpg,pdf,ppt,media 等)

*toMessage()

*fromMessage()

* springboot

* RabbitTemplate.ConfirmCallBack

实现一个监听器用于监听broker端给我们返回的确认消息

* RabbitTemplate.ReturnCallBack

保证消息对broker端是可达的 如果出现路由键不可达的情况,则使用监听器对不可达消息进行后续处理,保证消息路由成功

* spring.rabbitmq.listener.simple.acknowledge-mode=manual手动ack

* 手动确认模式可以保证消息的可靠性投递,或者消费端失败的时候可以做重回队列,根据业务记录日志等处理

* 可设置监听个数和最大个数,用于控制消费端并发情况

* RabbitListener

```java

@RabbitListener (

bindings = @QueueBinding(value = @Queue (value = test.springboot.queue,durable = true),

exchange =@Exchange(value = test.springboot.exchange,durable = true,type = topic,ignoreDeclarationExceptions = true),

key = test.springboot.exchange.#

)

)

@RabbitHandler

public voidreceiver(Message message, Channel channel){

log.info(消费端收到 :+message.getPayload().toString());

log.info(消费端收到 :+message.getHeaders().get(AmqpHeaders.DELIVERY_TAG));

//手工Ack

try {

channel.basicAck((Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG),false);

} catch (IOException e) {

e.printStackTrace();

}

}

```

* springcloud

springboot cloud stream 整合

##### 集群架构

* 主备模式

HaPorxy

* 远程模式

Shovel模式 跨地域多台mq集群互联

* 镜像模式

* 多活模式多中心

Federation插件Federation Exchange 可以看成是Downstream 从Upstream 主动拉取消息,但不是拉取所有消息,必须在Downstram上已经明确定义的Bingdings关系的Exchange

也就是有实际的物理Queue来接收消息,才会从Upstream拉取消息到Downstream,使用AMQP协议实施代理间通信,Downstream会将绑定关系组合在一起,绑定,解除命令将发送到Upstream交换机

因此 Federation Exchange 只接受具有订阅的消息

RabbitMQ)宝,都看到这里了你确定不收藏一下??

92%的人还看了