简介
本专题将带你使用 axum 和 gRPC 构建一个分布式的博客系统数据结构与Protobuf
本章对我们项目的数据结构和proto进行定义实现分类服务
本章我们实现分类服务,即 `category-srv`实现文章服务
本章将带你实现文章的 gPRC 服务。实现前台web服务
本章将通过使用 axum 调用分类和文章的 gRPC 服务,来实现博客前台Web服务实现管理员服务
本章我们将实现管理员服务实现后台管理web服务
本章将使用 axum 调用 gRPC 服务来实现后台管理的 web 服务安全与鉴权
本章将讨论使用jwt进行鉴权服务扩容、注册、发现和编排
本章将讨论服务管理相关的话题配置中心服务
本章讨论配置中心的实现总结
本专题试图通过一个分布式博客的案例来探讨使用 rust 实现 gRPC 微服务架构的可行性
实现文章服务
本章将带你实现文章的 gPRC 服务。
首先,创建项目并将到 workspace
中:
cargo new topic-srv
实现 TopicService
以下代码位于 topic-srv/src/server.rs
pub struct Topic {
pool: Arc<PgPool>,
}
impl Topic {
pub fn new(pool: PgPool) -> Self {
Self {
pool: Arc::new(pool),
}
}
}
#[tonic::async_trait]
impl TopicService for Topic {
// ...
}
创建文章 create_topic
的实现
async fn create_topic(
&self,
request: tonic::Request<CreateTopicRequest>,
) -> Result<tonic::Response<CreateTopicReply>, tonic::Status> {
let CreateTopicRequest {
title,
category_id,
content,
summary,
} = request.into_inner();
let summary = match summary {
Some(summary) => summary,
None => get_summary(&content),
};
let row = sqlx::query("INSERT INTO topics (title,category_id,content,summary) VALUES($1,$2,$3,$4) RETURNING id")
.bind(title)
.bind(category_id)
.bind(content)
.bind(summary)
.fetch_one(&*self.pool)
.await.map_err(|err|tonic::Status::internal(err.to_string()))?;
let reply = CreateTopicReply { id: row.get("id") };
Ok(tonic::Response::new(reply))
}
- 获取传入的参数
- 判断是否传入了摘要
summary
,如果没有传入,则从内容正文中截取 - 将数据入库
- 将结果返回
fn get_summary(content: &str) -> String {
if content.len() <= 255 {
return String::from(content);
}
content.chars().into_iter().take(255).collect()
}
修改文章 edit_topic
的实现
async fn edit_topic(
&self,
request: tonic::Request<EditTopicRequest>,
) -> Result<tonic::Response<EditTopicReply>, tonic::Status> {
let r = request.into_inner();
let summary = match r.summary {
Some(s) => s,
None => get_summary(&r.content),
};
let rows_affected = sqlx::query(
"UPDATE topics SET title=$1,content=$2,summary=$3,category_id=$4 WHERE id=$5",
)
.bind(r.title)
.bind(r.content)
.bind(summary)
.bind(r.category_id)
.bind(r.id)
.execute(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?
.rows_affected();
Ok(tonic::Response::new(EditTopicReply {
id: r.id,
ok: rows_affected > 0,
}))
}
async fn toggle_topic(
&self,
request: tonic::Request<ToggleTopicRequest>,
) -> Result<tonic::Response<ToggleTopicReply>, tonic::Status> {
let ToggleTopicRequest { id } = request.into_inner();
let row = sqlx::query("UPDATE topics SET is_del=(NOT is_del) WHERE id=$1 RETURNING is_del")
.bind(id)
.fetch_optional(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
if row.is_none() {
return Err(tonic::Status::not_found("不存在的文章"));
}
Ok(tonic::Response::new(ToggleTopicReply {
id,
is_del: row.unwrap().get("is_del"),
}))
}
获取文章 get_topic
的实现
async fn get_topic(
&self,
request: tonic::Request<GetTopicRequest>,
) -> Result<tonic::Response<GetTopicReply>, tonic::Status> {
let GetTopicRequest {
id,
is_del,
inc_hit,
} = request.into_inner();
let inc_hit = inc_hit.unwrap_or(false); // 增加点击量
if inc_hit {
sqlx::query("UPDATE topics SET hit=hit+1 WHERE id=$1")
.bind(id)
.execute(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
}
let query = match is_del {
Some(is_del) => sqlx::query("SELECT id,title,content,summary,is_del,category_id,dateline,hit FROM topics WHERE id=$1 AND is_del=$2")
.bind(id).bind(is_del),
None => sqlx::query("SELECT id,title,content,summary,is_del,category_id,dateline,hit FROM topics WHERE id=$1")
.bind(id),
};
let row = query
.fetch_optional(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
if row.is_none() {
return Err(tonic::Status::not_found("不存在的文章"));
}
let row = row.unwrap();
let dt: DateTime<Local> = row.get("dateline");
let dateline = dt_conver(&dt);
Ok(tonic::Response::new(GetTopicReply {
topic: Some(blog_proto::Topic {
id: row.get("id"),
title: row.get("title"),
category_id: row.get("category_id"),
content: row.get("content"),
summary: row.get("summary"),
hit: row.get("hit"),
is_del: row.get("is_del"),
dateline,
}),
}))
}
- 从传入的参数中判断是否需要同时对点击量进行递增,如果需要,则执行对应的 SQL
- 从数据库中获取对应的记录
- 将结果返回
- 在 proto 中,使用 google 定义的 timestamp
- 在将 proto 生成 rust 代码时,使用了
prost_types::Timestamp
- 在 PostgreSQL 中,使用的是
TIMESTAMP WITH TIME ZONE
,通常会简写成TIMESTAMPTZ
如果将这些不同的定义进行统一处理?
prost_types::Timestamp
本身就是对 proto 中 google 定义的 timestamp 的映射,所以它自然提供了转换功能,它们本质其实就是 i64
。而且 PostgreSQL 中的 TIMESTAMPTZ
更为丰富,从它展示的结果来看,更像是 DateTime
,所以我们使用的方法是:
将 PostgreSQL 中的 TIMESTAMPTZ
对应到 Rust 中的 chrono::DateTime
。所以出现了两个函数:
将 chrono::DateTime
转为prost_types::Timestamp
的dt_conver
函数:
fn dt_conver(dt: &DateTime<Local>) -> Option<prost_types::Timestamp> {
if let Ok(dt) = prost_types::Timestamp::date_time(
dt.year().into(),
dt.month() as u8,
dt.day() as u8,
dt.hour() as u8,
dt.minute() as u8,
dt.second() as u8,
) {
Some(dt)
} else {
None
}
}
将prost_types::Timestamp
转为chrono::DateTime
的tm_conver
函数:
fn tm_cover(tm: Option<prost_types::Timestamp>) -> Option<DateTime<Local>> {
match tm {
Some(tm) => Some(Local.timestamp(tm.seconds, 0)),
None => None,
}
}
文章列表 list_topic
的实现
async fn list_topic(
&self,
request: tonic::Request<ListTopicRequest>,
) -> Result<tonic::Response<ListTopicReply>, tonic::Status> {
let ListTopicRequest {
page,
category_id,
keyword,
is_del,
dateline_range,
} = request.into_inner();
let page = page.unwrap_or(0);
let page_size = 30;
let offset = page * page_size;
let mut start = None;
let mut end = None;
if let Some(dr) = dateline_range {
start = tm_cover(dr.start);
end = tm_cover(dr.end);
}
let row = sqlx::query(
r#"
SELECT
COUNT(*)
FROM
topics
WHERE 1=1
AND ($1::int IS NULL OR category_id = $1::int)
AND ($2::text IS NULL OR title ILIKE CONCAT('%',$2::text,'%'))
AND ($3::boolean IS NULL OR is_del = $3::boolean)
AND (
($4::TIMESTAMPTZ IS NULL OR $5::TIMESTAMPTZ IS NULL)
OR
(dateline BETWEEN $4::TIMESTAMPTZ AND $5::TIMESTAMPTZ)
)"#,
)
.bind(&category_id)
.bind(&keyword)
.bind(&is_del)
.bind(&start)
.bind(&end)
.fetch_one(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let record_total: i64 = row.get(0);
let page_totoal = f64::ceil(record_total as f64 / page_size as f64) as i64;
let rows = sqlx::query(
r#"
SELECT
id,title,content,summary,is_del,category_id,dateline,hit FROM topics
WHERE 1=1
AND ($3::int IS NULL OR category_id = $3::int)
AND ($4::text IS NULL OR title ILIKE CONCAT('%',$4::text,'%'))
AND ($5::boolean IS NULL OR is_del = $5::boolean)
AND (
($6::TIMESTAMPTZ IS NULL OR $7::TIMESTAMPTZ IS NULL)
OR
(dateline BETWEEN $6::TIMESTAMPTZ AND $7::TIMESTAMPTZ)
)
ORDER BY
id DESC
LIMIT
$1
OFFSET
$2
"#,
)
.bind(page_size)
.bind(offset)
.bind(&category_id)
.bind(&keyword)
.bind(&is_del)
.bind(&start)
.bind(&end)
.fetch_all(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let mut topics = Vec::with_capacity(rows.len());
for row in rows {
let dt: DateTime<Local> = row.get("dateline");
let dateline = dt_conver(&dt);
topics.push(blog_proto::Topic {
id: row.get("id"),
title: row.get("title"),
category_id: row.get("category_id"),
content: row.get("content"),
summary: row.get("summary"),
hit: row.get("hit"),
is_del: row.get("is_del"),
dateline,
});
}
Ok(tonic::Response::new(ListTopicReply {
page,
page_size,
topics,
record_total,
page_totoal,
}))
}
}
由于该方法接收的筛选条件过于复杂,如果使用 rust 来拼接 SQL 将会非常麻烦,我们利用 PostgreSQL 来处理。
以 AND ($4::text IS NULL OR title ILIKE CONCAT('%',$4::text,'%'))
为例:
$4::text
:将绑定的第4号位的参数转换成TEXT
。由于我们传入的4号位的参数是keyword
,它是一个Option<String>
,则:- 如果是
Some(s)
,转换成功,且参数的值是s
,比如传入Some("axum.rs".to_string())
,那么$4::text
将转换成'axum.rs'
- 如果是
None
,则无法转换成TEXT
,结果为NULL
- 如果是
ILIKE
:不区分大小写的模糊匹配CONCAT
:字符串拼接。CONCAT('%', 'axum.rs', '%')
的结果是:'%axum.rs%'
- 如果是
Some(s)
,转换成功,且参数的值是s
,比如传入Some("axum.rs".to_string())
,那么$4::text
将转换成'axum.rs'
- 如果是
None
,则无法转换成TEXT
,结果为NULL
运行和测试
运行、测试文章服务和运行、测试分类服务相似,请参考上一章节,以及通过 git 查看代码。
本章代码位于03/实现文章服务分支