本章我们实现分类服务,即 category-srv

将 proto 独立为单独的 crate

为了代码一致性,我们将上一章的 proto 独立为一个 crate:

创建 crate:

cargo new --lib blog-proto
[workspace]
members = [
   "blog-proto",
]

生成代码

首先,为 blog-proto 添加依赖

[dependencies]
tokio = {version = "1", features = ["full"]}
prost = "0.11"
prost-types = "0.11"
tonic = "0.8"

[build-dependencies]
tonic-build = "0.8"

然后,创建 blog-proto/build.rs,并输入以下内容:

use std::{env, fs, path::Path};

fn main() {
    let current_dir = env::current_dir().unwrap();
    let proto_path = Path::new(&current_dir).join("proto");
    let mut proto_files = vec![];
    for entry in fs::read_dir(&proto_path).unwrap() {
        let entry = entry.unwrap();
        let md = entry.metadata().unwrap();
        if md.is_file() && entry.path().extension().unwrap() == "proto" {
            proto_files.push(entry.path().as_os_str().to_os_string())
        }
    }

    tonic_build::configure()
        .out_dir("src") // 生成代码的存放目录
        .build_client(true)
        .build_server(true)
        .compile(
            proto_files.as_slice(), // 欲生成的 proto 文件列表
            &[&proto_path],         // proto 依赖所在的根目录
        )
        .unwrap();
}
pub mod pb;

为什么不用 tonic::include_proto!

tonic支持include_proto! 宏,通过它可以不需要向 lib.rs 添加 pub mod pb;,为什么我们不用?

由于切换分支后,tonic::include_proto! 不能正确工作,编辑器会一直报错,出于这个原因,我们放弃了这个宏。

如果你没有遇到我这个问题,建议还是使用 tonic::include_proto!,因为它确实方便。

分类服务

现在,我们创建category-srvcrate,用来构建分类的 gRPC 服务。

cargo new category-srv

记得加入到 workspace 中

添加需要的依赖:

[dependencies]
tokio = {version = "1", features = ["full"]}
prost = "0.11"
prost-types = "0.11"
tonic = "0.8"
sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres" ] }
blog-proto = {path="../blog-proto"}

回顾一下我们的 proto,为分类服务定义以下方法:

// 创建分类
  rpc CreateCategory(CreateCategoryRequest) returns (CreateCategoryReply);
  // 修改分类
  rpc EditCategory(EditCategoryRequest) returns (EditCategoryReply);
  // 分类列表
  rpc ListCategory(ListCategoryRequest) returns (ListCategoryReply);
  // 删除/恢复分类
  rpc ToggleCategory(ToggleCategoryRequest) returns (ToggleCategoryReply);
  // 分类是否存在
  rpc CategoryExists(CategoryExistsRequest) returns (CategoryExistsReply);
  // 获取分类详情
  rpc GetCategory(GetCategoryRequest) returns (GetCategoryReply);

实现 CategoryService

以下代码位于 category-srv/src/server.rs

首先,定义我们自己的结构体:

pub struct Category {
    pool: Arc<PgPool>,
}

impl Category {
    pub fn new(pool: PgPool) -> Self {
        Self {
            pool: Arc::new(pool),
        }
    }
}

在这个结构体中,我们使用了 sqlx::PgPool,用来从数据库连接池中获取连接。

接下来就是实现 CategoryService

#[tonic::async_trait]
impl CategoryService for Category {
    // ...
}

分类是否存在category_exists的实现

async fn category_exists(
        &self,
        request: tonic::Request<CategoryExistsRequest>,
    ) -> Result<tonic::Response<CategoryExistsReply>, tonic::Status> {
        let request = request.into_inner();
        let condition = request
            .condition
            .ok_or(tonic::Status::invalid_argument("参数错误"))?;
        let query = match condition {
            Condition::Name(name) => {
                sqlx::query("SELECT COUNT(*) FROM categories WHERE name=$1").bind(name)
            }
            Condition::Id(id) => {
                sqlx::query("SELECT COUNT(*) FROM categories WHERE id=$1").bind(id)
            }
        };
        let row = query
            .fetch_one(&*self.pool)
            .await
            .map_err(|err| tonic::Status::internal(err.to_string()))?;
        let count: i64 = row.get(0);
        let reply = CategoryExistsReply { exists: count > 0 };
        Ok(tonic::Response::new(reply))
    }
  • 首先,从传入的 CategoryExistsRequest 中,获取判断分类是否存在的条件。

  • 然后,根据不同的条件进行不同的查询

  • 之后,将结果填充到 CategoryExistsReply 发送给服务调用者

创建分类create_category 的实现

async fn create_category(
        &self,
        request: tonic::Request<CreateCategoryRequest>,
    ) -> Result<tonic::Response<CreateCategoryReply>, tonic::Status> {
        let CreateCategoryRequest { name } = request.into_inner();
        let exists_request = tonic::Request::new(CategoryExistsRequest {
            condition: Some(Condition::Name(name.clone().into())),
        });
        let exists_reply = self.category_exists(exists_request).await?.into_inner();
        if exists_reply.exists {
            return Err(tonic::Status::already_exists("分类已存在"));
        }
        let res = sqlx::query("INSERT INTO categories (name) VALUES ($1) RETURNING id")
            .bind(name)
            .fetch_one(&*self.pool)
            .await
            .map_err(|err| tonic::Status::internal(err.to_string()))?;
        let reply = CreateCategoryReply { id: res.get("id") };
        Ok(tonic::Response::new(reply))
    }
  • 首先获取传入的参数
  • 调用 category_exists 服务判断分类名称是否存在。这里,你当然也可以用 SQL 来判断
  • 将数据入库,并将结果返回给调用者

修改分类 edit_category 的实现

async fn edit_category(
        &self,
        request: tonic::Request<EditCategoryRequest>,
    ) -> Result<tonic::Response<EditCategoryReply>, tonic::Status> {
        let EditCategoryRequest { id, name } = request.into_inner();
        let row = sqlx::query("SELECT COUNT(*) FROM categories WHERE name=$1 AND id<>$2")
            .bind(&name)
            .bind(id)
            .fetch_one(&*self.pool)
            .await
            .map_err(|err| tonic::Status::internal(err.to_string()))?;
        let count: i64 = row.get(0);
        if count > 0i64 {
            return Err(tonic::Status::already_exists("分类已存在"));
        }
        let rows_affected = sqlx::query("UPDATE categories SET name=$1 WHERE id=$2")
            .bind(&name)
            .bind(id)
            .execute(&*self.pool)
            .await
            .map_err(|err| tonic::Status::internal(err.to_string()))?
            .rows_affected();
        let reply = EditCategoryReply {
            id,
            ok: rows_affected > 0,
        };
        Ok(tonic::Response::new(reply))
    }
  • 获取传入的参数
  • 判断同名的分类是否存在。这里之所以没有调用 category_exists,是因为修改时判断分类是否存在的逻辑有所不同,所以直接用 SQL来实现。
  • 将数据入库并返回结果

获取分类 get_category的实现

async fn get_category(
        &self,
        request: tonic::Request<GetCategoryRequest>,
    ) -> Result<tonic::Response<GetCategoryReply>, tonic::Status> {
        let GetCategoryRequest { id, is_del } = request.into_inner();
        let query = match is_del {
            Some(is_del) => {
                sqlx::query("SELECT id,name,is_del FROM categories WHERE id=$1 AND is_del=$2")
                    .bind(id)
                    .bind(is_del)
            }
            None => sqlx::query("SELECT id,name,is_del FROM categories WHERE id=$1").bind(id),
        };
        let row = query
            .fetch_optional(&*self.pool)
            .await
            .map_err(|err| tonic::Status::internal(err.to_string()))?;
        let reply = match row {
            Some(row) => GetCategoryReply {
                category: Some(blog_proto::Category {
                    id: row.get("id"),
                    name: row.get("name"),
                    is_del: row.get("is_del"),
                }),
            },
            None => GetCategoryReply { category: None },
        };
        Ok(tonic::Response::new(reply))
    }
  • 首先从传入的参数得到请求的条件
  • 根据条件进行不同的查询
  • 将查询的结果转换为消息
  • 将结果发送给调用者

分类列表 list_category 的实现

async fn list_category(
        &self,
        request: tonic::Request<ListCategoryRequest>,
    ) -> Result<tonic::Response<ListCategoryReply>, tonic::Status> {
        let ListCategoryRequest { name, is_del } = request.into_inner();
        let query = match name {
            Some(name) => {
                let name = format!("%{}%", name);
             match is_del { 
                Some(is_del) => {
                    sqlx::query("SELECT id,name,is_del FROM categories WHERE name ILIKE $1 AND is_del=$2 ORDER BY id")
                    .bind(name.clone())
                        .bind(is_del)
                }
                None =>  sqlx::query("SELECT id,name,is_del FROM categories WHERE name ILIKE $1  ORDER BY id")
                    .bind(name),
           
             } },
            None => match is_del {
                Some(is_del) => {
                    sqlx::query("SELECT id,name,is_del FROM categories WHERE is_del=$1 ORDER BY id")
                        .bind(is_del)
                }
                None => sqlx::query("SELECT id,name,is_del FROM categories ORDER BY id"),
            },
        };
        let rows = query.fetch_all(&*self.pool).await.map_err(|err|tonic::Status::internal(err.to_string()))?;
        if rows.is_empty() {
            return Err(tonic::Status::not_found("没有符合条件的分类"));
        }
        let mut categories = Vec::with_capacity(rows.len());
        for row in rows {
            categories.push(blog_proto::Category {
                id: row.get("id"),
                name: row.get("name"),
                is_del: row.get("is_del"),
            });
        }
        let reply = ListCategoryReply { categories };
        Ok(tonic::Response::new(reply))
    }
  • 通过不同的条件进行查询
  • 将结果进行返回

删除/恢复分类toggle_category的实现

    async fn toggle_category(
        &self,
        request: tonic::Request<ToggleCategoryRequest>,
    ) -> Result<tonic::Response<ToggleCategoryReply>, tonic::Status> {
        let ToggleCategoryRequest { id } = request.into_inner();
        let row =
            sqlx::query("UPDATE categories SET is_del=(NOT is_del) WHERE id=$1 RETURNING is_del")
                .bind(id)
                .fetch_optional(&*self.pool)
                .await
                .map_err(|err| tonic::Status::internal(err.to_string()))?;
        if let Some(row) = row {
            return Ok(tonic::Response::new(ToggleCategoryReply {
                id,
                is_del: row.get(0),
            }));
        }
        Err(tonic::Status::not_found("不存在的分类"))
    }

代码很简单,将指定id的分类的is_del取反。

启动分类服务

category-srv/src/main.rs

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:19527";
    println!("category-srv run at: {}", addr);

    let dsn = env::var("PG_DSN").unwrap_or("postgres://[email protected]/axum_rs".to_string());
    let pool = PgPool::connect(&dsn).await.unwrap();
    let category_srv = server::Category::new(pool);
    tonic::transport::Server::builder()
        .add_service(CategoryServiceServer::new(category_srv))
        .serve(addr.parse().unwrap())
        .await
        .unwrap();
}

这里需要一个环境变量PG_DSN ,用于指定数据库连接字符串。

方式一:

export PG_DSN='postgres://[email protected]/axum_rs'
cargo run --bin category-srv

方式二:

PG_DSN='postgres://[email protected]/axum_rs' cargo run --bin category-srv

测试分类服务

创建 category-srv/tests 目录,在其中添加 category_srv_test.rs 文件。

#[tokio::test]
async fn test_create_category() {
    let mut client = CategoryServiceClient::connect("http://127.0.0.1:19527")
        .await
        .unwrap();
    let request = tonic::Request::new(CreateCategoryRequest {
        name: "分类1".into(),
    });
    let reply = client.create_category(request).await.unwrap();
    let reply = reply.into_inner();
    assert!(reply.id > 0);
}

其它方法的测试以此类推,请参考git上的源代码

运行测试:

cargo test test_create_category -- --nocapture
  • #[tokio::test]:让 cargo test 支持 async fn
  • test_create_category :告诉 cargo test,只测试指定的 test_create_category 函数
  • --nocapture:在测试中能使用 println!输出

本章代码位于02/实现分类服务分支。