前置知识
拆分 WebSocket 流
我们可以将 WebSocket 流拆分为接受者和发送者,实现并发读(接收)写(发送)数据。
AXUM 的 WebSocket
实现了futures
的StreamExt::split()
方法,通过它可以对 WebSocket 进行拆分:
let (发送者, 接收者) = 流.split();
使用管道进行数据通讯
数据共享一直是并发编程面临的挑战,有很多解决方法,比如:锁机制、管道等。Rust 内置了一些管道,但它们是同步的,而 tokio 提供了对应的异步版本。本章我们将使用 mpsc
模式(多生产者、单消费者)的管道来实现并发编程中的数据通讯。
let (生产者, 消费者) = tokio::sync::mpsc::channel(初始容量);
依赖
为了拆分 WebSocket 流,我们需要加入 futures
依赖:
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
axum = { version = "0.7", features = ["ws"] }
并发读写
我们在上一章实现的简单ECHO的基础上进行改造。首先,我们要实现并发读写。涉及 handle_socket
函数:
async fn handle_socket(socket: WebSocket) {
// 拆分 WebSocket 流
let (sender, mut receiver) = socket.split();
// 接收消息
tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Close(_) => {
println!("客户端断开连接");
break;
}
Message::Text(text) => {
println!("收到客户端文本消息:{}", text);
// 将接收到的消息传递给发送句柄
}
_ => println!("收到客户端消息:{:?}", msg),
};
}
});
// 发送消息
tokio::spawn(async move {
// 从接收句柄接收消息
// 然后将消息原样发送给客户端
});
}
- 参数不再需要
mut
:因为我们不直接使用它来发送、接收消息,所以不需要mut
修饰。 - 将流拆分为发送者(
sender
)和接收者(receiver
)两个部分:let (sender, mut receiver) = socket.split();
,这样,它们就可以在不同的异步代码中并发工作。 - 使用2个
tokio::spawn
分别创建接收消息和发送消息的异步任务 - 接收消息:
- 使用
while let
从receiver.next()
中接收客户端发送过来的消息 - 我们只处理
Close()
和Text()
消息 - 问题:我们如何把这个从客户端接收过来的消息共享给发送消息的异步任务呢?
- 使用
- 发送消息:
- 问题:我们如何从接收消息的异步任务中获得从客户端接收过来的消息呢?
数据通信
我们使用 tokio::sync::mpsc::channel
来实现异步任务之间的数据通讯。
async fn handle_socket(socket: WebSocket) {
// 拆分 WebSocket 流
let (mut sender, mut receiver) = socket.split();
// 创建管道
let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(100);
// 接收消息
tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Close(_) => {
println!("客户端断开连接");
break;
}
Message::Text(text) => {
println!("收到客户端文本消息:{}", text);
// 通过管道,将接收到的消息传递给发送句柄
tx.send(Message::Text(text)).await.unwrap();
}
_ => println!("收到客户端消息:{:?}", msg),
};
}
});
// 发送消息
tokio::spawn(async move {
// 通过管道,从接收句柄接收消息
while let Some(msg) = rx.recv().await {
// 然后将消息原样发送给客户端
sender.send(msg).await.unwrap();
}
});
}
- 为了发送消息,我们要把
sender
定义为mut
:let (mut sender, mut receiver) = socket.split();
- 通过
tokio::sync::mpsc::channel()
创建管道,它分为生产者(发送方)和消费者(接收方):let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(100);
,我们创建的管道发送的数据是Message
类型。 - 接收消息异步任务:
tx.send(Message::Text(text)).await.unwrap();
将从客户端接受到的数据发送给管道的消费者 - 发送消息异步任务:
while let Some(msg) = rx.recv().await
从管道接收数据,以便消费。sender.send(msg).await.unwrap();
将接收到的数据进行发送。由于我们调用的是WebSocket流拆分出来的sender
(发送者)进行发送,所以实际是发送给当前连接的客户端
将异步任务改写为函数
我们可以将以上两个异步任务改写为函数实现,以便更好的组织和维护代码。开始之前,我们需要知道各个参数的数据类型。
对于接收消息的异步任务而言:
receiver
:SplitStream<WebSocket>
,它是被拆分的流的部分。tx
:Sender<Message>
,用于发送数据的管道端。
/// 接收消息
async fn read(mut receiver: SplitStream<WebSocket>, tx: tokio::sync::mpsc::Sender<Message>) {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Close(_) => {
println!("客户端断开连接");
break;
}
Message::Text(text) => {
println!("收到客户端文本消息:{}", text);
// 通过管道,将接收到的消息传递给发送句柄
tx.send(Message::Text(text)).await.unwrap();
}
_ => println!("收到客户端消息:{:?}", msg),
};
}
}
sender
:SplitSink<WebSocket,Message>
,它是被拆分的操作部分。rx
:Receiver<Message>
,用于接收数据的管道端。
/// 发送消息
async fn write(
mut sender: SplitSink<WebSocket, Message>,
mut rx: tokio::sync::mpsc::Receiver<Message>,
) {
// 通过管道,从接收句柄接收消息
while let Some(msg) = rx.recv().await {
// 然后将消息原样发送给客户端
sender.send(msg).await.unwrap();
}
}
async fn handle_socket(socket: WebSocket) {
// 拆分 WebSocket 流
let (sender, receiver) = socket.split();
// 创建管道
let (tx, rx) = tokio::sync::mpsc::channel::<Message>(100);
// 接收消息异步任务
tokio::spawn(read(receiver, tx));
// 发送消息异步任务
tokio::spawn(write(sender, rx));
}
本章代码位于echo
分支。