内容介绍
本专题将带你使用逐步实现一个非常常见的功能:在用户注册时,通过发送验证码。RabbitMQ 消息队列
RabbitMQ 是目前市场上最流行的消息队列之一,本章将讨论如何安装部署 RabbitMQ。通过 lapin 集成 RabbitMQ
`lapin` 是一个用 rust 实现的 AMQP 客户端,它可以实现与 RabbitMQ 的交互。通过 lettre 发送邮件
本章将讨论使用 `lettre` 在 rust 中实现发送邮件。我们将分别使用 Gmail 和 Mail.ee 来作测试。实现用户注册与激活
本章将实现最终的功能:用户注册,并发送激活邮件。
通过 lapin 集成 RabbitMQ
lapin
是一个用 rust 实现的 AMQP 客户端,它可以实现与 RabbitMQ 的交互。
依赖
[dependencies]
# ...
lapin = "2"
tokio-executor-trait = "2"
tokio-reactor-trait = "1"
角色
- 消息生产者:将消息发送给消息队列
- 消息消费者:从消息队列中,将消息取出
基础使用
发送消息(生产者)
// src/rabbitmq/basic.rs
pub async fn send(dsn: &str, queue_name: &str, payload: &str) -> Result<Confirmation> {
// 定义连接属性
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
// 连接到服务器
let conn = Connection::connect(dsn, options)
.await
.map_err(Error::from)?;
// 创建管道
let chan = conn.create_channel().await.map_err(Error::from)?;
//定义队列
let queue = chan
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
// 把要发送的数据转成字节数组
let payload = payload.as_bytes();
// 发布消息
chan.basic_publish(
"",
queue.name().as_str(),
BasicPublishOptions::default(),
payload,
BasicProperties::default(),
)
.await
.map_err(Error::from)?
.await
.map_err(Error::from)
}
dsn
:连接字符串,格式为:amqp://[用户名][:密码@][主机][:端口][/虚拟机]
,本地连接的话,通常简写成:amqp://127.0.0.1:5672
options
:就是上一步定义的连接属性
测试
#[tokio::test]
async fn test_basic_send() {
let dsn = get_dsn().unwrap();
for i in 0..10 {
let msg = format!("#{} AXUM中文网-axum.rs", i);
let confirm = super::send(&dsn, QUEUE_NAME, &msg).await;
match confirm {
Ok(_) => tracing::info!("[x] 消息已发送成功!{}", msg),
Err(e) => tracing::error!("{:?}", e),
};
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
接收消息(消费者)
/// 接收消息
pub async fn receive<D: ConsumerDelegate + 'static>(
dsn: &str,
queue_name: &str,
tag: &str,
delegate: D,
) -> Result<()> {
// 定义连接属性
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
// 连接到服务器
let conn = Connection::connect(dsn, options)
.await
.map_err(Error::from)?;
// 创建管道
let chan = conn.create_channel().await.map_err(Error::from)?;
//定义队列
let queue = chan
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
// 定义消息的消费者
let consumer = chan
.basic_consume(
queue.name().as_str(),
tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
tracing::debug!("bbb");
consumer.set_delegate(delegate);
conn.run().map_err(Error::from)
}
tag
参数:给它取个名字 ,用于标识该消费者
测试
#[tokio::test]
async fn test_basic_receive() {
let dsn = get_dsn().unwrap();
super::receive(
&dsn,
QUEUE_NAME,
"TESTER",
move |delivery: DeliveryResult| async move {
tracing::debug!("aaa");
let delivery = match delivery {
Ok(Some(delivery)) => delivery,
Ok(None) => {
tracing::error!("None ");
return;
}
Err(err) => {
tracing::error!("Failed to consume queue message {}", err);
return;
}
};
let message = String::from_utf8_lossy(&delivery.data);
tracing::info!("Received a message: {}", message);
delivery.ack(BasicAckOptions::default()).await.unwrap();
},
)
.await
.unwrap();
}
注意:
move |delivery: DeliveryResult| async move {
tracing::debug!("aaa");
let delivery = match delivery {
Ok(Some(delivery)) => delivery,
Ok(None) => {
tracing::error!("None ");
return;
}
Err(err) => {
tracing::error!("Failed to consume queue message {}", err);
return;
}
};
let message = String::from_utf8_lossy(&delivery.data);
tracing::info!("Received a message: {}", message);
delivery.ack(BasicAckOptions::default()).await.unwrap();
}
这段代码就是那个泛型参数,它是一个匿名函数:
- 首先判断是否有消息
- 然后就是消费这个消息,我们的案例中很简单,就是把消息内容打印出来
delivery.ack(BasicAckOptions::default())
:确认消息,告诉 RabbitMQ,此条消息已经确认收到。
使用 topic
为了使消息更具组织性,我们可以通过 topic
给不同业务的消费进行分组。
pub async fn send(
dsn: &str,
exchange: &str,
queue_name: &str,
routing_key: &str,
payload: &str,
) -> Result<Confirmation> {
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let conn = Connection::connect(dsn, options)
.await
.map_err(Error::from)?;
let chan = conn.create_channel().await.map_err(Error::from)?;
chan.exchange_declare(
exchange,
lapin::ExchangeKind::Topic,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
let queue = chan
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
chan.queue_bind(
queue.name().as_str(),
exchange,
routing_key,
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
let payload = payload.as_bytes();
chan.basic_publish(
exchange,
routing_key,
BasicPublishOptions::default(),
payload,
BasicProperties::default(),
)
.await
.map_err(Error::from)?
.await
.map_err(Error::from)
}
和基础使用中的差不多,无非多了几个参数,它们的作用其实就是为了标识消息的分组。
测试
#[tokio::test]
async fn test_topic_send() {
let dsn = get_dsn().unwrap();
for i in 0..10 {
let msg = format!("#{} AXUM中文网-axum.rs", i);
let confirm = super::send(&dsn, EXCHANGE_NAME, QUEUE_NAME, ROUTING_KEY, &msg).await;
match confirm {
Ok(_) => tracing::info!("[x] 消息已发送成功!{}", msg),
Err(e) => tracing::error!("{:?}", e),
};
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
接收消息
pub async fn receive<D: ConsumerDelegate + 'static>(
dsn: &str,
exchange: &str,
queue_name: &str,
routing_key: &str,
tag: &str,
delegate: D,
) -> Result<()> {
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let conn = Connection::connect(dsn, options)
.await
.map_err(Error::from)?;
let chan = conn.create_channel().await.map_err(Error::from)?;
chan.exchange_declare(
exchange,
lapin::ExchangeKind::Topic,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
let queue = chan
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
chan.queue_bind(
queue.name().as_str(),
exchange,
routing_key,
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
let consumer = chan
.basic_consume(
queue.name().as_str(),
tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.map_err(Error::from)?;
consumer.set_delegate(delegate);
conn.run().map_err(Error::from)
}
也是大同小异,多了几个标识分组的名字。
测试
本章代码位于01/lapin分支。