- 支持试读
内容介绍
本专题将带你实现一个高可用的微服务架构的分布式商城。 - 支持试读
前置知识:etcd及配置中心
本章我们将学习分布式KV存储:etcd的基础知识、单节点部署、集群部署、rust集成以及基于etcd实现的配置中心:写入配置、读取配置、配置的热加载。 - 支持试读
前置知识:分布式ID
本章将以雪花算法(snowflake)为例,讨论分布式ID的生成。 前置知识:etcd鉴权
本章将讨论 etcd 的鉴权:如果没有鉴权,你的 etcd 服务(集群)相当于公共服务器,任何人都可以对数据进行操作。在之前章节中,我们分别用不同方式部署了 etcd。本文针对这些不同的部署方法,来讨论如何开启鉴权。以及如何在 Rust 中访问开启鉴权的 etcd。用户服务
本章我们将开始编写第一个微服务:用户服务 `user-srv`。品牌服务
本章我们将编写品牌微服务。前置知识:MySQL树
继续学习之前,我们来讨论一个话题:MySQL 维护树状态结构。这是一个很常见的需求,最典型的就是分类、评论等等。前置知识:PostgreSQL 树
我们来讨论 PostgreSQL 维护树的数据结构。由于 PostgreSQL 支持递归查询和递归视图,所以很多模式实现起来都非常方便。本章将讨论使用递归视图,实现《邻接表模式》和《路径枚举模式》结合的树模型。前置知识:使用 XID 作为分布式ID
本章讨论使用 xid 来生成分布式ID方案。用户服务【新编】
本章我们将使用 PostgreSQL 重写第一个微服务:用户服务 `user-srv`。品牌服务【新编】
本章我们将继续使用 PostgreSQL 重新编写品牌微服务。商品分类服务
本章我们将实现商品分类服务。整合分类与品牌
本章我们将讨论如何将分类与品牌整合为一个服务。轮播图服务
学习完略显复杂的分类和品牌之后,让我们稍微轻松一下:实现相对简单的轮播图服务。商品服务
本章我们将实现整个项目最核心的服务之一:商品服务。【前置知识】分布式锁(上)
本章我们来讨论一个非常重要的功能:分布式锁。典型场景是:在高并发下,库存的扣减,其中尤为典型的是秒杀场景。【前置知识】分布式锁(下)
我们继续讨论实现分布式锁的第二种方式。SKU和库存服务
本章我们将实现商品的 SKU 和库存、价格等属性。前置知识:Web 3 支付
本章我们将讨论 Web 3支付。购物车服务
本章我们实现购物车服务。地址服务
本章开始,我们将实现最核心的服务:订单服务。它由:订单核心服务、支付服务、地址服务等组成。本章我们实现地址服务。订单服务
本章我们实现订单核心服务。订单商品服务
上一章我们实现了订单核心服务,本章我们实现订单商品服务支付服务
本章我们将实现支付服务,我们将使用 WEB3 支付。服务注册、发现及健康检查
本章我们将实现服务的注册、发现及健康检查。注意,我们现在说的服务,不但包括 gRPC 微服务,还包括基于 axum 的 RESTful API 服务。
前置知识:etcd及配置中心
本章我们将学习分布式KV存储:etcd的基础知识、单节点部署、集群部署、rust集成以及基于etcd实现的配置中心:写入配置、读取配置、配置的热加载。
etcd的安装
etcd 安装非常简单,只需要下载对应平台的预编译二进制程序即可。本章以 debian 11 x64 为例进行演示。
# 下载
wget https://github.com/etcd-io/etcd/releases/download/v3.4.26/etcd-v3.4.26-linux-amd64.tar.gz
# 解压
tar zxvf etcd-v3.4.26-linux-amd64.tar.gz -C /usr/local
# 加入环境变量
export PATH=$PATH:/usr/local/etcd-v3.4.26-linux-amd64
# 测试是否安装成功
etcd -version # etcd 版本
为了便于操作,你可以将
export PATH=$PATH:/usr/local/etcd-v3.4.26-linux-amd64
加入到~/.bashrc
中
为了便于操作,你可以将 export PATH=$PATH:/usr/local/etcd-v3.4.26-linux-amd64
加入到 ~/.bashrc
中
etcd 单机部署
单节点部署
单机单节点部署非常简单,你需要运行 etcd
命令即可:
# 开启单节点
etcd
# 添加/修改:key 为 `name`,值为 `axum.rs`
etcdctl put name 'axum.rs'
# 获取
etcdctl get name
多节点部署
etcd 关键参数说明
参数 | 说明 |
---|---|
--name | 节点名 |
--data-dir | 节点数据存储目录 |
--listen-client-urls | 监听客户端请求的地址列表,多个地址用逗号分隔 |
--advertise-client-urls | 如果 --listen-client-urls 配置了多个地址,这个选项可以给出建议客户端使用的访问地址 |
--listen-peer-urls | 服务端(集群)节点之间通讯的监听地址 |
--initial-advertise-peer-urls | 建议服务端(集群)节点之间通讯使用的地址 |
--initial-cluster-token | 初始化时,集群使用的 token |
--initial-cluster | 初始化时,集群中的节点及地址列表:节点1=节点地址1[,节点2=节点地址2]... |
--initial-cluster-state | 可选值:new 或existing 。new 表示新建集群;existing 表示加入已存在集群 |
etcd集群部署
我们将使用 3 台 Woiden 的 EU-2 数据中心的免费 VPS 进行演示:
由于实验环境是 ipv6,如果你使用的是 ipv4 不用紧张,把
http://[IPv6地址]
换成http://ipv4地址
即可。
由于实验环境是 ipv6,如果你使用的是 ipv4 不用紧张,把 http://[IPv6地址]
换成 http://ipv4地址
即可。
静态部署
静态部署和单机多节点部署非常类似,改成机器的公网IP即可:
# 节点1
etcd --name axumrs-1 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:3159:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:3159:0001]:2379 \
--listen-peer-urls http://[2a01:*:3159:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:3159:0001]:2380 \
--initial-cluster-token axumrs-etcd-1 \
--initial-cluster 'axumrs-1=http://[2a01:*:3159:0001]:2380,axumrs-2=http://[2a01:*:3206:0001]:2380,axumrs-3=http://[2a01:*:31be:0001]:2380' \
--initial-cluster-state new
# 节点2
etcd --name axumrs-2 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:3206:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:3206:0001]:2379 \
--listen-peer-urls http://[2a01:*:3206:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:3206:0001]:2380 \
--initial-cluster-token axumrs-etcd-1 \
--initial-cluster 'axumrs-1=http://[2a01:*:3159:0001]:2380,axumrs-2=http://[2a01:*:3206:0001]:2380,axumrs-3=http://[2a01:*:31be:0001]:2380' \
--initial-cluster-state new
# 节点3
etcd --name axumrs-3 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:31be:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:31be:0001]:2379 \
--listen-peer-urls http://[2a01:*:31be:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:31be:0001]:2380 \
--initial-cluster-token axumrs-etcd-1 \
--initial-cluster 'axumrs-1=http://[2a01:*:3159:0001]:2380,axumrs-2=http://[2a01:*:3206:0001]:2380,axumrs-3=http://[2a01:*:31be:0001]:2380' \
--initial-cluster-state new
# 集群节点列表
etcdctl member list
# 5789559cc124f2e1, started, axumrs-3, http://[2a01:*:31be:0001]:2380, http://[2a01:*:31be:0001]:2379, false
# d9c662f0b70e1866, started, axumrs-1, http://[2a01:*:3159:0001]:2380, http://[2a01:*:3159:0001]:2379, false
# f03d1d47f9d679de, started, axumrs-2, http://[2a01:*:3206:0001]:2380, http://[2a01:*:3206:0001]:2379, false
更为灵活的是,使用公共的自动发现服务来部署,比如 etcd 官方提供的 https://discovery.etcd.io。
# 获取专属的服务发现地址
# `size` 是预期的节点数
curl 'https://discovery.etcd.io/new?size=3'
# 结果:(请保存好,后面要用到)
# https://discovery.etcd.io/40c056239fc114368f2bd74f2660c809
# 节点1
etcd --name axumrs-1 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:3159:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:3159:0001]:2379 \
--listen-peer-urls http://[2a01:*:3159:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:3159:0001]:2380 \
--discovery https://discovery.etcd.io/40c056239fc114368f2bd74f2660c809
# 节点2
etcd --name axumrs-2 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:3206:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:3206:0001]:2379 \
--listen-peer-urls http://[2a01:*:3206:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:3206:0001]:2380 \
--discovery https://discovery.etcd.io/40c056239fc114368f2bd74f2660c809
# 节点3
etcd --name axumrs-3 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:31be:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:31be:0001]:2379 \
--listen-peer-urls http://[2a01:*:31be:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:31be:0001]:2380 \
--discovery https://discovery.etcd.io/40c056239fc114368f2bd74f2660c809
# 集群节点列表
etcdctl member list
# 5789559cc124f2e1, started, axumrs-3, http://[2a01:*:31be:0001]:2380, http://[2a01:*:31be:0001]:2379, false
# d9c662f0b70e1866, started, axumrs-1, http://[2a01:*:3159:0001]:2380, http://[2a01:*:3159:0001]:2379, false
# f03d1d47f9d679de, started, axumrs-2, http://[2a01:*:3206:0001]:2380, http://[2a01:*:3206:0001]:2379, false
与Rust集成
我们使用 etcd-rs
来集成 etcd。我们模拟使用 etcd 管理配置:写入配置、载入配置以及配置的热加载
依赖
[dependencies]
tokio={version="1", features=["full"]}
etcd-rs = "1.0"
serde = { version="1", features = ["derive"] }
serde_json = "1"
配置
// src/config.rs
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Debug)]
pub struct Config {
pub machine_id: i32,
pub node_id: i32,
pub addr: String,
}
etcd 操作
// src/main.rs
use etcd_rs::{Client, ClientConfig, KeyRange, KeyValueOp, WatchOp};
mod config;
const KEY: &str = "/axum.rs";
#[tokio::main]
async fn main() {
let cli = Client::connect(ClientConfig::new(["http://localhost:2379".into()]))
.await
.unwrap();
// save_cfg(&cli).await;
// load_cfg(&cli).await;
hot_load(&cli).await;
}
#[allow(unused)]
async fn save_cfg(cli: &Client) {
let cfg = config::Config {
machine_id: 1,
node_id: 1,
addr: String::from("127.0.0.1:9527"),
};
let cfg_str = serde_json::to_string(&cfg).unwrap();
cli.put((KEY, cfg_str)).await.unwrap();
}
#[allow(unused)]
async fn load_cfg(cli: &Client) {
let res = cli.get(KEY).await.unwrap();
let kvs = res.kvs;
if kvs.is_empty() {
println!("没有数据");
return;
}
let cfg_str = kvs[0].value_str();
let cfg: config::Config = serde_json::from_str(cfg_str).unwrap();
println!("{:?}", cfg);
}
#[allow(unused)]
async fn hot_load(cli: &Client) {
let (mut stream, cancel) = cli.watch(KeyRange::prefix(KEY)).await.unwrap();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
cancel.cancel().await.unwrap();
});
loop {
match stream.inbound().await {
etcd_rs::WatchInbound::Ready(resp) => {
let kv = &resp.events[0].kv;
let cfg_str = kv.value_str();
let cfg: config::Config = serde_json::from_str(cfg_str).unwrap();
println!("配置发生变化:{:?}", cfg);
}
etcd_rs::WatchInbound::Interrupted(e) => {
println!("发生错误:{:?}", e);
}
etcd_rs::WatchInbound::Closed => {
println!("监视流已关闭");
break;
}
}
}
}
连接到etcd
let cli = Client::connect(ClientConfig::new(["http://localhost:2379".into()]))
.await
.unwrap();
将配置写入 etcd
async fn save_cfg(cli: &Client) {
let cfg = config::Config {
machine_id: 1,
node_id: 1,
addr: String::from("127.0.0.1:9527"),
};
let cfg_str = serde_json::to_string(&cfg).unwrap();
cli.put((KEY, cfg_str)).await.unwrap();
}
从 etcd 加载配置
async fn load_cfg(cli: &Client) {
let res = cli.get(KEY).await.unwrap();
let kvs = res.kvs;
if kvs.is_empty() {
println!("没有数据");
return;
}
let cfg_str = kvs[0].value_str();
let cfg: config::Config = serde_json::from_str(cfg_str).unwrap();
println!("{:?}", cfg);
}
- 从 etcd 中读取配置的 JSON 字符串
- 将其反序列化为
Config
配置实例
async fn hot_load(cli: &Client) {
let (mut stream, cancel) = cli.watch(KeyRange::prefix(KEY)).await.unwrap();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
cancel.cancel().await.unwrap();
});
loop {
match stream.inbound().await {
etcd_rs::WatchInbound::Ready(resp) => {
let kv = &resp.events[0].kv;
let cfg_str = kv.value_str();
let cfg: config::Config = serde_json::from_str(cfg_str).unwrap();
println!("配置发生变化:{:?}", cfg);
}
etcd_rs::WatchInbound::Interrupted(e) => {
println!("发生错误:{:?}", e);
}
etcd_rs::WatchInbound::Closed => {
println!("监视流已关闭");
break;
}
}
}
}
- 热加载是通过 etcd 的
watch
来实现的- 注意,为了简化演示,本例假设监视的所有改变都是
PUT
操作 - 实际上,
PUT
和DELETE
操作都能监视到,实际开发中应该区分事件的类型
- 注意,为了简化演示,本例假设监视的所有改变都是
- 为了演示取消监视功能,我们特意加了一个
tokio::spawn
,让它休眠 30 秒后,取消监视
- 注意,为了简化演示,本例假设监视的所有改变都是
PUT
操作 - 实际上,
PUT
和DELETE
操作都能监视到,实际开发中应该区分事件的类型