本章将带你实现文章的 gPRC 服务。

首先,创建项目并将到 workspace中:

cargo new topic-srv

实现 TopicService

以下代码位于 topic-srv/src/server.rs

定义自己的结构体

pub struct Topic {
    pool: Arc<PgPool>,
}

impl Topic {
    pub fn new(pool: PgPool) -> Self {
        Self {
            pool: Arc::new(pool),
        }
    }
}

#[tonic::async_trait]
impl TopicService for Topic {
  // ...
}

创建文章 create_topic 的实现

async fn create_topic(
        &self,
        request: tonic::Request<CreateTopicRequest>,
    ) -> Result<tonic::Response<CreateTopicReply>, tonic::Status> {
        let CreateTopicRequest {
            title,
            category_id,
            content,
            summary,
        } = request.into_inner();

        let summary = match summary {
            Some(summary) => summary,
            None => get_summary(&content),
        };
        let row = sqlx::query("INSERT INTO topics (title,category_id,content,summary) VALUES($1,$2,$3,$4) RETURNING id")
        .bind(title)
        .bind(category_id)
        .bind(content)
        .bind(summary)
        .fetch_one(&*self.pool)
        .await.map_err(|err|tonic::Status::internal(err.to_string()))?;
        let reply = CreateTopicReply { id: row.get("id") };
        Ok(tonic::Response::new(reply))
    }
  1. 获取传入的参数
  2. 判断是否传入了摘要summary,如果没有传入,则从内容正文中截取
  3. 将数据入库
  4. 将结果返回

截取内容的函数:

fn get_summary(content: &str) -> String {
    if content.len() <= 255 {
        return String::from(content);
    }
    content.chars().into_iter().take(255).collect()
}

修改文章 edit_topic 的实现

async fn edit_topic(
  &self,
  request: tonic::Request<EditTopicRequest>,
) -> Result<tonic::Response<EditTopicReply>, tonic::Status> {
  let r = request.into_inner();
  let summary = match r.summary {
    Some(s) => s,
    None => get_summary(&r.content),
  };
  let rows_affected = sqlx::query(
    "UPDATE topics SET title=$1,content=$2,summary=$3,category_id=$4 WHERE id=$5",
  )
  .bind(r.title)
  .bind(r.content)
  .bind(summary)
  .bind(r.category_id)
  .bind(r.id)
  .execute(&*self.pool)
  .await
  .map_err(|err| tonic::Status::internal(err.to_string()))?
  .rows_affected();
  Ok(tonic::Response::new(EditTopicReply {
    id: r.id,
    ok: rows_affected > 0,
  }))
}

删除/恢复文章 toggle_topic 的实现

async fn toggle_topic(
        &self,
        request: tonic::Request<ToggleTopicRequest>,
    ) -> Result<tonic::Response<ToggleTopicReply>, tonic::Status> {
        let ToggleTopicRequest { id } = request.into_inner();
        let row = sqlx::query("UPDATE topics SET is_del=(NOT is_del) WHERE id=$1 RETURNING is_del")
            .bind(id)
            .fetch_optional(&*self.pool)
            .await
            .map_err(|err| tonic::Status::internal(err.to_string()))?;
        if row.is_none() {
            return Err(tonic::Status::not_found("不存在的文章"));
        }
        Ok(tonic::Response::new(ToggleTopicReply {
            id,
            is_del: row.unwrap().get("is_del"),
        }))
    }

获取文章 get_topic 的实现

async fn get_topic(
        &self,
        request: tonic::Request<GetTopicRequest>,
    ) -> Result<tonic::Response<GetTopicReply>, tonic::Status> {
        let GetTopicRequest {
            id,
            is_del,
            inc_hit,
        } = request.into_inner();

        let inc_hit = inc_hit.unwrap_or(false); // 增加点击量
        if inc_hit {
            sqlx::query("UPDATE topics SET hit=hit+1 WHERE id=$1")
                .bind(id)
                .execute(&*self.pool)
                .await
                .map_err(|err| tonic::Status::internal(err.to_string()))?;
        }

        let query = match is_del {
            Some(is_del) => sqlx::query("SELECT id,title,content,summary,is_del,category_id,dateline,hit FROM topics WHERE id=$1 AND is_del=$2")
            .bind(id).bind(is_del),
            None => sqlx::query("SELECT id,title,content,summary,is_del,category_id,dateline,hit FROM topics WHERE id=$1")
            .bind(id),
        };
        let row = query
            .fetch_optional(&*self.pool)
            .await
            .map_err(|err| tonic::Status::internal(err.to_string()))?;
        if row.is_none() {
            return Err(tonic::Status::not_found("不存在的文章"));
        }
        let row = row.unwrap();
        let dt: DateTime<Local> = row.get("dateline");
        let dateline = dt_conver(&dt);

        Ok(tonic::Response::new(GetTopicReply {
            topic: Some(blog_proto::Topic {
                id: row.get("id"),
                title: row.get("title"),
                category_id: row.get("category_id"),
                content: row.get("content"),
                summary: row.get("summary"),
                hit: row.get("hit"),
                is_del: row.get("is_del"),
                dateline,
            }),
        }))
    }
  • 从传入的参数中判断是否需要同时对点击量进行递增,如果需要,则执行对应的 SQL
  • 从数据库中获取对应的记录
  • 将结果返回

时间的处理

  • 在 proto 中,使用 google 定义的 timestamp
  • 在将 proto 生成 rust 代码时,使用了 prost_types::Timestamp
  • 在 PostgreSQL 中,使用的是 TIMESTAMP WITH TIME ZONE,通常会简写成 TIMESTAMPTZ

如果将这些不同的定义进行统一处理?

prost_types::Timestamp 本身就是对 proto 中 google 定义的 timestamp 的映射,所以它自然提供了转换功能,它们本质其实就是 i64。而且 PostgreSQL 中的 TIMESTAMPTZ 更为丰富,从它展示的结果来看,更像是 DateTime,所以我们使用的方法是:

chrono::DateTime转为prost_types::Timestampdt_conver函数:

fn dt_conver(dt: &DateTime<Local>) -> Option<prost_types::Timestamp> {
    if let Ok(dt) = prost_types::Timestamp::date_time(
        dt.year().into(),
        dt.month() as u8,
        dt.day() as u8,
        dt.hour() as u8,
        dt.minute() as u8,
        dt.second() as u8,
    ) {
        Some(dt)
    } else {
        None
    }
}

文章列表 list_topic 的实现

async fn list_topic(
        &self,
        request: tonic::Request<ListTopicRequest>,
    ) -> Result<tonic::Response<ListTopicReply>, tonic::Status> {
        let ListTopicRequest {
            page,
            category_id,
            keyword,
            is_del,
            dateline_range,
        } = request.into_inner();
        let page = page.unwrap_or(0);
        let page_size = 30;
        let offset = page * page_size;
        let mut start = None;
        let mut end = None;
        if let Some(dr) = dateline_range {
            start = tm_cover(dr.start);
            end = tm_cover(dr.end);
        }

        let row = sqlx::query(
            r#"
            SELECT 
                COUNT(*)
            FROM 
            topics
            WHERE 1=1
                AND ($1::int IS NULL OR category_id = $1::int)
                AND ($2::text IS NULL OR title ILIKE CONCAT('%',$2::text,'%'))
                AND ($3::boolean IS NULL OR is_del = $3::boolean)
                AND (
                    ($4::TIMESTAMPTZ IS NULL OR $5::TIMESTAMPTZ IS NULL)
                    OR
                    (dateline BETWEEN $4::TIMESTAMPTZ AND $5::TIMESTAMPTZ)
                )"#,
        )
        .bind(&category_id)
        .bind(&keyword)
        .bind(&is_del)
        .bind(&start)
        .bind(&end)
        .fetch_one(&*self.pool)
        .await
        .map_err(|err| tonic::Status::internal(err.to_string()))?;

        let record_total: i64 = row.get(0);
        let page_totoal = f64::ceil(record_total as f64 / page_size as f64) as i64;

        let rows = sqlx::query(
            r#"
        SELECT 
            id,title,content,summary,is_del,category_id,dateline,hit FROM topics
         WHERE 1=1
            AND ($3::int IS NULL OR category_id = $3::int)
            AND ($4::text IS NULL OR title ILIKE CONCAT('%',$4::text,'%'))
            AND ($5::boolean IS NULL OR is_del = $5::boolean)
            AND (
                ($6::TIMESTAMPTZ IS NULL OR $7::TIMESTAMPTZ IS NULL)
                OR
                (dateline BETWEEN $6::TIMESTAMPTZ AND $7::TIMESTAMPTZ)
            )
        ORDER BY 
            id DESC
        LIMIT 
            $1
        OFFSET
            $2
        "#,
        )
        .bind(page_size)
        .bind(offset)
        .bind(&category_id)
        .bind(&keyword)
        .bind(&is_del)
        .bind(&start)
        .bind(&end)
        .fetch_all(&*self.pool)
        .await
        .map_err(|err| tonic::Status::internal(err.to_string()))?;

        let mut topics = Vec::with_capacity(rows.len());

        for row in rows {
            let dt: DateTime<Local> = row.get("dateline");
            let dateline = dt_conver(&dt);
            topics.push(blog_proto::Topic {
                id: row.get("id"),
                title: row.get("title"),
                category_id: row.get("category_id"),
                content: row.get("content"),
                summary: row.get("summary"),
                hit: row.get("hit"),
                is_del: row.get("is_del"),
                dateline,
            });
        }

        Ok(tonic::Response::new(ListTopicReply {
            page,
            page_size,
            topics,
            record_total,
            page_totoal,
        }))
    }
}

由于该方法接收的筛选条件过于复杂,如果使用 rust 来拼接 SQL 将会非常麻烦,我们利用 PostgreSQL 来处理。

AND ($4::text IS NULL OR title ILIKE CONCAT('%',$4::text,'%')) 为例:

  • $4::text :将绑定的第4号位的参数转换成 TEXT。由于我们传入的4号位的参数是 keyword,它是一个 Option<String>,则:
    • 如果是 Some(s),转换成功,且参数的值是s,比如传入 Some("axum.rs".to_string()),那么 $4::text 将转换成 'axum.rs'
    • 如果是None,则无法转换成 TEXT,结果为 NULL
  • ILIKE:不区分大小写的模糊匹配
  • CONCAT:字符串拼接。CONCAT('%', 'axum.rs', '%') 的结果是:'%axum.rs%'

运行和测试

运行、测试文章服务和运行、测试分类服务相似,请参考上一章节,以及通过 git 查看代码。

本章代码位于03/实现文章服务分支