go操作rabbitmq
简单案例(默认交换机)
使用默认交换机的时候要用队列name作为key
- 生产者 通过指定路由键来发送消息,这里的路由键是队列名称。
- 消费者 必须监听相同的路由键(即队列名称),以接收生产者发送的消息。
生产者
1 | |
消费者
1 | |
这段代码将消息发布到默认交换机,使用队列名称作为路由键,不要求消息必须立即被消费,也不要求消息一定要被路由到队列。消息被标记为持久的,并以纯文本格式发送。
交换机(fanout模式)
消费者可以创建交换机,防止 启动消费者的时候没有交换机导致的错误
生产者
1 | |
消费者
1 | |
路由(direct模式)
绑定可以采用额外的routing_key参数。绑定密钥的含义取决于交换器的类型。我们以前使用的fanout交换器只是忽略了这个值
1 | |
绑定类型
直连交换器
一个key仅仅和一个queue绑定
多重绑定
多个key可以和一个queue绑定
example
生产者
1 | |
消费者
1 | |
主题(topic模式)
绑定类型
发送到topic交换器的消息不能具有随意的routing_key——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的routing_key示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。routing_key中可以包含任意多个单词,最多255个字节。
绑定键也必须采用相同的形式。topic交换器背后的逻辑类似于direct交换器——用特定路由键发送的消息将传递到所有匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:
*(星号)可以代替一个单词。#(井号)可以替代零个或多个单词。
topic交换器功能强大,可以像其他交换器一样运行。
当队列用“
#”(井号)绑定键绑定时,它将接收所有消息,而与路由键无关,就像在fanout交换器中一样。当在绑定中不使用特殊字符“
*”(星号)和“#”(井号)时,topic交换器的行为就像direct交换器一样。
example
生产者
1 | |
消费者
1 | |
RPC
使用默认交换机
通常,通过RabbitMQ进行RPC很容易。客户端发送请求消息,服务器发送响应消息。为了接收响应,我们需要发送带有“回调”队列地址的请求。我们可以使用默认队列。
1 | |
我们的RPC工作流程如下:
- 客户端启动时,它将创建一个匿名排他回调队列。
- 对于RPC请求,客户端发送一条消息,该消息具有两个属性:
reply_to(设置为回调队列)和correlation_id(设置为每个请求的唯一值)。 - 该请求被发送到
rpc_queue队列。 - RPC工作程序(又名:服务器)正在等待该队列上的请求。当出现请求时,它会完成计算工作并把结果作为消息使用
replay_to字段中的队列发回给客户端。 - 客户端等待回调队列上的数据。出现消息时,它将检查
correlation_id属性。如果它与请求中的值匹配,则将响应返回给应用程序。
example
服务器
1 | |
客户端
1 | |
总结
- 声明队列是幂等的——仅当队列不存在时才创建。消息内容是一个字节数组,因此你可以在此处编码任何内容。
各种参数的含义
ExchangeDeclare 参数
当你声明一个交换器时,你定义了一个消息传递的环节,它决定了消息如何根据路由键和绑定键被路由到队列中。
- name: 交换器的名称,这在后续操作中用于指向这个特定的交换器。
- type: 交换器的类型,决定了消息路由的行为。常见类型有
direct,topic,fanout, 和headers。topic类型允许你根据多重条件(路由键的模式匹配)来路由消息。 - durable: 如果设置为
true,交换器将在服务器重启后继续存在。这有助于确保消息不会因为服务器重启而丢失。 - auto-delete: 如果设置为
true,当最后一个绑定到交换器上的队列被删除后,交换器也会自动删除。 - internal: 如果设置为
true,交换器不能被客户端直接用于消息发布,只能被其他交换器用于交换器到交换器的绑定。 - no-wait: 如果设置为
true,服务器将不会响应方法。客户端不会等待声明完成的确认。 - arguments: 一些交换器类型特有的参数,用于扩展AMQP的功能。
Publish 参数
用于发布消息到交换器。
- exchange: 要发布到的交换器名称。消息会被路由到与此交换器绑定的队列。
- routing key: 用于路由消息的键。根据交换器类型的不同,这个键可以用于不同的匹配规则。
- mandatory: 如果设置为
true,当消息无法路由到队列时,它将返回给发送者。 - immediate: 这是一个不再被使用的参数,用于指示如果没有消费者立即可用来处理消息,就将消息返回给发送者。建议不要使用它,因为它可能不被所有服务器支持。
- Publishing: 消息的内容和属性,比如
ContentType和Body。
QueueDeclare 参数
声明一个队列,如果它不存在,则创建。
- name: 队列的名称。如果留空,服务器将为队列分配一个随机名称。
- durable: 如果设置为
true,队列将在服务器重启后继续存在。 - delete when unused: 如果设置为
true,当没有任何消费者时,队列会被自动删除。 - exclusive: 如果设置为
true,队列将只对首次声明它的连接可见,并在连接关闭时自动删除。 - no-wait: 类似于交换器的
no-wait,如果设置为true,服务器不会对这个操作发送确认。 - arguments: 一些队列特有的参数,用于扩展AMQP的功能。
Consume 参数
用于接收消息的消费者设置。
- queue: 从哪个队列接收消息。
- consumer: 消费者标签,用于区分多个消费者。
- auto ack: 如果设置为
true,服务器会自动确认收到的消息,这意味着一旦消息被送达,它就会从队列中移除。如果设置为false,消费者需要显式地发送acknowledgment来确认已经处理了消息。 - exclusive: 如果设置为
true,这个消费者将成为队列的唯一消费者。 - no local: 如果设置为
true,服务器将不会将消息发送给发布它们的连接。 - no wait: 类似于其他
no-wait,如果设置为true,不等待服务器的响应。 - args: 消费者特有的参数。
持久性的含义
- 交换器的持久性(Durable Exchange):当一个交换器被声明为持久的时,它指的是交换器的定义和配置信息在服务器重启后仍然存在。这样,你就不需要在每次服务器启动时重新声明交换器的配置。但这与交换器内的消息无关,因为交换器不存储消息。
- 队列的持久性(Durable Queue):声明一个队列为持久的,意味着队列的定义将在服务器重启后仍然存在。更重要的是,只有当队列是持久的,并且消息也被标记为持久的(即在发布时将
delivery_mode属性设置为2),这些消息才会在服务器重启后保持在队列中。需要注意的是,即使队列是持久的,如果消息没有被标记为持久的,那么在服务器重启时这些消息还是会丢失。
保证消息持久
- 声明一个持久的队列:在创建队列时,将
durable参数设置为true。这表示队列会在服务器重启后继续存在,并且能够存储持久化消息。 - 发送消息时设置消息为持久:在发布消息时,需要将消息的
delivery_mode属性设置为2(表示消息是持久的)。这确保了消息即使在服务器重启的情况下也不会丢失,前提是它们存储在持久的队列中。
控制消息流向消费者
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。
在一个有两个worker的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker将持续忙碌,而另一个worker几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。
设置预取计数(prefetch count)是一种控制消息流向消费者(workers)的有效方式,特别是在处理涉及多个消费者的任务分发时。这个设置影响了 RabbitMQ 如何平衡任务负载,确保工作不会被过载给某个特定的消费者,而是更公平地分配给所有可用的消费者。
1 | |
- prefetch count: 控制 RabbitMQ 发送给单个消费者的消息数量,直到该消费者确认(ack)一些消息之前。设置为
1意味着 RabbitMQ 在消费者确认处理完当前消息之前,不会给它发送新的消息,这样可以确保工作负载在多个消费者之间公平分配。 - prefetch size: 控制 RabbitMQ 发送给消费者的消息总体大小的限制。设置为
0表示不对消息体大小做限制。 - global: 指示预取设置是应用于整个通道上的所有消费者(
true),还是仅仅应用于当前命令调用的消费者(false)。对于大多数应用场景,将其设置为false表示这个限制是针对每个消费者的。