lapin
是一个用 rust 实现的 AMQP 客户端,它可以实现与 RabbitMQ 的交互。
依赖
[dependencies]
# ...
lapin = "2"
tokio-executor-trait = "2"
tokio-reactor-trait = "1"
角色
消息队列一般分为2种角色:
- 消息生产者:将消息发送给消息队列
- 消息消费者:从消息队列中,将消息取出
基础使用
发送消息(生产者)
// src/rabbitmq/basic.rs
pub async fn send(dsn: &str, queue_name: &str, payload: &str) -> Result<Confirmation> {
// 定义连接属性
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
// 连接到服务器
let conn = Connection::connect(dsn, options)
.await
.map_err(Error::from)?;
// 创建管道
let chan = conn.create_channel().await.map_err(Error::from)?;
//定义队列
let queue = chan
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
// 把要发送的数据转成字节数组
let payload = payload.as_bytes();
// 发布消息
chan.basic_publish(
"",
queue.name().as_str(),
BasicPublishOptions::default(),
payload,
BasicProperties::default(),
)
.await
.map_err(Error::from)?
.await
.map_err(Error::from)
}
- 定义连接属性:由于我们要集成到 tokio 中,所以这一步是必须的。在这个过程中,通过
with_executor()
和with_reactor()
,告诉lapin
,我们的异步运行时是 tokio - 连接到服务器:通过
Connection::connect(dsn, options)
就可以连接到 RabbitMQ 的服务器了dsn
:连接字符串,格式为:amqp://[用户名][:密码@][主机][:端口][/虚拟机]
,本地连接的话,通常简写成:amqp://127.0.0.1:5672
options
:就是上一步定义的连接属性
- 创建管道:使用
conn.create_channel()
来创建管道 - 定义队列:使用
chan.queue_declare()
来定义队列 - 发布消息:使用
chan.basic_publish()
来发布消息
测试
#[tokio::test]
async fn test_basic_send() {
let dsn = get_dsn().unwrap();
for i in 0..10 {
let msg = format!("#{} AXUM中文网-axum.rs", i);
let confirm = super::send(&dsn, QUEUE_NAME, &msg).await;
match confirm {
Ok(_) => tracing::info!("[x] 消息已发送成功!{}", msg),
Err(e) => tracing::error!("{:?}", e),
};
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
接收消息(消费者)
/// 接收消息
pub async fn receive<D: ConsumerDelegate + 'static>(
dsn: &str,
queue_name: &str,
tag: &str,
delegate: D,
) -> Result<()> {
// 定义连接属性
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
// 连接到服务器
let conn = Connection::connect(dsn, options)
.await
.map_err(Error::from)?;
// 创建管道
let chan = conn.create_channel().await.map_err(Error::from)?;
//定义队列
let queue = chan
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
// 定义消息的消费者
let consumer = chan
.basic_consume(
queue.name().as_str(),
tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
tracing::debug!("bbb");
consumer.set_delegate(delegate);
conn.run().map_err(Error::from)
}
- 泛型
<D: ConsumerDelegate + 'static>
:你可以把它理解为如何消费这个消息,通常通过这个泛型参数来传递业务逻辑。 - 前面几步和发布消息一样,这里不重复。
- 定义消息的消费者:通过
chan.basic_consume()
来定义一个消费者tag
参数:给它取个名字 ,用于标识该消费者
- 给消费者设置委托,其参数就是那个泛型。
consumer.set_delegate(delegate)
conn.run()
:让消费者持续运行
测试
注意:
move |delivery: DeliveryResult| async move {
tracing::debug!("aaa");
let delivery = match delivery {
Ok(Some(delivery)) => delivery,
Ok(None) => {
tracing::error!("None ");
return;
}
Err(err) => {
tracing::error!("Failed to consume queue message {}", err);
return;
}
};
let message = String::from_utf8_lossy(&delivery.data);
tracing::info!("Received a message: {}", message);
delivery.ack(BasicAckOptions::default()).await.unwrap();
}
这段代码就是那个泛型参数,它是一个匿名函数:
- 首先判断是否有消息
- 然后就是消费这个消息,我们的案例中很简单,就是把消息内容打印出来
delivery.ack(BasicAckOptions::default())
:确认消息,告诉 RabbitMQ,此条消息已经确认收到。
使用 topic
为了使消息更具组织性,我们可以通过 topic
给不同业务的消费进行分组。
发送消息
pub async fn send(
dsn: &str,
exchange: &str,
queue_name: &str,
routing_key: &str,
payload: &str,
) -> Result<Confirmation> {
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let conn = Connection::connect(dsn, options)
.await
.map_err(Error::from)?;
let chan = conn.create_channel().await.map_err(Error::from)?;
chan.exchange_declare(
exchange,
lapin::ExchangeKind::Topic,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
let queue = chan
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
chan.queue_bind(
queue.name().as_str(),
exchange,
routing_key,
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
let payload = payload.as_bytes();
chan.basic_publish(
exchange,
routing_key,
BasicPublishOptions::default(),
payload,
BasicProperties::default(),
)
.await
.map_err(Error::from)?
.await
.map_err(Error::from)
}
测试
#[tokio::test]
async fn test_topic_send() {
let dsn = get_dsn().unwrap();
for i in 0..10 {
let msg = format!("#{} AXUM中文网-axum.rs", i);
let confirm = super::send(&dsn, EXCHANGE_NAME, QUEUE_NAME, ROUTING_KEY, &msg).await;
match confirm {
Ok(_) => tracing::info!("[x] 消息已发送成功!{}", msg),
Err(e) => tracing::error!("{:?}", e),
};
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
接收消息
pub async fn receive<D: ConsumerDelegate + 'static>(
dsn: &str,
exchange: &str,
queue_name: &str,
routing_key: &str,
tag: &str,
delegate: D,
) -> Result<()> {
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let conn = Connection::connect(dsn, options)
.await
.map_err(Error::from)?;
let chan = conn.create_channel().await.map_err(Error::from)?;
chan.exchange_declare(
exchange,
lapin::ExchangeKind::Topic,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
let queue = chan
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
chan.queue_bind(
queue.name().as_str(),
exchange,
routing_key,
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
let consumer = chan
.basic_consume(
queue.name().as_str(),
tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
consumer.set_delegate(delegate);
conn.run().map_err(Error::from)
}
测试
#[tokio::test]
async fn test_topic_receive() {
let dsn = get_dsn().unwrap();
super::receive(
&dsn,
EXCHANGE_NAME,
QUEUE_NAME,
ROUTING_KEY,
"TESTER",
move |delivery: DeliveryResult| async move {
tracing::debug!("aaa");
let delivery = match delivery {
Ok(Some(delivery)) => delivery,
Ok(None) => {
tracing::error!("None ");
return;
}
Err(err) => {
tracing::error!("Failed to consume queue message {}", err);
return;
}
};
let message = String::from_utf8_lossy(&delivery.data);
tracing::info!("Received a message: {}", message);
delivery.ack(BasicAckOptions::default()).await.unwrap();
},
)
.await
.unwrap();
}
本章代码位于01/lapin分支。