简介
本专题将带你使用 axum 和 gRPC 构建一个分布式的博客系统数据结构与Protobuf
本章对我们项目的数据结构和proto进行定义实现分类服务
本章我们实现分类服务,即 `category-srv`实现文章服务
本章将带你实现文章的 gPRC 服务。实现前台web服务
本章将通过使用 axum 调用分类和文章的 gRPC 服务,来实现博客前台Web服务实现管理员服务
本章我们将实现管理员服务实现后台管理web服务
本章将使用 axum 调用 gRPC 服务来实现后台管理的 web 服务安全与鉴权
本章将讨论使用jwt进行鉴权服务扩容、注册、发现和编排
本章将讨论服务管理相关的话题配置中心服务
本章讨论配置中心的实现总结
本专题试图通过一个分布式博客的案例来探讨使用 rust 实现 gRPC 微服务架构的可行性
实现分类服务
- 71787
- 2022-09-23 22:29:37
本章我们实现分类服务,即 category-srv
。
将 proto 独立为单独的 crate
为了代码一致性,我们将上一章的 proto 独立为一个 crate:
创建 crate:
cargo new --lib blog-proto
在项目根目录的 Cargo.toml
加上这个 crate:
[workspace]
members = [
"blog-proto",
]
将上一章的 proto 目录移入 blog-proto
生成代码
首先,为 blog-proto
添加依赖
[dependencies]
tokio = {version = "1", features = ["full"]}
prost = "0.11"
prost-types = "0.11"
tonic = "0.8"
[build-dependencies]
tonic-build = "0.8"
然后,创建 blog-proto/build.rs
,并输入以下内容:
use std::{env, fs, path::Path};
fn main() {
let current_dir = env::current_dir().unwrap();
let proto_path = Path::new(¤t_dir).join("proto");
let mut proto_files = vec![];
for entry in fs::read_dir(&proto_path).unwrap() {
let entry = entry.unwrap();
let md = entry.metadata().unwrap();
if md.is_file() && entry.path().extension().unwrap() == "proto" {
proto_files.push(entry.path().as_os_str().to_os_string())
}
}
tonic_build::configure()
.out_dir("src") // 生成代码的存放目录
.build_client(true)
.build_server(true)
.compile(
proto_files.as_slice(), // 欲生成的 proto 文件列表
&[&proto_path], // proto 依赖所在的根目录
)
.unwrap();
}
pub mod pb;
为什么不用 tonic::include_proto!
tonic支持include_proto!
宏,通过它可以不需要向 lib.rs
添加 pub mod pb;
,为什么我们不用?
由于切换分支后,tonic::include_proto!
不能正确工作,编辑器会一直报错,出于这个原因,我们放弃了这个宏。
如果你没有遇到我这个问题,建议还是使用 tonic::include_proto!
,因为它确实方便。
分类服务
现在,我们创建category-srv
crate,用来构建分类的 gRPC 服务。
cargo new category-srv
记得加入到 workspace 中
记得加入到 workspace 中
添加需要的依赖:
[dependencies]
tokio = {version = "1", features = ["full"]}
prost = "0.11"
prost-types = "0.11"
tonic = "0.8"
sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres" ] }
blog-proto = {path="../blog-proto"}
回顾一下我们的 proto,为分类服务定义以下方法:
// 创建分类
rpc CreateCategory(CreateCategoryRequest) returns (CreateCategoryReply);
// 修改分类
rpc EditCategory(EditCategoryRequest) returns (EditCategoryReply);
// 分类列表
rpc ListCategory(ListCategoryRequest) returns (ListCategoryReply);
// 删除/恢复分类
rpc ToggleCategory(ToggleCategoryRequest) returns (ToggleCategoryReply);
// 分类是否存在
rpc CategoryExists(CategoryExistsRequest) returns (CategoryExistsReply);
// 获取分类详情
rpc GetCategory(GetCategoryRequest) returns (GetCategoryReply);
实现 CategoryService
以下代码位于 category-srv/src/server.rs
首先,定义我们自己的结构体:
pub struct Category {
pool: Arc<PgPool>,
}
impl Category {
pub fn new(pool: PgPool) -> Self {
Self {
pool: Arc::new(pool),
}
}
}
在这个结构体中,我们使用了 sqlx::PgPool
,用来从数据库连接池中获取连接。
接下来就是实现 CategoryService
:
#[tonic::async_trait]
impl CategoryService for Category {
// ...
}
分类是否存在category_exists
的实现
async fn category_exists(
&self,
request: tonic::Request<CategoryExistsRequest>,
) -> Result<tonic::Response<CategoryExistsReply>, tonic::Status> {
let request = request.into_inner();
let condition = request
.condition
.ok_or(tonic::Status::invalid_argument("参数错误"))?;
let query = match condition {
Condition::Name(name) => {
sqlx::query("SELECT COUNT(*) FROM categories WHERE name=$1").bind(name)
}
Condition::Id(id) => {
sqlx::query("SELECT COUNT(*) FROM categories WHERE id=$1").bind(id)
}
};
let row = query
.fetch_one(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let count: i64 = row.get(0);
let reply = CategoryExistsReply { exists: count > 0 };
Ok(tonic::Response::new(reply))
}
-
首先,从传入的
CategoryExistsRequest
中,获取判断分类是否存在的条件。 -
然后,根据不同的条件进行不同的查询
-
之后,将结果填充到
CategoryExistsReply
发送给服务调用者
然后,根据不同的条件进行不同的查询
之后,将结果填充到 CategoryExistsReply
发送给服务调用者
创建分类create_category
的实现
async fn create_category(
&self,
request: tonic::Request<CreateCategoryRequest>,
) -> Result<tonic::Response<CreateCategoryReply>, tonic::Status> {
let CreateCategoryRequest { name } = request.into_inner();
let exists_request = tonic::Request::new(CategoryExistsRequest {
condition: Some(Condition::Name(name.clone().into())),
});
let exists_reply = self.category_exists(exists_request).await?.into_inner();
if exists_reply.exists {
return Err(tonic::Status::already_exists("分类已存在"));
}
let res = sqlx::query("INSERT INTO categories (name) VALUES ($1) RETURNING id")
.bind(name)
.fetch_one(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let reply = CreateCategoryReply { id: res.get("id") };
Ok(tonic::Response::new(reply))
}
- 首先获取传入的参数
- 调用
category_exists
服务判断分类名称是否存在。这里,你当然也可以用 SQL 来判断 - 将数据入库,并将结果返回给调用者
修改分类 edit_category
的实现
async fn edit_category(
&self,
request: tonic::Request<EditCategoryRequest>,
) -> Result<tonic::Response<EditCategoryReply>, tonic::Status> {
let EditCategoryRequest { id, name } = request.into_inner();
let row = sqlx::query("SELECT COUNT(*) FROM categories WHERE name=$1 AND id<>$2")
.bind(&name)
.bind(id)
.fetch_one(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let count: i64 = row.get(0);
if count > 0i64 {
return Err(tonic::Status::already_exists("分类已存在"));
}
let rows_affected = sqlx::query("UPDATE categories SET name=$1 WHERE id=$2")
.bind(&name)
.bind(id)
.execute(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?
.rows_affected();
let reply = EditCategoryReply {
id,
ok: rows_affected > 0,
};
Ok(tonic::Response::new(reply))
}
- 获取传入的参数
- 判断同名的分类是否存在。这里之所以没有调用
category_exists
,是因为修改时判断分类是否存在的逻辑有所不同,所以直接用 SQL来实现。 - 将数据入库并返回结果
获取分类 get_category
的实现
- 首先从传入的参数得到请求的条件
- 根据条件进行不同的查询
- 将查询的结果转换为消息
- 将结果发送给调用者
分类列表 list_category
的实现
async fn list_category(
&self,
request: tonic::Request<ListCategoryRequest>,
) -> Result<tonic::Response<ListCategoryReply>, tonic::Status> {
let ListCategoryRequest { name, is_del } = request.into_inner();
let query = match name {
Some(name) => {
let name = format!("%{}%", name);
match is_del {
Some(is_del) => {
sqlx::query("SELECT id,name,is_del FROM categories WHERE name ILIKE $1 AND is_del=$2 ORDER BY id")
.bind(name.clone())
.bind(is_del)
}
None => sqlx::query("SELECT id,name,is_del FROM categories WHERE name ILIKE $1 ORDER BY id")
.bind(name),
} },
None => match is_del {
Some(is_del) => {
sqlx::query("SELECT id,name,is_del FROM categories WHERE is_del=$1 ORDER BY id")
.bind(is_del)
}
None => sqlx::query("SELECT id,name,is_del FROM categories ORDER BY id"),
},
};
let rows = query.fetch_all(&*self.pool).await.map_err(|err|tonic::Status::internal(err.to_string()))?;
if rows.is_empty() {
return Err(tonic::Status::not_found("没有符合条件的分类"));
}
let mut categories = Vec::with_capacity(rows.len());
for row in rows {
categories.push(blog_proto::Category {
id: row.get("id"),
name: row.get("name"),
is_del: row.get("is_del"),
});
}
let reply = ListCategoryReply { categories };
Ok(tonic::Response::new(reply))
}
- 通过不同的条件进行查询
- 将结果进行返回
删除/恢复分类toggle_category
的实现
async fn toggle_category(
&self,
request: tonic::Request<ToggleCategoryRequest>,
) -> Result<tonic::Response<ToggleCategoryReply>, tonic::Status> {
let ToggleCategoryRequest { id } = request.into_inner();
let row =
sqlx::query("UPDATE categories SET is_del=(NOT is_del) WHERE id=$1 RETURNING is_del")
.bind(id)
.fetch_optional(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
if let Some(row) = row {
return Ok(tonic::Response::new(ToggleCategoryReply {
id,
is_del: row.get(0),
}));
}
Err(tonic::Status::not_found("不存在的分类"))
}
代码很简单,将指定id的分类的is_del
取反。
启动分类服务
category-srv/src/main.rs
category-srv/src/main.rs
#[tokio::main]
async fn main() {
let addr = "127.0.0.1:19527";
println!("category-srv run at: {}", addr);
let dsn = env::var("PG_DSN").unwrap_or("postgres://[email protected]/axum_rs".to_string());
let pool = PgPool::connect(&dsn).await.unwrap();
let category_srv = server::Category::new(pool);
tonic::transport::Server::builder()
.add_service(CategoryServiceServer::new(category_srv))
.serve(addr.parse().unwrap())
.await
.unwrap();
}
这里需要一个环境变量PG_DSN
,用于指定数据库连接字符串。
方式一:
export PG_DSN='postgres://[email protected]/axum_rs'
cargo run --bin category-srv
方式二:
PG_DSN='postgres://[email protected]/axum_rs' cargo run --bin category-srv
测试分类服务
创建 category-srv/tests
目录,在其中添加 category_srv_test.rs
文件。
#[tokio::test]
async fn test_create_category() {
let mut client = CategoryServiceClient::connect("http://127.0.0.1:19527")
.await
.unwrap();
let request = tonic::Request::new(CreateCategoryRequest {
name: "分类1".into(),
});
let reply = client.create_category(request).await.unwrap();
let reply = reply.into_inner();
assert!(reply.id > 0);
}
其它方法的测试以此类推,请参考git上的源代码
其它方法的测试以此类推,请参考git上的源代码
运行测试:
cargo test test_create_category -- --nocapture
#[tokio::test]
:让 cargo test 支持async fn
test_create_category
:告诉 cargo test,只测试指定的test_create_category
函数--nocapture
:在测试中能使用println!
输出