域名 AXUM.RS 将于2025年10月到期。我们无意再对其进行续费,我们希望你能够接续这个域名,让更多 AXUM 开发者继续受益。
  • 方案1️⃣AXUM.RS 域名 = 3000
  • 方案2️⃣方案1️⃣ + 本站所有专题原始 Markdown 文档 = 5000
  • 方案3️⃣方案2️⃣ + 本站原始数据库 = 5500
如果你有意接续这份 AXUM 情怀,请与我们取得联系。
说明:
  1. 如果有人购买 AXUM.RS 域名(方案1️⃣),或者该域名到期,本站将启用新的免费域名继续提供服务。
  2. 如果有人购买了 AXUM.RS 域名,且同时购买了内容和/或数据库(方案2️⃣/方案3️⃣),本站将关闭。届时我们或许会以另一种方式与你再相遇。

通过 lapin 集成 RabbitMQ

lapin 是一个用 rust 实现的 AMQP 客户端,它可以实现与 RabbitMQ 的交互。

依赖

[dependencies]
# ...
lapin = "2"
tokio-executor-trait = "2"
tokio-reactor-trait = "1"

角色

  • 消息生产者:将消息发送给消息队列
  • 消息消费者:从消息队列中,将消息取出

基础使用

发送消息(生产者)

// src/rabbitmq/basic.rs

pub async fn send(dsn: &str, queue_name: &str, payload: &str) -> Result<Confirmation> {
    // 定义连接属性
    let options = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);

    // 连接到服务器
    let conn = Connection::connect(dsn, options)
        .await
        .map_err(Error::from)?;
    // 创建管道
    let chan = conn.create_channel().await.map_err(Error::from)?;

    //定义队列
    let queue = chan
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    // 把要发送的数据转成字节数组
    let payload = payload.as_bytes();

    // 发布消息
    chan.basic_publish(
        "",
        queue.name().as_str(),
        BasicPublishOptions::default(),
        payload,
        BasicProperties::default(),
    )
    .await
    .map_err(Error::from)?
    .await
    .map_err(Error::from)
}
  • dsn:连接字符串,格式为:amqp://[用户名][:密码@][主机][:端口][/虚拟机],本地连接的话,通常简写成:amqp://127.0.0.1:5672
  • options:就是上一步定义的连接属性

测试

#[tokio::test]
async fn test_basic_send() {
    let dsn = get_dsn().unwrap();
    for i in 0..10 {
        let msg = format!("#{} AXUM中文网-axum.rs", i);
        let confirm = super::send(&dsn, QUEUE_NAME, &msg).await;
        match confirm {
            Ok(_) => tracing::info!("[x] 消息已发送成功!{}", msg),
            Err(e) => tracing::error!("{:?}", e),
        };

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}

接收消息(消费者)

/// 接收消息
pub async fn receive<D: ConsumerDelegate + 'static>(
    dsn: &str,
    queue_name: &str,
    tag: &str,
    delegate: D,
) -> Result<()> {
    // 定义连接属性
    let options = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);

    // 连接到服务器
    let conn = Connection::connect(dsn, options)
        .await
        .map_err(Error::from)?;
    // 创建管道
    let chan = conn.create_channel().await.map_err(Error::from)?;

    //定义队列
    let queue = chan
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    // 定义消息的消费者
    let consumer = chan
        .basic_consume(
            queue.name().as_str(),
            tag,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;
    tracing::debug!("bbb");
    consumer.set_delegate(delegate);
    conn.run().map_err(Error::from)
}
  • tag参数:给它取个名字 ,用于标识该消费者

测试

#[tokio::test]
async fn test_basic_receive() {
    let dsn = get_dsn().unwrap();
    super::receive(
        &dsn,
        QUEUE_NAME,
        "TESTER",
        move |delivery: DeliveryResult| async move {
            tracing::debug!("aaa");
            let delivery = match delivery {
                Ok(Some(delivery)) => delivery,
                Ok(None) => {
                    tracing::error!("None ");
                    return;
                }
                Err(err) => {
                    tracing::error!("Failed to consume queue message {}", err);
                    return;
                }
            };

            let message = String::from_utf8_lossy(&delivery.data);
            tracing::info!("Received a message: {}", message);

            delivery.ack(BasicAckOptions::default()).await.unwrap();
        },
    )
    .await
    .unwrap();
}

注意:

move |delivery: DeliveryResult| async move {
        tracing::debug!("aaa");
        let delivery = match delivery {
            Ok(Some(delivery)) => delivery,
            Ok(None) => {
                tracing::error!("None ");
                return;
            }
            Err(err) => {
                tracing::error!("Failed to consume queue message {}", err);
                return;
            }
        };

        let message = String::from_utf8_lossy(&delivery.data);
        tracing::info!("Received a message: {}", message);

        delivery.ack(BasicAckOptions::default()).await.unwrap();
    }

这段代码就是那个泛型参数,它是一个匿名函数:

  • 首先判断是否有消息
  • 然后就是消费这个消息,我们的案例中很简单,就是把消息内容打印出来
  • delivery.ack(BasicAckOptions::default()):确认消息,告诉 RabbitMQ,此条消息已经确认收到。

使用 topic

为了使消息更具组织性,我们可以通过 topic 给不同业务的消费进行分组。

pub async fn send(
    dsn: &str,
    exchange: &str,
    queue_name: &str,
    routing_key: &str,
    payload: &str,
) -> Result<Confirmation> {
    let options = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);

    let conn = Connection::connect(dsn, options)
        .await
        .map_err(Error::from)?;
    let chan = conn.create_channel().await.map_err(Error::from)?;

    chan.exchange_declare(
        exchange,
        lapin::ExchangeKind::Topic,
        ExchangeDeclareOptions::default(),
        FieldTable::default(),
    )
    .await
    .map_err(Error::from)?;

    let queue = chan
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    chan.queue_bind(
        queue.name().as_str(),
        exchange,
        routing_key,
        QueueBindOptions::default(),
        FieldTable::default(),
    )
    .await
    .map_err(Error::from)?;

    let payload = payload.as_bytes();

    chan.basic_publish(
        exchange,
        routing_key,
        BasicPublishOptions::default(),
        payload,
        BasicProperties::default(),
    )
    .await
    .map_err(Error::from)?
    .await
    .map_err(Error::from)
}

和基础使用中的差不多,无非多了几个参数,它们的作用其实就是为了标识消息的分组。

测试

#[tokio::test]
async fn test_topic_send() {
    let dsn = get_dsn().unwrap();
    for i in 0..10 {
        let msg = format!("#{} AXUM中文网-axum.rs", i);
        let confirm = super::send(&dsn, EXCHANGE_NAME, QUEUE_NAME, ROUTING_KEY, &msg).await;
        match confirm {
            Ok(_) => tracing::info!("[x] 消息已发送成功!{}", msg),
            Err(e) => tracing::error!("{:?}", e),
        };

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}

接收消息

pub async fn receive<D: ConsumerDelegate + 'static>(
    dsn: &str,
    exchange: &str,
    queue_name: &str,
    routing_key: &str,
    tag: &str,
    delegate: D,
) -> Result<()> {
    let options = ConnectionProperties::default()
        .with_executor(tokio_executor_trait::Tokio::current())
        .with_reactor(tokio_reactor_trait::Tokio);

    let conn = Connection::connect(dsn, options)
        .await
        .map_err(Error::from)?;
    let chan = conn.create_channel().await.map_err(Error::from)?;

    chan.exchange_declare(
        exchange,
        lapin::ExchangeKind::Topic,
        ExchangeDeclareOptions::default(),
        FieldTable::default(),
    )
    .await
    .map_err(Error::from)?;

    let queue = chan
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    chan.queue_bind(
        queue.name().as_str(),
        exchange,
        routing_key,
        QueueBindOptions::default(),
        FieldTable::default(),
    )
    .await
    .map_err(Error::from)?;

    let consumer = chan
        .basic_consume(
            queue.name().as_str(),
            tag,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .map_err(Error::from)?;

    consumer.set_delegate(delegate);

    conn.run().map_err(Error::from)
}

也是大同小异,多了几个标识分组的名字。

测试

本章代码位于01/lapin分支。

要查看完整内容,请先登录