Mallux - 宁静致远

RabbitMQ

RabbitMQ 简介

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ 基本概念

消息包含两部分内容:有效载荷(payload)和标签(label)。

  • 有效载荷(payload):传输的数据
  • 标签(label):描述子有效载荷,并且 RabbitMQ 用它来决定谁将获得消息的拷贝

AMQP 消息路由必须有三个部分:交换器(Exchange)、队列(Queue)和绑定。

ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel 都是 RabbitMQ 对外提供的 API 中最基本的对象。Connection 是 RabbitMQ 的 socket 链接,它封装了 socket 协议相关部分逻辑。ConnectionFactory 为 Connection 的制造工厂。

Channel 是我们 与 RabbitMQ 打交道的最重要的一个接口,我们大部分的业务操作是在 Channel 这个接口中完成的,包括定义 Queue、定义 Exchange、绑定 Queue 与 Exchange、发布消息等。

Channel:信道

应用程序和 RabbitMQ 代理服务器之间建立一条 TCP 连接,一旦认证通过,应用程序可以创建一条 AMQP 信道(channel)。信道是建产在 “真实的” TCP 连接内的虚拟连接。 AMQP 命令都是通过信道发送出去的。每条信道都会被指派一个唯一 ID(AMQP 库会帮你记住 ID 的)。不论是发布消息、订阅队列或是接收消息,这些动作都是通过信道完成的。

为所有线程只使用一条 TCP 连接,并在其上创建对应数量的信道(一条 TCP 连接上创建多少条信道是没有限制),不仅能确保每个线程的私密性,也避免了传统上的每线程一个 TCP 连接的开销。

Queue

Queue(队列)是 RabbitMQ 的内部对象,用于存储消息,用下图表示。

RabbitMQ 中的消息都只能存储在 Queue 中,生产者(下图中的 P)生产消息并最终投递到 Queue 中,消费者(下图中的 C)可以从 Queue 中获取消息并消费。

多个消费者可以订阅同一个 Queue,这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

消费者(consume)接收消息的方式

  • basic.consume:通过 AMQP 的 basic.consume 命令订阅,这样做会将信道(channel)置为接收模式,直到取消对队列的订阅为止。订阅了消息后,消费者在消费(或拒绝)最近接收的那条消息后,就能从队列中(可用的)自动接收下一条消息

  • basic.get:从队列中获得单条消息,而不是持续订阅,如果要获得更多消息的话,需要再次发送 basic.get 命令。

大致上讲,basic.get 命令会订阅消息,获得单条消息,然后取消订阅。消费者理应始终使用 basic.consume 来实现高吞吐量。

消费者(consume)接收消息后,未确认消息的,可以有以下两个选择:

  • 把消费者从 RabbitMQ 服务器断开连接,这会导致 RabbitMQ 自动重新把消息入队并发送给另一个消费者。这样做的好处是所有的 RabbitMQ 版本都支持。缺点是,这样连接/断开连接的方式会额外增加 RabbitMQ 的负担(如果消费在处理每条消息时都遇到错误的话,会导致潜在的重大负荷)。

  • RabbitMQ 2.0.0 或者更新版本,那就使用 AMQP 的basic.reject命令。该命令允许消费者拒绝 RabbitMQ 发送的消息。如果把 reject 命令的 requeue 参数设置为 true 的话,RabbitMQ 会将消息重新发送给下一个订阅的消费者。如果设置成 false 的话,RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者。你也可以通过对消息确认的方式来简单地忽略该消息(这种忽略消息的方式的优势在于所有版本的 RabbitMQ 都支持)。如果你检查到一条格式错误的消息而任何一个消费者都无法处理的时候,这样做就十分有用。

“死信(dead letter)” 队列

用来存放那些被拒绝而不重入队列的消息。死信队列让你通过检测”拒绝/未送到”的消息来发现问题。如果应用程序想自动从死信队列中获益的话,需要使用 reject 命令,并将 requeue 参数设置成 false。

创建队列

消费者和生产者都能使用 AMQP 的queue.declare命令来创建队列。但是如果消费者在同一条信道(channel)上订阅了另一个队列的话,就无法再声明队列了。必须首先取消订阅,将信道置为“传输”模式。

队列设置一些有用的参数:

  • exclusive:如果设置为 true 的话,队列将变成私有的,此时只有你的应用程序才能消费队列消息。当你想要限制一个队列只有一个消费者的时候很有帮助。

  • auto-delete:当最后一个消费者取消订阅的时候,队列会自动移除。如果你需要临时队列(对于构建在 AMQP 上的 RPC 应用来说,使用临时“匿名”队列很有用)只为一个消费者服务的话,请结合使用 auto-delete 和 exclusive。当消费者断开连接时,队列就被移除了。

Message acknowledgment(ACK)

在实际应用中,可能会发生消费者收到 Queue 中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后,必须通过 AMQP 的 basic.ack 命令显式地向 RabbitMQ 发送一个确认,RabbitMQ 收到消息回执(Message acknowledgment)后才将该消息从 Queue 中移除;如果 RabbitMQ 没有收到回执并检测到消费者的 RabbitMQ 连接断开,则 RabbitMQ 会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在 timeout 概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的 RabbitMQ 连接断开。这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给 RabbitMQ,这将会导致严重的 bug,Queue 中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑 …

Message durability(持久化)

队列和交换器

在之前的版本中,重启 RabbitMQ 服务器后,队列和交换器都会消失。

原因在于每个队列和交换器的durable 属性。将它设置为 true 时,你就不需要在服务器断电/重启后重新创建队列和交换器了。

RabbitMQ 3.x 版本,默认队列和交换器的 druable 属性为 true。

队列消息

如果我们希望即使在 RabbitMQ 服务重启的情况下,也不会丢失消息,需要:

  • 设置消息投递模式(delivery mode)选项为 2(持久化)
  • 设置交换器为 durable
  • 设置队列为 durable

这样可以保证绝大部分情况下我们的 RabbitMQ 消息不会丢失。但依然解决不了小概率丢失事件的发生(比如 RabbitMQ 服务器已经接收到生产者的消息,但还没来得及持久化该消息时 RabbitMQ 服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。

实现机制

RabbitMQ 确保持久性消息能从服务器重启中恢复的方式是,将它们写们写入磁盘上的一个持久化日志文件。当发布一条持久性消后到持久交换器上时,RabbitMQ 会在消息提交到日志文件后才发送响应。记住,之后这条消息如果路由到了非持久队队的话,它会自动从持久性日志中移除,并且无法从服务器重启中恢复。

一旦你从持久化队列中消费了一条持久性消息的话(并且确认了它),RabbitMQ 会在持久性日志文件中把这条消息标记为等待垃圾收集。

在你消费持久性消息前,如果 RabbitMQ 重启的话,服务器会自动重建交换器和队列(以及绑定),重播持久性日志文件中的消息到合适的队列或者交换器上(取决于 RabbitMQ 服务器宕机的时候,消息处在路由过程的哪个环节)。

使用持久化机制导致消息吞吐量降低至少 10 倍的情况并不少见。另外还有一点就是,持久性消息在 RabbitMQ 内建集群环境下工作得并不好。确认持久化你需要考虑使用更快的存储。

Prefetch count(预取计数)

前面我们讲到如果有多个消费者同时订阅同一个 Queue 中的消息, Queue 中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置 prefetch Count 来限制 Queue 每次发送给每个消费者的消息数,比如我们设置 prefetchCount=1,则 Queue 每次给每个消费者发送一条消息;消费者处理完这条消息后 Queue 会再给该消费者发送一条消息。

Exchange

在上一节我们看到生产者将消息投递到 Queue 中,实际上这在 RabbitMQ 中这种事情永远都不会发生。实际的情况是,生产者将消息发送到 Exchange交换器,下图中的 X),由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃)。

Exchange 是按照什么逻辑将消息路由到 Queue 的?这个将在 Binding 一节介绍。RabbitMQ 中的 Exchange 有四种类型,不同的类型有着不同的路由策略,这将在 Exchange Types 一节介绍。

routing key

生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来指定这个消息的路由规则,而这个 routing key 需要与 Exchange Type 及 binding key 联合使用才能最终生效。

在 Exchange Type 与 binding key 固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给 Exchange 时,通过指定 routing key 来决定消息流向哪里。RabbitMQ 为 routing key 设定的长度限制为 255 bytes。

Binding

RabbitMQ 中通过 Binding 将 Exchange 与 Queue 关联起来,这样 RabbitMQ 就知道如何正确地将消息路由到指定的 Queue了。

Binding key

在绑定(Binding)Exchange 与 Queue 的同时,一般会指定一个 binding key;消费者将消息发送给 Exchange 时,一般会指定一个 routing key;当 binding key 与 routing key 相匹配时,消息将会被路由到对应的 Queue 中。这个将在 Exchange Types 章节会列举实际的例子加以说明。

在绑定多个 Queue 到同一个 Exchange 的时候,这些 Binding 允许使用相同的 binding key。

binding key 并不是在所有情况下都生效,它依赖于 Exchange Type,比如 fanout 类型的 Exchange 就会无视 binding key,而是将消息路由到所有绑定到该 Exchange 的 Queue。

Exchange Types

RabbitMQ 常用的 Exchange Type 有 direct、fanout、topic 和 headers 这四种(AMQP 规范里还提到两种 Exchange Type,分别为 system 与自定义,这里不予以描述)。

direct

direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中。

以上图的配置为例,我们以 routingKey=”error” 发送消息到 Exchange,则消息会路由到 Queue1(amqp.gen-S9b…,这是由 RabbitMQ 自动生成的 Queue 名称)和 Queue2(amqp.gen-Agl…);如果我们以 routingKey=”info” 或 routingKey=”warning” 来发送消息,则消息只会路由到 Queue2。如果我们以其他 routingKey 发送消息,则消息不会路由到这两个 Queue 中。

fanout

fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,这将允许应用对单条消息做出不同的反应。例如,一个 Web 应用程序可以需要在用户上传新图片时,用户相册必须清除缓存,同时用户应该得到些积分奖励。那么你可以将两个队列绑定到图片上交换器上,一个用于清除缓存,另一个用于增加用户积分。

上图中,生产者(P)发送到 Exchange(X)的所有消息都会路由到图中的两个 Queue,并最终被两个消费者(C1 与 C2)消费。

topic

前面讲到 direct 类型的 Exchange 路由规则是完全匹配 binding key 与 routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic 类型的 Exchang e在匹配规则上进行了扩展,它与 direct 类型的 Exchage 相似,也是将消息路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规则有些不同,它约定:

  1. routing key 为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如 “stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  2. binding key 与 routing key 一样也是句点号“. ”分隔的字符串
    binding key 中可以存在两种特殊字符 “” 与 “#”,用于做模糊匹配,其中 “” 用于匹配一个单词,“#” 用于匹配多个单词(可以是零个)

以上图中的配置为例,routingKey=”quick.orange.rabbit” 的消息会同时路由到 Q1 与 Q2,routingKey=”lazy.orange.fox” 的消息会路由到 Q1,routingKey=”lazy.brown.fox” 的消息会路由到 Q2,routingKey=”lazy.pink.rabbit” 的消息会路由到 Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit” 的消息将会被丢弃,因为它们没有匹配任何 bindingKey。

headers

headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 Exchange 时,RabbitMQ 会取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配 Queue 与 Exchange 绑定时指定的键值对;如果完全匹配则消息会路由到该 Queue,否则不会路由到该 Queue。

headers 与 direct 完全一致,但性能会差很多。因此它并不太实用,而且几乎再也用不到了。

RPC

MQ 本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到 RabbitMQ 后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。

但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在 RabbitMQ 中也支持 RPC。

RabbitMQ 中实现 RPC 的机制是:

  1. 客户端发送请求(消息)时,在消息的属性(MessageProperties,在 AMQP 协议中定义了 14 种 properties,这些属性会随着消息一起发送)中设置两个值 replyTo(一个 Queue 名称,用于告诉服务器处理完成后将通知我的消息发送到这个 Queue中)和 correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个 id 了解哪条请求被成功执行了或执行失败)
  2. 服务器端收到消息并处理
  3. 服务器端处理完消息后,将生成一条应答消息到 replyTo 指定的 Queue,同时带上 correlationId 属性
  4. 客户端之前已订阅 replyTo 指定的 Queue,从中收到服务器的应答消息后,根据其中的 correlationId 属性分析哪条请求被执行了,根据执行结果进行后续业务处理

AMQP 事务和发送方确认模式

事务(transaction)模式

在 AMQP 中,把信道(channel)设置成事务模式后,你通过信道发送那些想要的确诉的信息,之后还有多个其它 AMQP 命令。这些命令是执行还是忽略,取决于第一条消息发送是否成功。一旦你发送完所有命令,就可以提交事务了。如是事务中的首次发布成功了,那么信息会在事务中完成其它 AMQP 命令。如果发送失败的话,其他 AMQP 命令将不会执行。事务填补了生产者发布消息以及 RabbitMQ 将它们提交到磁盘上这两者之间“最后 1 英里”的差距。不过,还有更好的方法来填补差距。

发送方确认模式:异步机制

使用事务不但会降低大约 2~10 倍的消息吞吐量,而且会而生产者应用程序产生同步。而你使用消息就是想避免同步。因此,RabbitMQ 团队拿出更好的方案来保证消息投递:发送方确认模式。

和事务相仿,你需要告诉 RabbitMQ 将信道设置成confirm 模式,而且你只能通过重新创建信道来关闭该设置。一旦信道进入 confirm 模式,所有在信道上发送的消息都会被指派一个唯一的 ID 号(从 1 开始)。一旦消息被投递给所有匹配的队列后,信道会发送一个发送方确认模式给生产者应用程序(包含消息的唯一 ID)。这使得生产者知晓消息已经安全到达目的队列了。如果消息和队列是可持久化的,那么确认消息只会在队列将消息写入磁盘后才会生效。发送方确认模式的最大好处是它们是异步的。一旦发布了一条消息,生产者应用程序就可以在等待确认的同时继续发送下一条。当确认消息最终收到的时候,生产者应用程序的回调方法就会被触发来处理该确认消息。如果 RabbitMQ 发生了内部错误从而导致了消息的丢失,RabbitMQ 会发送一条 nack(not acknowledged,未确认)消息。就像发送方确认消息那样,只不过这次说明消息已经丢人了。同时,由于没有消息回滚的概念(同事务相比),因此发送方确认模式更加轻量级,同时对 RabbitMQ 代理服务器的性能影响几乎可以忽略不计。

运行和管理 RabbitMQ

安装 RabbitMQ

运行一个 ubuntu 容器

1
# docker run -it --name rabbitmq --hostname rabbitmq -p 5672:5672 -p 15672:15672 -p 25672:25672 192.168.101.26/base/ubuntu14:1.0 /bin/bash

dokcer-compose.yml

1
2
3
4
5
6
7
8
9
10
11
### rabbitmq - Service define start
rabbitmq:
image: 192.168.101.26/base/ubuntu14:1.0
ports:
- "5672:5672"
- "15672:15672"
- "25672:25672"
hostname: rabbitmq
mem_limit: 1024M
command: ["/bin/bash","-c","while :; do sleep 60 ; done"]
### rabbitmq - Service define end

安装 erlang 依赖包

依赖 erlang APT 官方源,安装 erlang 的包很慢很慢。因此,还是建议设置一个国内的 ubuntu 源(例,阿里源)直接安装吧。

  • 方式一:使用 erlang APT 官方源
1
2
# wget -O - https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | apt-key add -
# echo 'deb https://packages.erlang-solutions.com/ubuntu trusty contrib' | tee /etc/apt/sources.list.d/erlang.list
  • 方式二:使用阿里云的 ubuntu 源
1
2
3
4
5
6
7
8
9
10
11
12
# cat > /etc/apt/sources.list <<-EOF
deb http://mirrors.aliyun.com/ubuntu/ trusty main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ trusty-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ trusty-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ trusty-proposed main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ trusty-backports main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ trusty main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ trusty-security main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ trusty-updates main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ trusty-proposed main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ trusty-backports main restricted universe multiverse
EOF
  • 安装 erlang 依赖包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# apt-get update && apt-get install -y --no-install-recommends \
erlang-asn1 \
erlang-base-hipe \
erlang-crypto \
erlang-eldap \
erlang-inets \
erlang-mnesia \
erlang-nox \
erlang-os-mon \
erlang-public-key \
erlang-ssl \
erlang-xmerl \
socat \
libwrap0

安装 rabbitmq-server

  • 方式一:使用国内 ubuntu 源,版本可能比较老

  • 方式二: rabbitmq 官方 APT 源,很慢很慢

1
2
# wget -O - https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | apt-key add -
# echo 'deb http://www.rabbitmq.com/debian/ testing main' | tee /etc/apt/sources.list.d/rabbitmq.list
  • 方式三:下载 rabbitmq 官方的 deb 包,直接安装

[deb] http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server_3.6.6-1_all.deb

1
2
# wget http://192.168.112.4:88/rabbitmq-server_3.6.6-1_all.deb
# dpkg -i rabbitmq-server_3.6.6-1_all.deb

修改 /etc/hosts 文件

增加 “127.0.0.1 rabbitmq” 映射,防止 rabbit 启动时出现如下错误:

1
ERROR: epmd error for host c7-Host: nxdomain (non-existing domain)

如上所示,rabbitmq 在容器中启动时,显示 “c7-Host” (该主机名是 docker 宿主机的名字)解析错误,rabbitmq 容器的主机名实际是 rabbitmq。

如果修改主机名,还是报主机解析错误,那么可以设置RABBITMQ_NODENAME=rabbit@<主机名>

启动 rabbitmq

1
# /etc/init.d/rabbitmq-server start
1
2
3
4
5
6
[root@rabbitmq /]# netstat -nplt
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN -
tcp6 0 0 :::4369 :::* LISTEN -
tcp6 0 0 :::5672 :::* LISTEN -

安装 web 管理插件

1
# rabbitmq-plugins enable rabbitmq_management

guest 账号无法访问 web 界面

rabbitmq 从 3.3.0 开始禁止使用 “guest/guest” 权限通过除 localhost 外的访问。如果想使用 guest 账号通过远程机器访问,需要在 rabbitmq 配置文件中(/etc/rabbitmq/rabbitmq.config)中设置 loopback_users 为 [],注意后面的点号:

1
[{rabbit, [{loopback_users, []}]}].

端口访问

  • 4369 (epmd)
  • 5672, 5671 (AMQP 0-9-1 and 1.0 without and with TLS)
  • 25672:This port used by Erlang distribution for inter-node and CLI tools communication and is allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). See networking guide for details.
  • 15672 (if management plugin is enabled)
  • 61613, 61614 (if STOMP is enabled)
  • 1883, 8883 (if MQTT is enabled)

服务器管理

启动节点

rabbitmq-server 命令:同时启动节点和应用程序

  • 前端运行
1
2
3
4
5
6
7
8
9
10
# rabbitmq-server
RabbitMQ 3.6.6. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit@rabbitmq.log
###### ## /var/log/rabbitmq/rabbit@rabbitmq-sasl.log
##########
Starting broker...
completed with 6 plugins.

rabbitmq-server 命令运行的 rabbitmq 服务,在使用 “ctrl+c” 中断运行后,其仍然会在后台运行。

  • 以守护程序的方式启动
1
# rabbitmq-server -detached

/etc/init.d/rabbitmq-server init 方式

1
# /etc/init.d/rabbitmq-server start

停止节点

同时关闭节点和应用程序

  • rabbitmqctl stop
  • /etc/init.d/rabbitmq-server stop

关闭和重启应用程序

rabbitmq-server 命令,将 RabbitMQ 应用程序预先设置成了独立运行模式,它同时启动了节点和应用程序。为了把节点加入到现有集群当中,你需要做的是停止应用程序,把节点重置为原始状态。这样节点就准备好加入集群了。此外,你可能会在同一节点上运行除了 RabbitMQ 之外的其它 Erlang 应用程序,这使得停止整个节点是不可取的。

因此,我们可以使用rabbitmqctl命令的stop_appstart_app,来启停 RabbitMQ 应用程序,而不至于关闭整个 Erlang 节点。

1
2
3
4
5
6
7
# netstat -nplt
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:15672 0.0.0.0:* LISTEN 11812/beam.smp
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 11812/beam.smp
tcp6 0 0 :::4369 :::* LISTEN 209/epmd
tcp6 0 0 :::5672 :::* LISTEN 11812/beam.smp
1
2
3
4
5
6
7
8
# rabbitmqctl stop_app
Stopping node rabbit@rabbitmq ...
# netstat -nplt
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 11812/beam.smp
tcp6 0 0 :::4369 :::* LISTEN 209/epmd

RabbitMQ 配置文件

默认配置文件为/etc/rabbitmq/rabbitmq.config,我们可以通过 rabbitmq-server 脚本对 CONFIG_FILE 环境变量进行设置。

RabbitMQ 配置文件,其本质是原始的 Erlang 数据结构,其内容是一个包含了嵌套哈希表(字典或命名数组)的数组。

1
2
3
4
[
{ mnesia, [{dump_log_write_threshold, 1000}] },
{ rabbit, [{vm_memory_high_watermark, 0.4}, {loopback_users, []}] }
].

如上所示,第一行和最后一行分别开启和关闭子配置数组。通过外部配置数据,每个 Erlang 应用程序会有自己的哈希表来配置选项(此处我们有两个应用)。

  • mnesia 指的是 Mnesia 数据库配置选项(Mnesia 是 RabbitMQ 用来存储交换器、队列和绑定的元数据的,除了消息的内容)。
  • rabbitmq 指的是 RabbitMQ 特定的配置选项。每个选项都表达为这种形式:{[option_name], [option_value]}

例如,{dump_log_write_threshold, 1000} 更改了 Mensia 把条目从仅限追加的日志刷出到真实数据库文件的频度。为了添加另一个 Mnesia 配置选项,只需增加一个 {[option_name], [option_value]} 条目,该条目需要和最后一个之间逗号隔开。

请求许可

[Doc] http://www.rabbitmq.com/management.html

RabbitMQ 权限系统,就像大多数权限系统那样,首先需要创建用户,然后为其赋予权限。用户可以为连接到 RabbitMQ 主机的用户程序设置不同级别的权限(读、写、和 / 或配置)。

RabbitMQ 权限系统的一个好的地方在于单个用户可以跨越多个 vhost 进行制授权。当应用程序需要跨越多个安全域进行通信时(使用虚拟机进行隔离),这会极大地方便访问控制的管理。

在 RabbitMQ 中,用户是访问控制的基本单元。针对一到多个 vhost,其可以被赋于不同级别的访问权限,并使用标准的用户名、密码对来认证用户。对用户的增加、删除以及列出列表,都非常简单。这些操作都是通过 rabbitmqctl 命令来完成的。

RabbitMQ 的用户角色

用户角色分类

  • none

    • 不能访问 management plugin
  • management

    • 用户可以通过 AMQP 做的任何事外加:
    • 列出自己可以通过 AMQP 登入的 virtual hosts
    • 查看自己的 virtual hosts 中的 queues、exchanges 和 bindings
    • 查看和关闭自己的 channels 和 connections
    • 查看有关自己的 virtual hosts 的“全局”的统计信息,包含其他用户在这些 virtual hosts 中的活动。
  • policymaker

    • management 可以做的任何事外加:
    • 查看、创建和删除自己的 virtual hosts 所属的 policies 和 parameters
  • monitoring

    • management 可以做的任何事外加:
    • 列出所有 virtual hosts,包括他们不能登录的 virtual hosts
    • 查看其他用户的 connections 和 channels
    • 查看节点级别的数据如 clustering 和 memory 使用情况
    • 查看真正的关于所有 virtual hosts 的全局的统计信息
  • administrator

    • policymaker 和 monitoring 可以做的任何事外加:
    • 创建和删除 virtual hosts
    • 查看、创建和删除 users
    • 查看创建和删除 permissions
    • 关闭其他用户的 connections

RabbitMQ 的权限系统

  • :有关消费信息的任何操作,包括“清除”整个队列(同样需要绑定操作的成功)
  • :发布消息(同样需要绑定操作的成功)
  • 配置:队列和交换器的创建和删除

每一条控制条目条由由以下四部分组成:

  • 补授予访问权限的用户
  • 权限控制的应用的 vhost
  • 需要授予的读、写、配置权限的组合
  • 权限范围

访问控制条目是无法跨越 vhost 的。因此你需要分别为不同的 vhost 创建访问控制条目。

rabbitmqctl

创建管理员账号、删除默认的 guest 管理账号

1
2
3
4
5
6
7
8
9
10
11
# rabbitmqctl add_user admin admin
Creating user "admin" ...
# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
# rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
# rabbitmqctl delete_user guest
Deleting user "guest" ...

monitor:监控权限账号

1
2
3
# rabbitmqctl add_user monitor monpass
# rabbitmqctl set_user_tags monitor monitoring
# rabbitmqctl set_permissions -p / monitor "" "" ""

创建 vhost

  • 创建用户
1
2
# rabbitmqctl add_user appuser appasswd
# rabbitmqctl set_user_tags appuser policymaker
  • 创建 vhost、授权权限
    1
    2
    3
    4
    5
    6
    # for vhost in app lender usercenter
    do
    rabbitmqctl add_vhost $vhost
    rabbitmqctl set_permissions -p $vhost appuser ".*" ".*" ".*"
    rabbitmqctl set_permissions -p $vhost admin ".*" ".*" ".*"
    done

rabbitmqadmin

[Doc] http://www.rabbitmq.com/management-cli.html

rabbitmq_management 插件提供了一个额外的 rabbitmqadmin 命令行工具,透过它可以实现其 Web 控制台一样的操作。

功能

  • list exchanges, queues, bindings, vhosts, users, permissions, connections and channels.
  • show overview information.
  • declare and delete exchanges, queues, bindings, vhosts, users and permissions.
  • publish and get messages.
  • close connections and purge queues.
  • import and export configuration.

下载 rabbitmqadmin 脚本:依赖于 python >=2.6

1
# curl -L http://127.0.0.1:15672/cli/rabbitmqadmin > /usr/local/bin/rabbitmqadmin | chmod +x /usr/local/bin/rabbitmqadmin

命令补齐

1
2
3
# rabbitmqadmin --bash-completion > /etc/bash_completion.d/rabbitmqadmin
# echo 'source /etc/bash_completion.d/rabbitmqadmin' >> ~/.bashrc

rabbitmqadmin 配置文件

1
2
3
4
5
6
7
# cat > rabbitmqadmin.conf <<-EOF
[localhost]
hostname = rabbitmq
port = 15672
username = admin
password = admin
EOF

Demo

  • 创建队列、交换器和绑定
1
2
3
4
5
# rabbitmqadmin -c rabbitmqadmin.conf -N localhost -V 'app' declare queue name=sms_consumer
# rabbitmqadmin -c rabbitmqadmin.conf -N localhost -V 'app' declare exchange name=rocket2.events type=fanout
# rabbitmqadmin -c rabbitmqadmin.conf -N localhost -V 'app' declare binding source=rocket2.events destination=sms_consumer
  • 备份、还原配置
1
2
3
# rabbitmqadmin -c rabbitmqadmin.conf -N localhost export app.settings
# rabbitmqadmin -c rabbitmqadmin.conf -N localhost import app.settings

RabbitMQ 集群

RabbitMQ 最优秀的功能之一就是其内建集群,你能够在 5 分钟内快速搭建一个集群并运行起来。

RabbitMQ 内建集群的设计主要用于完成两个目标:

  • 允许消费者和生产者在 RabbitMQ 节点崩溃的情况下继续运行。
  • 通过添加更多的节点来线性扩展消息通信吞吐量。

RabbitMQ 通过利用 Erlang 提供的开放电信平台(Open Telecom Platform, OTP)分式布通信框架来巧妙地满足以上两个需求。你可以失去一个 RabbitMQ 节点,同时客户端能够重新连接到集群中的任何其它节点并继续生产或消费信息。

即便你将所有事情处理得很好(将消息、队列、交换器设置成可持化,等等),当一个 RabbitMQ 集群节点崩溃时,该节点上的消息也会消失。这是因为 RabbitMQ 默认不会将队列的内容复制到整个集群上。如果不进行特别的配置,这些消息仅存在于队列所属的那个节点上。

集群架构

RabbitMQ 会始终记录以下四种类型的内部元数据:

  • 队列元数据:队列名称和它们的属性(是否可持久化、是否自动删除)
  • 交换器元数据:交换器名称、类型和属性(可持久化等)
  • 绑定元数据:一张简单的表格展示了如何将消息路由到队列
  • vhost 元数据:为 vhost 内的队列、交换器和绑定提供命名空间和安全属性

在单一节点内,RabbitMQ 会将所有这些信息存储在内存中,同时将那些标记为可持久化的队列和交换器(以及它们的绑定)存储到硬盘上。存储到硬盘上可以确保队列和交换器在重启 RabbitMQ 节点后重新创建。当你引入集群时,RabbitMQ 需要追踪新的元数据类型:集群节点位置,以及节点与记录的其它类型元数据的关系。集群也提供了选择:将元数据存储到磁盘上(独立节点的默认位置),或者仅存储在内存中。

集群中的队列

如果在集群中创建队列的话,集群只会在单个节点而不是所有节眯上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的完整信息。所有其它非所有者节点只知道队列的元数据和指向该队列存在的那个节点的指针。因此,当集群节点崩溃时,该节点的队列和关联的绑定都消失了。附加在那些队列上的消费都丢失了其订阅信息,并且任何匹配该队列绑定信息的新消息也都丢失了。

你可以让消费者重加到集群并重新创建队列,但这种做法仅当队列最开始没有被设置成可持久化时才是可行的。如果重新创建的队列被标记成持久化了,那么其它节点上的重新声明会得到一个 “404 NOT_FOUND” 错误。这样确保了当失败节点恢复后加入集群,该节点上的队列消息不会丢失。想要该指定队列重回集群的方法是恢复故障节点。但是如果消费者尝试重建的队列不是可持久化的,那么重新声明就会成功,你可以准备重新绑定它们并传输数据。

默认情况下,RabbitMQ 不将队列内容和状态复制到所有的其它上。其原因为存储空间(每个集群拥有相同消息的完整拷贝)和性能(持久化所带的磁盘写入)。

通过设置集群中唯一节点来负责任何特定队列,只有该负责节点才会因队列消息而遭受磁盘活动的影响。所有其它节点需要将接收到的该队列的消息传递给该队列的所有者节点上。因此,往 RabbitMQ 集群添加更多的节点意味着你将拥有更多的节点来传播队列,这些新增节点为你带来了性能的提升。当负载增加时,RabbitMQ 集群是性能扩展的最佳方案。

分布交换器

不同于队列那样拥有自己的进程,交换器说到底只是一个名称和一个队列绑定列表。当你将消息发布到交换器时,实际上是由你所连接的信道将消息上的路由键同交换器的绑定列表进行比较,然后路由消息。

理解 RabbitMQ 背后的消息路由工作机制的方法是把每个队列想名成节点上运行的进程,每个进程拥有自己的进程 ID(PID)。交换器只不过是路由模式列表和匹配消息应发往的队列进程 ID 列表。当发布的消息匹配了交换器中的绑定规则时,实际上是由信道完成了匹配工作,并在匹配之后建立队列 PID 的连接,然后将消息传送过去。队列的进程 PID 本质是其在集群中的 Erlang 地址。

由于交换器只不过是一张查询表,而非实际的消息路由器,因此将交换器在整个集群中进得复制会更加简单。当创建一个新的交换器时,RabbitMQ 所要做的是将查询表添加到集群中的所有节点上。

AMQP 的 basic.publish 命令不返回消息的状态,这意味着当信道节点崩溃时信道可能仍然在中路由消息,而生产者已继续创建下一条消息了。在这种情况下,你将承受丢失消息的风险。解决方案是使用 AMQP 事务,在消息路由到队列之前它一直它阻塞;或者使用发送方确认模式来记录连接中断时尚未确认的消息。这两种模式都能确保程序一直发布而不丢失一条消息。

内存节点和磁盘节点

每个 RabbitMQ 节点,不管是单一节点系统或者是庞大集群的一部分,要么是内存节点(RAM node),要么是磁盘节点(Disc node)。内存节点将所有的队列、交换器、绑定、用户、权限和 vhost 的元数据定义都仅存存储在内存中。而磁盘节点则将元数据存储在磁盘中。单节点系统只允许磁盘类型的节点;否则,每次你重启 RabbitMQ 之后,所有关行系统的配置信息都会丢失。不过在集群中,你可以选择配置部分节点为内存节点。

当在集群中声明队列、交换器或者绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。对于内存节点来说,这意味着将交换写入内存;而对于磁盘节点来说,这意味着昂贵的磁盘写入操作。

RabbitMQ 只要求在集群中至少有一个磁盘节点。所有其它节点可以是内存节点。记住,当节点加入或者离开集群时,它们必须要将该变更通知至至少一个磁盘节点。如果只有一个磁盘节点,而且不凑巧的是它刚好又崩溃子,那么集群可以继续路由,但是不能做以下的操作:

  • 创建队列、交换器、绑定
  • 添加用户、更改权限
  • 添加或删除集群节点

换句话说,如果集群中唯一的磁盘节点崩溃的话,集群仍然可以操持运行,但是直到所有该节点恢复到集群前,你无法变更任何东西。解决方案是在集群中设置两个磁盘节点,因此经们至少有一个是可用的,能在任何时候保存元数据变量。当内存节点重启,它们会连接到集群中的磁盘节点,下载当前集群元数据拷贝。所以当添加内存节点时,确保任一磁盘节点在线(内存节点唯一存储到磁盘的元数据信息是集群中磁盘节点的地址)。只要内存节点可以找到至少一个磁盘节点,那么它就能在重启后重新加入集群。

配置集群

在每次调用 rabbitmq-server 命令前,可以通过设置RABBITMQ_NODENAMERABBITMQ_NODE_PORTRABBITMQ_DIST_PORT环境变量来明确指定唯一的节点名称和端口。

此外,在启动集动集群节点需要禁用插件(执行 rabbitmqctl stop_app 关闭 RabbitMQ 应用程序),这是因为像 RabbitMQ Management 这样的插件会监听专门的端口(15672)提供服务。

单节点集群

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
### rabbitmq - Service define start
rabbitmq:
image: 192.168.101.26/base/rabbitmq-3.6.6-cluster:1.0
ports:
- "5672-5674:5672-5674"
- "15672-15674:15672-15674"
- "25672:25672"
hostname: rabbitmq
mem_limit: 2048M
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin
command:
- "--cluster-nodes=3"
- "--cluster-hostname=rabbitmq"
### rabbitmq - Service define end

配置集群

  • Step 1:以守护程序模式启动节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# RABBITMQ_NODE_PORT=5672 RABBITMQ_DIST_PORT=25672 \
RABBITMQ_NODENAME=rabbit_1 \
RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" \
rabbitmq-server -detached
# RABBITMQ_NODE_PORT=5673 RABBITMQ_DIST_PORT=25673 \
RABBITMQ_NODENAME=rabbit_2 \
RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" \
rabbitmq-server -detached
# RABBITMQ_NODE_PORT=5674 RABBITMQ_DIST_PORT=25674 \
RABBITMQ_NODENAME=rabbit_3 \
RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" \
rabbitmq-server -detached
  • Step 2:加入集群

集群中的第一个节点将初始元数据带入集群中,并且无须被告知加入。而第二个和之后的节点将加入它将获取它的元数据。要加入第二个和第三个节点,

  • rabbitmqctl -n <node > stop_app:停止 Erlang 节点上运行的 RabbitMQ 应用程序,
  • rabbimqctl -n <node> reset:重设(清空)该节点的数据,通知集群该节点正在离开集群。对于任何非正式离开的集群,集群会认为该节点出了故障,并期望其最终能恢复回来。当离开的节点是磁盘节点时,这点特别重要。

执行以上两个步骤以后,新节点加入集群才能有元数据。

Demo

  • 指定 rabbit_2@rabbitmq 为磁盘节点,join_cluster 不指定 --ram
1
2
3
4
5
6
7
8
9
10
11
# rabbitmqctl -n rabbit_2@rabbitmq stop_app
Stopping node rabbit_2@rabbitmq ...
# rabbitmqctl -n rabbit_2@rabbitmq reset
Resetting node rabbit_2@rabbitmq ...
# rabbitmqctl -n rabbit_2@rabbitmq join_cluster rabbit_1@rabbitmq
Clustering node rabbit_2@rabbitmq with rabbit_1@rabbitmq ...
# rabbitmqctl -n rabbit_2@rabbitmq start_app
Starting node rabbit_2@rabbitmq ...
  • 指定 rabbit_3@rabbitmq 为内存节点,join_cluster 指定 --ram
1
2
3
4
5
6
7
# rabbitmqctl -n rabbit_3@rabbitmq stop_app
# rabbitmqctl -n rabbit_3@rabbitmq reset
# rabbitmqctl -n rabbit_3@rabbitmq join_cluster --ram rabbit_1@rabbitmq
# rabbitmqctl -n rabbit_3@rabbitmq start_app
  • 查看集群状态
1
2
3
4
5
6
7
8
9
10
# rabbitmqctl cluster_status
Cluster status of node rabbit_1@rabbitmq ...
[{nodes,[{disc,[rabbit_1@rabbitmq,rabbit_2@rabbitmq]},
{ram,[rabbit_3@rabbitmq]}]},
{running_nodes,[rabbit_3@rabbitmq,rabbit_2@rabbitmq,rabbit_1@rabbitmq]},
{cluster_name,<<"rabbit_1@rabbitmq">>},
{partitions,[]},
{alarms,[{rabbit_3@rabbitmq,[]},
{rabbit_2@rabbitmq,[]},
{rabbit_1@rabbitmq,[]}]}]

移除集群节点

  • Step 1:停上 RabbitMQ 服务、重置节点元数据
1
2
# rabbitmqctl -n rabbit_3@rabbitmq stop_app
# rabbitmqctl -n rabbit_3@rabbitmq reset
  • Step 2:恢复独立模式启动
1
# rabbitmqctl -n rabbit_3@rabbitmq start_app

多节点集群

Erlang cookie

Erlang 节点间通过认证 Erlang cookie 的方式允许互相通。因此 rabbitmqctl 使用 Erlang OTP 通信机制来和 RabbitMQ 节点通信,运行 rabbitmqctl 的机器和所要连接的 RabbitMQ 节点必须使用相同的 Erlang cookie。此 cookie 默认存放于/var/lib/rabbitmq/.erlang.cookie文件中。

配置集群

  • Step 1:docker-compose.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    ### rabbitmq-n1 - Service define start
    rabbitmq-n1:
    image: 192.168.101.26/base/rabbitmq-3.6.6:1.0
    ports:
    - "5672:5672"
    - "15672:15672"
    hostname: rabbitmq-n1
    mem_limit: 2048M
    restart: always
    environment:
    RABBITMQ_DEFAULT_USER: admin
    RABBITMQ_DEFAULT_PASS: admin
    RABBITMQ_ERLANG_COOKIE: TFXKXUANVSKNDDOQHEMX
    command:
    - "rabbitmq-server"
    ### rabbitmq-n1 - Service define end
    ### rabbitmq-n2 - Service define start
    rabbitmq-n2:
    image: 192.168.101.26/base/rabbitmq-3.6.6:1.0
    ports:
    - "5673:5672"
    - "15673:15672"
    hostname: rabbitmq-n2
    mem_limit: 2048M
    restart: always
    environment:
    RABBITMQ_DEFAULT_USER: admin
    RABBITMQ_DEFAULT_PASS: admin
    RABBITMQ_ERLANG_COOKIE: TFXKXUANVSKNDDOQHEMX
    command:
    - "rabbitmq-server"
    links:
    - "rabbitmq-n1:rabbitmq-n1"
    ### rabbitmq-n2 - Service define end
    ### rabbitmq-n3 - Service define start
    rabbitmq-n3:
    image: 192.168.101.26/base/rabbitmq-3.6.6:1.0
    ports:
    - "5674:5672"
    - "15674:15672"
    hostname: rabbitmq-n3
    mem_limit: 2048M
    restart: always
    environment:
    RABBITMQ_DEFAULT_USER: admin
    RABBITMQ_DEFAULT_PASS: admin
    RABBITMQ_ERLANG_COOKIE: TFXKXUANVSKNDDOQHEMX
    command:
    - "rabbitmq-server"
    links:
    - "rabbitmq-n1:rabbitmq-n1"
    - "rabbitmq-n2:rabbitmq-n2"
    ### rabbitmq-n3 - Service define end
  • Step 2:启动节点

1
# docker-k8s-compose up -d
  • Step 3:配置节点
1
2
3
4
5
6
7
8
9
10
11
12
13
### rabbitmq-n2
docker-encap attach rabbitmq-n2
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbitmq-n1
rabbitmqctl start_app
### rabbitmq-n3
docker-encap attach rabbitmq-n3
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq-n1
rabbitmqctl start_app

关闭、重启集群

  • When the entire cluster is brought down, the last node to go down must be the first node to be brought online. If this doesn’t happen, the nodes will wait 30 seconds for the last disc node to come back online, and fail afterwards. If the last node to go offline cannot be brought back up, it can be removed from the cluster using the forget_cluster_node comman。
  • If all cluster nodes stop in a simultaneous and uncontrolled manner (for example with a power cut) you can be left with a situation in which all nodes think that some other node stopped after them. In this case you can use the force_boot command on one node to make it bootable again

关闭集群的顺序

  1. 内存(RAM)节点

    • 可以在任何时候关闭、重启;
    • 若集群中没有存活的磁盘群点,那么内存节点会等到磁盘节点联机后,重新连接磁盘节点。
  2. 磁盘(Disc)节点

    • 若集群中只有一个磁盘节点,那么必须在所有内存节点关闭后,才最后关闭磁盘节点
    • 若有多个磁盘节点,也应该保证第一联机的磁盘节点,在最后关闭

重启集群的顺序

  1. 磁盘(Disc)节点

    • 先启动第一联机,后启动其它节点
  2. 内存(RAM)节点

RabbitMQ 高可用

实现方式不做具体介绍,集群相关,请参见 “Cluster” 序列学习笔记。

镜像队列

[Doc] http://www.rabbitmq.com/ha.html

默认情况下,RabbitMQ 集群中的队列位于单个节点(首次声明它的节点)上。 这与交换器和绑定形成对比,交换器和绑定总是被认为在所有节点上。 可以选择将队列镜像到多个节点。 每个镜像队列由一个主节点和一个或多个从节点组成,如果旧主节点由于任何原因消失,则最旧的从节点将升级为新主节点。

发布到队列的消息将复制到所有从属节点。无论用户连接到哪个节点,消费者都连接到主节点,而从节点会丢弃已在主节点上确认的消息。 队列镜像增强了可用性,但不跨节点分布负载(所有参与的节点都执行所有工作)。

这个解决方案需要一个 RabbitMQ 集群,这意味着它不会无缝地处理集群中的网络分区。因此,不推荐在 WAN 上使用(当然,客户端仍然可以根据需要尽可能地连接)。

配置镜像

队列的镜像设置可以通过策略(policy)进行,且 policies 可以随时更改。

HAproxy + RabbitMQ 集群

集群架构

HAproxy 配置

Warrnes: active/standby

[Doc] http://www.rabbitmq.com/pacemaker.html

从 1.8.0 版本开始,当拥有持久化队列的节点发生故障时,该队列无法被重新创建,任何尝试重新声明队列的客户端都会收到一个 “404 NOT FOUND AMQP” 错误。当故障节点恢复时,持久队列及其内容也跟着恢复了(假设消息是以 delivery_mode 2 的模式进行投递的)。但在节点恢复以前,任何应该投递到该队列的消息要么丢失了,要么由于设置了 mandatory 发布标志导致客户收到了错误。

如果应用程序无法承担丢失消息的风险,或者在故障队列恢复前无法处理不断重发消息的延迟,那么你可能需要我们提到的 warren 模式。

warren 是指一对主/备独立服务器,并前置一台负载均衡器来处理故障转移。

无共享的 warren 模式(推荐)

有共享的 warren 模式

对于有共享的 warren 模式,有两点你需要注意:

  • 如果由于某些原因主节点发生故障的话,那么这种错误同样会在备用节点上发生。

  • 你需要确保备用 RabbitMQ 和主节点上的 RabbitMQ 拥有相同的节点名称和 UID。

如果以上两点有一处不同,备用 RabbitMQ 将无法访问共享存储上的文件,也就无法启动了。

集群方案:HAproxy + Warren(Corosync + Pacemaker),共享存储的替代方案可以使用 DRBD。

笔记部分参见 “Cluster” 序列学习笔记之 “04 - Corosync HA Cluster” 和 “06 - DRBD HA Cluster”。

配置集群

  • 配置主备独立模式运行的 RabbitMQ 服务器。共享模式的主备节点,还需要配置 Corosync + Pacemaker,命令行工具可以使用 crmsh 或者 pcs。

  • 配置 HAproxy:指定备用节点的 RabbitMQ 集群标记为 backup。

RabbitMQ Docker

Dockerfile

single

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
FROM gd2a-harbor.service/library/rabbitmq:3.6.6
LABEL maintainer "hj.mallux@gmail.com"
ENV LANG en_US.UTF-8
ENV TZ Asia/Shanghai
RUN echo 'LANG="en_US.UTF-8"' > /etc/default/locale && \
echo 'LANGUAGE="en_US:en"' >> /etc/default/locale && \
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
echo $TZ > /etc/timezone
COPY ./archives/debian.list /etc/apt/sources.list
COPY ./archives/vimrc /root/.vimrc
RUN apt-get update && \
apt-get install -y --no-install-recommends \
apt-transport-https curl vim wget lrzsz net-tools \
ca-certificates python && \
apt-get clean && rm -rf /var/lib/apt/lists/*
ENV RABBITMQ_USER rabbitmq
ENV RABBITMQ_GROUP rabbitmq
ENV RABBITMQ_VERSION 3.6.6
ENV RABBITMQ_DATA /var/lib/rabbitmq
RUN sed -i '/^#force_color_prompt=yes/ s/#//' /root/.bashrc ${RABBITMQ_DATA}/.bashrc && \
sed -i "/^if \[ \"\$color_prompt\" = yes \]/ { N; s/\(.*PS1\).*/\1='\${debian_chroot:+(\$debian_chroot)}[\\\[\\\e[0;32;1m\\\]\\\u\\\[\\\e[0m\\\]@\\\[\\\e[0;36;1m\\\]\\\h\\\[\\\e[0m\\\] \\\[\\\e[0;33;1m\\\]\\\W\\\[\\\e[0m\\\]]\\\\$ '/}" /root/.bashrc ${RABBITMQ_DATA}/.bashrc && \
sed -i '/^# PS1=/a PS1="\${debian_chroot:+(\$debian_chroot)}[\\\[\\\e[0;32;1m\\\]\\\u\\\[\\\e[0m\\\]@\\\[\\\e[0;36;1m\\\]\\\h\\\[\\\e[0m\\\] \\\[\\\e[0;33;1m\\\]\\\W\\\[\\\e[0m\\\]]\\\\$ "' /root/.bashrc && \
sed -i '/^# export/ s/^# // ; /^# alias ls=/ s/^# // ; /^# alias ll=/ s/^# //' /root/.bashrc
ENV TINI_VERSION 0.14.0
ENV TINI_SHA 6c41ec7d33e857d4779f14d9c74924cab0c7973485d2972419a3b7c7620ff5fd
ARG TINI_DOWNURL=http://192.168.251.4:88/archives/tini-static-amd64
## Use tini as subreaper in Docker container to adopt zombie processes
RUN curl -fsSL ${TINI_DOWNURL} -o /usr/local/bin/tini && \
chmod +x /usr/local/bin/tini /usr/local/bin/gosu && \
echo "${TINI_SHA} /usr/local/bin/tini" | sha256sum -c -
RUN mkdir -p /etc/bash_completion.d
COPY ./rabbitmq/rabbitmqadmin /usr/local/bin/rabbitmqadmin
COPY ./rabbitmq/rabbitmqadmin.completion /etc/bash_completion.d/rabbitmqadmin
COPY ./rabbitmq/rabbitmq-declare /
COPY ./rabbitmq/docker-entrypoint.sh /
RUN chmod +x /usr/local/bin/rabbitmqadmin /rabbitmq-declare && \
rabbitmq-plugins enable --offline rabbitmq_management && \
echo '' >> /etc/profile && \
echo 'alias ls="ls --color=auto"' >> /etc/profile && \
echo 'alias ll="ls -al --color=auto"' >> /etc/profile && \
echo 'alias grep="grep --color=auto"' >> /etc/profile && \
echo '' >> /etc/profile && \
echo 'source /etc/bash_completion.d/rabbitmqadmin' >> /etc/profile
VOLUME ${RABBITMQ_DATA}
EXPOSE 4369
EXPOSE 5672
EXPOSE 15672 25672
HEALTHCHECK --interval=1m --timeout=10s \
CMD curl -s -f http://localhost:15672 1>/dev/null 2>/dev/null || exit 1
ENTRYPOINT ["/usr/local/bin/tini", "--", "/docker-entrypoint.sh"]

Cluster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
FROM gd2a-harbor.service/library/ubuntu:14.04.5
LABEL maintainer "hj.mallux@gmail.com"
ENV LANG en_US.UTF-8
ENV TZ Asia/Shanghai
RUN echo 'LANG="en_US.UTF-8"' > /etc/default/locale && \
echo 'LANGUAGE="en_US:en"' >> /etc/default/locale && \
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
echo $TZ > /etc/timezone
COPY ./archives/sources.list /etc/apt/sources.list
COPY ./archives/vimrc /root/.vimrc
RUN apt-key adv --keyserver hkp://keyserver.ubuntu.com --recv EA312927 && \
apt-get update && \
apt-get install -y --no-install-recommends \
apt-transport-https curl wget vim lrzsz net-tools \
ca-certificates python \
erlang-asn1 \
erlang-base-hipe \
erlang-crypto \
erlang-eldap \
erlang-inets \
erlang-mnesia \
erlang-nox \
erlang-os-mon \
erlang-public-key \
erlang-ssl \
erlang-xmerl \
socat \
libwrap0 && \
apt-get clean && rm -rf /var/lib/apt/lists/*
RUN sed -i '/^#force_color_prompt=yes/ s/#//' /root/.bashrc && \
sed -i "/^if \[ \"\$color_prompt\" = yes \]/ { N; s/\(.*PS1\).*/\1='\${debian_chroot:+(\$debian_chroot)}[\\\[\\\e[0;32;1m\\\]\\\u\\\[\\\e[0m\\\]@\\\[\\\e[0;36;1m\\\]\\\h\\\[\\\e[0m\\\] \\\[\\\e[0;33;1m\\\]\\\W\\\[\\\e[0m\\\]]\\\\$ '/}" /root/.bashrc && \
sed -i '/^# export/ s/^# // ; /^# alias ls=/ s/^# // ; /^# alias ll=/ s/^# //' /root/.bashrc
ENV TINI_VERSION 0.14.0
ENV TINI_SHA 6c41ec7d33e857d4779f14d9c74924cab0c7973485d2972419a3b7c7620ff5fd
ARG TINI_DOWNURL=http://192.168.251.4:88/archives/tini-static-amd64
## Use tini as subreaper in Docker container to adopt zombie processes
RUN curl -fsSL ${TINI_DOWNURL} -o /usr/local/bin/tini && \
chmod +x /usr/local/bin/tini && \
echo "${TINI_SHA} /usr/local/bin/tini" | sha256sum -c -
ENV RABBITMQ_USER rabbitmq
ENV RABBITMQ_GROUP rabbitmq
ENV RABBITMQ_VERSION 3.6.6
ENV RABBITMQ_DATA /var/lib/rabbitmq
ARG RABBITMQ_DOWNURL=http://192.168.251.4:88/archives/rabbitmq-server_${RABBITMQ_VERSION}-1_all.deb
RUN curl -s ${RABBITMQ_DOWNURL} > /rabbitmq-server_${RABBITMQ_VERSION}-1_all.deb && \
dpkg -i /rabbitmq-server_${RABBITMQ_VERSION}-1_all.deb && \
rm -rf /rabbitmq-server_${RABBITMQ_VERSION}-1_all.deb
RUN mkdir -p /etc/bash_completion.d ${RABBITMQ_DATA} /etc/rabbitmq && \
echo '[ { rabbit, [ { loopback_users, [ ] } ] } ].' > /etc/rabbitmq/rabbitmq.config && \
chown -R ${RABBITMQ_USER}.${RABBITMQ_GROUP} ${RABBITMQ_DATA} /etc/rabbitmq
COPY ./rabbitmq/rabbitmqadmin /usr/local/bin/rabbitmqadmin
COPY ./rabbitmq/rabbitmqadmin.completion /etc/bash_completion.d/rabbitmqadmin
COPY ./rabbitmq/rabbitmq-declare /
COPY ./rabbitmq/entrypoint.sh /
RUN chmod +x /usr/local/bin/rabbitmqadmin /rabbitmq-declare && \
chmod +x /entrypoint.sh && \
rabbitmq-plugins enable --offline rabbitmq_management && \
echo '' >> /etc/profile && \
echo 'source /etc/bash_completion.d/rabbitmqadmin' >> /etc/profile
VOLUME ${RABBITMQ_DATA}
EXPOSE 4369
EXPOSE 5672-5674
EXPOSE 15672-15674
EXPOSE 25672-25674
HEALTHCHECK --interval=1m --timeout=10s \
CMD curl -s -f http://localhost:15672 1>/dev/null 2>/dev/null || exit 1
ENTRYPOINT ["/usr/local/bin/tini", "--", "/entrypoint.sh"]

RabbitMQ 脚本

entrypoint.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env bash
## --------------------------------------------------
## Filename: entrypoint.sh
## Revision: latest stable
## Author: Mallux
## E-mail: hj.mallux@gmail.com
## Blog: blog.mallux.me
## Description:
## --------------------------------------------------
## Copyright © 2014-2018 Mallux
#set -x #-e
## Exit
trap "__shutdown" EXIT
rabbitmq_nodefile="/.rabbitmq_cluster_done"
rabbitmq_userfile="/.rabbitmq_adduser_done"
__shutdown() {
for node in $(cat ${rabbitmq_nodefile}) ; do
RABBITMQ_NODENAME=$node
rabbitmqctl -n ${RABBITMQ_NODENAME} stop
done
echo "=> Done!"
}
set -- "$@" "${ENTRYPOINT_OPTS}" ; args_array=( $@ )
## starting up rabbitmq service
function gosu_rabbitmq {
for arg in ${args_array[@]} ; do
case "$arg" in
--cluster-nodes=*)
CLUSTER_NODES=$(echo $arg | awk -F'=' '{ print $2 }')
;;
--cluster-hostname=*)
CLUSTER_HOSTNAME=$(echo $arg | awk -F'=' '{ print $2 }')
;;
esac
shift
done
chown -R ${RABBITMQ_USER}.${RABBITMQ_USER} /etc/rabbitmq ${RABBITMQ_DATA}
: ${RABBITMQ_NODE_PORT:=5672}
: ${RABBITMQ_DEFAULT_USER:=admin}
: ${RABBITMQ_DEFAULT_PASS:=admin}
for node in $(seq 1 $CLUSTER_NODES) ; do
RABBITMQ_NODENAME="rabbit_${node}@${CLUSTER_HOSTNAME}"
MANAGEMENT_PORT="$[RABBITMQ_NODE_PORT+10000]"
RABBITMQ_DIST_PORT="$[RABBITMQ_NODE_PORT+20000]"
RABBITMQ_NODE_PORT=${RABBITMQ_NODE_PORT} RABBITMQ_DIST_PORT=${RABBITMQ_DIST_PORT} \
RABBITMQ_NODENAME=$RABBITMQ_NODENAME \
RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,$MANAGEMENT_PORT}]" \
rabbitmq-server -detached
RABBITMQ_NODE_PORT=$[RABBITMQ_NODE_PORT+1]
MANAGEMENT_PORT="${MANAGEMENT_PORT+1}"
RET=1
while [[ $RET -ne 0 ]] ; do
echo "=> Waiting for confirmation of $RABBITMQ_NODENAME service startup"
sleep 5
rabbitmqctl -n $RABBITMQ_NODENAME list_users >/dev/null 2>&1
RET=$?
done
if [ ! -f $rabbitmq_userfile ] ; then
rabbitmqctl -n $RABBITMQ_NODENAME delete_user guest
rabbitmqctl -n $RABBITMQ_NODENAME add_user $RABBITMQ_DEFAULT_USER $RABBITMQ_DEFAULT_PASS
rabbitmqctl -n $RABBITMQ_NODENAME set_user_tags $RABBITMQ_DEFAULT_USER administrator
rabbitmqctl -n $RABBITMQ_NODENAME set_permissions -p "/" $RABBITMQ_DEFAULT_USER ".*" ".*" ".*"
fi
touch $rabbitmq_nodefile
grep -q "$RABBITMQ_NODENAME" $rabbitmq_nodefile
if [ $? -gt 0 ] ; then
case "$node" in
1)
## Disc node
JOIN_CLUSTER_NODENAME="$RABBITMQ_NODENAME"
echo "$RABBITMQ_NODENAME" > $rabbitmq_nodefile
echo "" >> /etc/profile
echo "export RABBITMQ_NODENAME=$RABBITMQ_NODENAME" >> /etc/profile
;;
2)
## Disc node
rabbitmqctl -n $RABBITMQ_NODENAME stop_app
rabbitmqctl -n $RABBITMQ_NODENAME reset
rabbitmqctl -n $RABBITMQ_NODENAME join_cluster $JOIN_CLUSTER_NODENAME
rabbitmqctl -n $RABBITMQ_NODENAME start_app
sed -i "1i $RABBITMQ_NODENAME" $rabbitmq_nodefile
;;
*)
## RAM node
rabbitmqctl -n $RABBITMQ_NODENAME stop_app
rabbitmqctl -n $RABBITMQ_NODENAME reset
rabbitmqctl -n $RABBITMQ_NODENAME join_cluster --ram $JOIN_CLUSTER_NODENAME
rabbitmqctl -n $RABBITMQ_NODENAME start_app
sed -i "1i $RABBITMQ_NODENAME" $rabbitmq_nodefile
;;
esac
fi
done
echo "=> Done!" ; touch $rabbitmq_userfile $rabbitmq_nodefile
}
gosu_rabbitmq "$@"
while : ; do
## wait for 60 seconds
sleep 60
## check your rabbitmq server is running.
cluster_exist_nodes=$(ps -ef | grep rabbitmq_server | grep -v grep | wc -l)
[ x"$cluster_exist_nodes" != x"$CLUSTER_NODES" ] && {
echo "your rabbitmq cluster or partial node has stop, please restart it"
#gosu_rabbitmq $CLUSTER_NODE
}
done

rabbitmq-declare:交换器、队列、绑定声明脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env bash
### --------------------------------------------------
### Filename: rabbitmq-declare
### Revision: latest stable
### Author: Mallux
### E-mail: hj.mallux@gmail.com
### Blog: blog.mallux.me
### Description:
### --------------------------------------------------
### Copyright © 2014-2016 Mallux
### Debug
#set -x
admin_user="admin" ; admin_passwd="admin" ; admin_role="administrator"
app_user="appuser" ; app_passwd="appasswd" ; app_role="administrator"
declare -A vhost_array=(
[app]="app"
[usercenter]="usercenter"
[lender]="leneder"
)
declare -A queue_exchange=(
### app
[sms_consumer@app]="rocket2.events"
[sms_failure@app]=""
[app.queue.partner.doc@app]="app.partner.doc"
[appx.queue.apply.req@app]="appx.apply.req"
[appx.queue.apply.rsq@app]="appx.apply.rsp"
[appx.queue.quota.req@app]="appx.quota.rsp"
### usercenter
[loan.application.change.queue@usercenter]="loan.application.change.exchange"
)
rabbitmqadmin_cmd() {
rabbitmqadmin -u $admin_user -p $admin_passwd "$@"
}
declare_user() {
eval rabbitmqadmin_cmd declare user name=$app_user password=$app_passwd tags=$app_role
}
declare_vhost() {
for key in ${!vhost_array[@]}
do
vHost=${vhost_array[$key]}
eval rabbitmqadmin_cmd declare vhost name=${vHost}
for vUser in $admin_user $app_user
do
eval rabbitmqadmin_cmd declare permission vhost=$vHost user=$vUser \
configure=".*" write=".*" read=".*"
done
done
}
declare_queue_exchange_binding() {
for key in ${!queue_exchange[@]}
do
queue_vHost=$key ; queue=${queue_vHost%@*} ; vHost=${queue_vHost##*@}
exchange=${queue_exchange[$key]}
eval rabbitmqadmin_cmd -V "$vHost" declare queue name=$queue
if [ x"$exchange" != x"" ]
then
case "$exchange" in
*)
exchange_type="fanout"
;;
esac
eval rabbitmqadmin_cmd -V "$vHost" declare queue name=$queue
eval rabbitmqadmin_cmd -V "$vHost" declare exchange name=$exchange type=$exchange_type
eval rabbitmqadmin_cmd -V "$vHost" declare binding source=$exchange destination=$queue
fi
done
}
### Main
__main__() {
declare_user
declare_vhost
declare_queue_exchange_binding
}
__main__

END