- 支持试读
Axum与Websocket
我们将通过几个案例由浅入深地带你掌握Websocket及其应用场景 - 支持试读
简单ECHO服务
本章我们将使用 AXUM 和 Websocket 实现一个简单的 Echo 服务。 - 支持试读
并发读写的ECHO服务
本章我们将实现并发读写的 ECHO 服务 - 支持试读
Javscript实现WebSocket客户端
本章我们将给我们的 ECHO 服务实现一个客户端。你要明确的是,很多语言都能写 WebSocket 客户端,包括 Rust 在内。但基于我们当前的环境,我们使用 JavaScript(TypeScript) 配合 React(Next.js)来实现。 用户在线检测
本章我们将实现用户在线检测功能:用户登录之后,前端通过 WebSocket 来检测 JWT Token 是否依然有效。
并发读写的ECHO服务
本章我们将实现并发读写的 ECHO 服务。我们将学到如何将 WebSocket 流进行拆分,让它实现双工;同时会学习到,如何在并发编程中使用管道进行数据通讯。
前置知识
拆分 WebSocket 流
我们可以将 WebSocket 流拆分为接受者和发送者,实现并发读(接收)写(发送)数据。
AXUM 的 WebSocket
实现了futures
的StreamExt::split()
方法,通过它可以对 WebSocket 进行拆分:
let (发送者, 接收者) = 流.split();
使用管道进行数据通讯
数据共享一直是并发编程面临的挑战,有很多解决方法,比如:锁机制、管道等。Rust 内置了一些管道,但它们是同步的,而 tokio 提供了对应的异步版本。本章我们将使用 mpsc
模式(多生产者、单消费者)的管道来实现并发编程中的数据通讯。
let (生产者, 消费者) = tokio::sync::mpsc::channel(初始容量);
为了拆分 WebSocket 流,我们需要加入 futures
依赖:
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
axum = { version = "0.7", features = ["ws"] }
并发读写
我们在上一章实现的简单ECHO的基础上进行改造。首先,我们要实现并发读写。涉及 handle_socket
函数:
async fn handle_socket(socket: WebSocket) {
// 拆分 WebSocket 流
let (sender, mut receiver) = socket.split();
// 接收消息
tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Close(_) => {
println!("客户端断开连接");
break;
}
Message::Text(text) => {
println!("收到客户端文本消息:{}", text);
// 将接收到的消息传递给发送句柄
}
_ => println!("收到客户端消息:{:?}", msg),
};
}
});
// 发送消息
tokio::spawn(async move {
// 从接收句柄接收消息
// 然后将消息原样发送给客户端
});
}
- 参数不再需要
mut
:因为我们不直接使用它来发送、接收消息,所以不需要mut
修饰。 - 将流拆分为发送者(
sender
)和接收者(receiver
)两个部分:let (sender, mut receiver) = socket.split();
,这样,它们就可以在不同的异步代码中并发工作。 - 使用2个
tokio::spawn
分别创建接收消息和发送消息的异步任务 - 接收消息:
- 使用
while let
从receiver.next()
中接收客户端发送过来的消息 - 我们只处理
Close()
和Text()
消息 - 问题:我们如何把这个从客户端接收过来的消息共享给发送消息的异步任务呢?
- 使用
- 发送消息:
- 问题:我们如何从接收消息的异步任务中获得从客户端接收过来的消息呢?
- 使用
while let
从receiver.next()
中接收客户端发送过来的消息 - 我们只处理
Close()
和Text()
消息 - 问题:我们如何把这个从客户端接收过来的消息共享给发送消息的异步任务呢?
- 问题:我们如何从接收消息的异步任务中获得从客户端接收过来的消息呢?
我们使用 tokio::sync::mpsc::channel
来实现异步任务之间的数据通讯。
- 为了发送消息,我们要把
sender
定义为mut
:let (mut sender, mut receiver) = socket.split();
- 通过
tokio::sync::mpsc::channel()
创建管道,它分为生产者(发送方)和消费者(接收方):let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(100);
,我们创建的管道发送的数据是Message
类型。 - 接收消息异步任务:
tx.send(Message::Text(text)).await.unwrap();
将从客户端接受到的数据发送给管道的消费者 - 发送消息异步任务:
while let Some(msg) = rx.recv().await
从管道接收数据,以便消费。sender.send(msg).await.unwrap();
将接收到的数据进行发送。由于我们调用的是WebSocket流拆分出来的sender
(发送者)进行发送,所以实际是发送给当前连接的客户端
while let Some(msg) = rx.recv().await
从管道接收数据,以便消费。sender.send(msg).await.unwrap();
将接收到的数据进行发送。由于我们调用的是WebSocket流拆分出来的sender
(发送者)进行发送,所以实际是发送给当前连接的客户端
将异步任务改写为函数
对于接收消息的异步任务而言:
receiver
:SplitStream<WebSocket>
,它是被拆分的流的部分。tx
:Sender<Message>
,用于发送数据的管道端。
/// 接收消息
async fn read(mut receiver: SplitStream<WebSocket>, tx: tokio::sync::mpsc::Sender<Message>) {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Close(_) => {
println!("客户端断开连接");
break;
}
Message::Text(text) => {
println!("收到客户端文本消息:{}", text);
// 通过管道,将接收到的消息传递给发送句柄
tx.send(Message::Text(text)).await.unwrap();
}
_ => println!("收到客户端消息:{:?}", msg),
};
}
}
对于发送消息的异步任务而言:
sender
:SplitSink<WebSocket,Message>
,它是被拆分的操作部分。rx
:Receiver<Message>
,用于接收数据的管道端。
对应的,异步任务可以直接调用这两个函数:
async fn handle_socket(socket: WebSocket) {
// 拆分 WebSocket 流
let (sender, receiver) = socket.split();
// 创建管道
let (tx, rx) = tokio::sync::mpsc::channel::<Message>(100);
// 接收消息异步任务
tokio::spawn(read(receiver, tx));
// 发送消息异步任务
tokio::spawn(write(sender, rx));
}
本章代码位于echo
分支。