通过 lapin 集成 RabbitMQ

431
2023/06/15 18:55:27

lapin 是一个用 rust 实现的 AMQP 客户端,它可以实现与 RabbitMQ 的交互。

依赖

[dependencies]
# ...
lapin = "2"
tokio-executor-trait = "2"
tokio-reactor-trait = "1"

角色

消息队列一般分为2种角色:

  • 消息生产者:将消息发送给消息队列
  • 消息消费者:从消息队列中,将消息取出

基础使用

发送消息(生产者)

// src/rabbitmq/basic.rs

pub async fn send(dsn: &str, queue_name: &str, payload: &str) -> Result<Confirmation> {
    // 定义连接属性
    let options = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);

    // 连接到服务器
    let conn = Connection::connect(dsn, options)
        .await
        .map_err(Error::from)?;
    // 创建管道
    let chan = conn.create_channel().await.map_err(Error::from)?;

    //定义队列
    let queue = chan
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    // 把要发送的数据转成字节数组
    let payload = payload.as_bytes();

    // 发布消息
    chan.basic_publish(
        "",
        queue.name().as_str(),
        BasicPublishOptions::default(),
        payload,
        BasicProperties::default(),
    )
    .await
    .map_err(Error::from)?
    .await
    .map_err(Error::from)
}
  • 定义连接属性:由于我们要集成到 tokio 中,所以这一步是必须的。在这个过程中,通过 with_executor()with_reactor(),告诉 lapin ,我们的异步运行时是 tokio
  • 连接到服务器:通过 Connection::connect(dsn, options)就可以连接到 RabbitMQ 的服务器了
    • dsn:连接字符串,格式为:amqp://[用户名][:密码@][主机][:端口][/虚拟机],本地连接的话,通常简写成:amqp://127.0.0.1:5672
    • options:就是上一步定义的连接属性
  • 创建管道:使用 conn.create_channel() 来创建管道
  • 定义队列:使用chan.queue_declare() 来定义队列
  • 发布消息:使用chan.basic_publish()来发布消息

测试

接收消息(消费者)

/// 接收消息
pub async fn receive<D: ConsumerDelegate + 'static>(
    dsn: &str,
    queue_name: &str,
    tag: &str,
    delegate: D,
) -> Result<()> {
    // 定义连接属性
    let options = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);

    // 连接到服务器
    let conn = Connection::connect(dsn, options)
        .await
        .map_err(Error::from)?;
    // 创建管道
    let chan = conn.create_channel().await.map_err(Error::from)?;

    //定义队列
    let queue = chan
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    // 定义消息的消费者
    let consumer = chan
        .basic_consume(
            queue.name().as_str(),
            tag,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;
    tracing::debug!("bbb");
    consumer.set_delegate(delegate);
    conn.run().map_err(Error::from)
}
  • 泛型 <D: ConsumerDelegate + 'static>:你可以把它理解为如何消费这个消息,通常通过这个泛型参数来传递业务逻辑。
  • 前面几步和发布消息一样,这里不重复。
  • 定义消息的消费者:通过 chan.basic_consume() 来定义一个消费者
    • tag参数:给它取个名字 ,用于标识该消费者
  • 给消费者设置委托,其参数就是那个泛型。consumer.set_delegate(delegate)
  • conn.run():让消费者持续运行

测试

#[tokio::test]
async fn test_basic_receive() {
    let dsn = get_dsn().unwrap();
    super::receive(
        &dsn,
        QUEUE_NAME,
        "TESTER",
        move |delivery: DeliveryResult| async move {
            tracing::debug!("aaa");
            let delivery = match delivery {
                Ok(Some(delivery)) => delivery,
                Ok(None) => {
                    tracing::error!("None ");
                    return;
                }
                Err(err) => {
                    tracing::error!("Failed to consume queue message {}", err);
                    return;
                }
            };

            let message = String::from_utf8_lossy(&delivery.data);
            tracing::info!("Received a message: {}", message);

            delivery.ack(BasicAckOptions::default()).await.unwrap();
        },
    )
    .await
    .unwrap();
}

注意:

move |delivery: DeliveryResult| async move {
        tracing::debug!("aaa");
        let delivery = match delivery {
            Ok(Some(delivery)) => delivery,
            Ok(None) => {
                tracing::error!("None ");
                return;
            }
            Err(err) => {
                tracing::error!("Failed to consume queue message {}", err);
                return;
            }
        };

        let message = String::from_utf8_lossy(&delivery.data);
        tracing::info!("Received a message: {}", message);

        delivery.ack(BasicAckOptions::default()).await.unwrap();
    }

这段代码就是那个泛型参数,它是一个匿名函数:

  • 首先判断是否有消息
  • 然后就是消费这个消息,我们的案例中很简单,就是把消息内容打印出来
  • delivery.ack(BasicAckOptions::default()):确认消息,告诉 RabbitMQ,此条消息已经确认收到。

使用 topic

发送消息

pub async fn send(
    dsn: &str,
    exchange: &str,
    queue_name: &str,
    routing_key: &str,
    payload: &str,
) -> Result<Confirmation> {
    let options = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);

    let conn = Connection::connect(dsn, options)
        .await
        .map_err(Error::from)?;
    let chan = conn.create_channel().await.map_err(Error::from)?;

    chan.exchange_declare(
        exchange,
        lapin::ExchangeKind::Topic,
        ExchangeDeclareOptions::default(),
        FieldTable::default(),
    )
    .await
    .map_err(Error::from)?;

    let queue = chan
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    chan.queue_bind(
        queue.name().as_str(),
        exchange,
        routing_key,
        QueueBindOptions::default(),
        FieldTable::default(),
    )
    .await
    .map_err(Error::from)?;

    let payload = payload.as_bytes();

    chan.basic_publish(
        exchange,
        routing_key,
        BasicPublishOptions::default(),
        payload,
        BasicProperties::default(),
    )
    .await
    .map_err(Error::from)?
    .await
    .map_err(Error::from)
}

和基础使用中的差不多,无非多了几个参数,它们的作用其实就是为了标识消息的分组。

测试

#[tokio::test]
async fn test_topic_send() {
    let dsn = get_dsn().unwrap();
    for i in 0..10 {
        let msg = format!("#{} AXUM中文网-axum.rs", i);
        let confirm = super::send(&dsn, EXCHANGE_NAME, QUEUE_NAME, ROUTING_KEY, &msg).await;
        match confirm {
            Ok(_) => tracing::info!("[x] 消息已发送成功!{}", msg),
            Err(e) => tracing::error!("{:?}", e),
        };

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}

接收消息

pub async fn receive<D: ConsumerDelegate + 'static>(
    dsn: &str,
    exchange: &str,
    queue_name: &str,
    routing_key: &str,
    tag: &str,
    delegate: D,
) -> Result<()> {
    let options = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);

    let conn = Connection::connect(dsn, options)
        .await
        .map_err(Error::from)?;
    let chan = conn.create_channel().await.map_err(Error::from)?;

    chan.exchange_declare(
        exchange,
        lapin::ExchangeKind::Topic,
        ExchangeDeclareOptions::default(),
        FieldTable::default(),
    )
    .await
    .map_err(Error::from)?;

    let queue = chan
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    chan.queue_bind(
        queue.name().as_str(),
        exchange,
        routing_key,
        QueueBindOptions::default(),
        FieldTable::default(),
    )
    .await
    .map_err(Error::from)?;

    let consumer = chan
        .basic_consume(
            queue.name().as_str(),
            tag,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    consumer.set_delegate(delegate);

    conn.run().map_err(Error::from)
}

也是大同小异,多了几个标识分组的名字。

测试

本章代码位于01/lapin分支。