域名 AXUM.RS 将于2025年10月到期。我们无意再对其进行续费,我们希望你能够接续这个域名,让更多 AXUM 开发者继续受益。
  • 方案1️⃣AXUM.RS 域名 = 3000
  • 方案2️⃣方案1️⃣ + 本站所有专题原始 Markdown 文档 = 5000
  • 方案3️⃣方案2️⃣ + 本站原始数据库 = 5500
如果你有意接续这份 AXUM 情怀,请与我们取得联系。
说明:
  1. 如果有人购买 AXUM.RS 域名(方案1️⃣),或者该域名到期,本站将启用新的免费域名继续提供服务。
  2. 如果有人购买了 AXUM.RS 域名,且同时购买了内容和/或数据库(方案2️⃣/方案3️⃣),本站将关闭。届时我们或许会以另一种方式与你再相遇。

并发读写的ECHO服务

本章我们将实现并发读写的 ECHO 服务。我们将学到如何将 WebSocket 流进行拆分,让它实现双工;同时会学习到,如何在并发编程中使用管道进行数据通讯。

前置知识

拆分 WebSocket 流

我们可以将 WebSocket 流拆分为接受者和发送者,实现并发读(接收)写(发送)数据。

AXUM 的 WebSocket实现了futuresStreamExt::split()方法,通过它可以对 WebSocket 进行拆分:

let (发送者, 接收者) = 流.split();

使用管道进行数据通讯

数据共享一直是并发编程面临的挑战,有很多解决方法,比如:锁机制、管道等。Rust 内置了一些管道,但它们是同步的,而 tokio 提供了对应的异步版本。本章我们将使用 mpsc 模式(多生产者、单消费者)的管道来实现并发编程中的数据通讯。

let (生产者, 消费者) = tokio::sync::mpsc::channel(初始容量);

为了拆分 WebSocket 流,我们需要加入 futures 依赖:

[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
axum = { version = "0.7", features = ["ws"] }

并发读写

我们在上一章实现的简单ECHO的基础上进行改造。首先,我们要实现并发读写。涉及 handle_socket 函数:

async fn handle_socket(socket: WebSocket) {
    // 拆分 WebSocket 流
    let (sender, mut receiver) = socket.split();

    // 接收消息
    tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            match msg {
                Message::Close(_) => {
                    println!("客户端断开连接");
                    break;
                }
                Message::Text(text) => {
                    println!("收到客户端文本消息:{}", text);
                    // 将接收到的消息传递给发送句柄
                }
                _ => println!("收到客户端消息:{:?}", msg),
            };
        }
    });

    // 发送消息
    tokio::spawn(async move {
        // 从接收句柄接收消息
        // 然后将消息原样发送给客户端
    });
}
  • 参数不再需要 mut:因为我们不直接使用它来发送、接收消息,所以不需要 mut 修饰。
  • 将流拆分为发送者(sender)和接收者(receiver)两个部分:let (sender, mut receiver) = socket.split(); ,这样,它们就可以在不同的异步代码中并发工作。
  • 使用2个 tokio::spawn 分别创建接收消息和发送消息的异步任务
  • 接收消息:
    • 使用 while letreceiver.next() 中接收客户端发送过来的消息
    • 我们只处理 Close()Text() 消息
    • 问题:我们如何把这个从客户端接收过来的消息共享给发送消息的异步任务呢?
  • 发送消息:
    • 问题:我们如何从接收消息的异步任务中获得从客户端接收过来的消息呢?
  • 使用 while letreceiver.next() 中接收客户端发送过来的消息
  • 我们只处理 Close()Text() 消息
  • 问题:我们如何把这个从客户端接收过来的消息共享给发送消息的异步任务呢?
  • 问题:我们如何从接收消息的异步任务中获得从客户端接收过来的消息呢?

我们使用 tokio::sync::mpsc::channel来实现异步任务之间的数据通讯。

  • 为了发送消息,我们要把 sender 定义为 mutlet (mut sender, mut receiver) = socket.split();
  • 通过 tokio::sync::mpsc::channel() 创建管道,它分为生产者(发送方)和消费者(接收方):let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(100);,我们创建的管道发送的数据是 Message 类型。
  • 接收消息异步任务: tx.send(Message::Text(text)).await.unwrap();将从客户端接受到的数据发送给管道的消费者
  • 发送消息异步任务:
    • while let Some(msg) = rx.recv().await 从管道接收数据,以便消费。
    • sender.send(msg).await.unwrap();将接收到的数据进行发送。由于我们调用的是WebSocket流拆分出来的 sender(发送者)进行发送,所以实际是发送给当前连接的客户端
  • while let Some(msg) = rx.recv().await 从管道接收数据,以便消费。
  • sender.send(msg).await.unwrap();将接收到的数据进行发送。由于我们调用的是WebSocket流拆分出来的 sender(发送者)进行发送,所以实际是发送给当前连接的客户端

将异步任务改写为函数

对于接收消息的异步任务而言:

/// 接收消息
async fn read(mut receiver: SplitStream<WebSocket>, tx: tokio::sync::mpsc::Sender<Message>) {
    while let Some(Ok(msg)) = receiver.next().await {
        match msg {
            Message::Close(_) => {
                println!("客户端断开连接");
                break;
            }
            Message::Text(text) => {
                println!("收到客户端文本消息:{}", text);
                // 通过管道,将接收到的消息传递给发送句柄
                tx.send(Message::Text(text)).await.unwrap();
            }
            _ => println!("收到客户端消息:{:?}", msg),
        };
    }
}

对于发送消息的异步任务而言:

对应的,异步任务可以直接调用这两个函数:

async fn handle_socket(socket: WebSocket) {
    // 拆分 WebSocket 流
    let (sender, receiver) = socket.split();

    // 创建管道
    let (tx, rx) = tokio::sync::mpsc::channel::<Message>(100);

    // 接收消息异步任务
    tokio::spawn(read(receiver, tx));

    // 发送消息异步任务
    tokio::spawn(write(sender, rx));
}

本章代码位于echo分支。

要查看完整内容,请先登录