域名 AXUM.RS 将于2025年10月到期。我们无意再对其进行续费,我们希望你能够接续这个域名,让更多 AXUM 开发者继续受益。
  • 方案1️⃣AXUM.RS 域名 = 3000
  • 方案2️⃣方案1️⃣ + 本站所有专题原始 Markdown 文档 = 5000
  • 方案3️⃣方案2️⃣ + 本站原始数据库 = 5500
如果你有意接续这份 AXUM 情怀,请与我们取得联系。
说明:
  1. 如果有人购买 AXUM.RS 域名(方案1️⃣),或者该域名到期,本站将启用新的免费域名继续提供服务。
  2. 如果有人购买了 AXUM.RS 域名,且同时购买了内容和/或数据库(方案2️⃣/方案3️⃣),本站将关闭。届时我们或许会以另一种方式与你再相遇。

前置知识:etcd及配置中心

本章我们将学习分布式KV存储:etcd的基础知识、单节点部署、集群部署、rust集成以及基于etcd实现的配置中心:写入配置、读取配置、配置的热加载。

etcd的安装

etcd 安装非常简单,只需要下载对应平台的预编译二进制程序即可。本章以 debian 11 x64 为例进行演示。

# 下载
wget https://github.com/etcd-io/etcd/releases/download/v3.4.26/etcd-v3.4.26-linux-amd64.tar.gz

# 解压
tar zxvf etcd-v3.4.26-linux-amd64.tar.gz -C /usr/local

# 加入环境变量 
export PATH=$PATH:/usr/local/etcd-v3.4.26-linux-amd64

# 测试是否安装成功
etcd -version # etcd 版本

为了便于操作,你可以将 export PATH=$PATH:/usr/local/etcd-v3.4.26-linux-amd64 加入到 ~/.bashrc

为了便于操作,你可以将 export PATH=$PATH:/usr/local/etcd-v3.4.26-linux-amd64 加入到 ~/.bashrc

etcd 单机部署

单节点部署

单机单节点部署非常简单,你需要运行 etcd 命令即可:

# 开启单节点
etcd

# 添加/修改:key 为 `name`,值为 `axum.rs`
etcdctl put name 'axum.rs'

# 获取
etcdctl get name

多节点部署

etcd 关键参数说明

参数说明
--name节点名
--data-dir节点数据存储目录
--listen-client-urls监听客户端请求的地址列表,多个地址用逗号分隔
--advertise-client-urls如果 --listen-client-urls 配置了多个地址,这个选项可以给出建议客户端使用的访问地址
--listen-peer-urls 服务端(集群)节点之间通讯的监听地址
--initial-advertise-peer-urls建议服务端(集群)节点之间通讯使用的地址
--initial-cluster-token初始化时,集群使用的 token
--initial-cluster初始化时,集群中的节点及地址列表:节点1=节点地址1[,节点2=节点地址2]...
--initial-cluster-state可选值:newexistingnew 表示新建集群;existing 表示加入已存在集群

etcd集群部署

我们将使用 3 台 Woiden 的 EU-2 数据中心的免费 VPS 进行演示:

由于实验环境是 ipv6,如果你使用的是 ipv4 不用紧张,把 http://[IPv6地址] 换成 http://ipv4地址 即可。

由于实验环境是 ipv6,如果你使用的是 ipv4 不用紧张,把 http://[IPv6地址] 换成 http://ipv4地址 即可。

静态部署

静态部署和单机多节点部署非常类似,改成机器的公网IP即可:

# 节点1
etcd --name axumrs-1 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:3159:0001]:2379,http://127.0.0.1:2379  \
--advertise-client-urls http://[2a01:*:3159:0001]:2379 \
--listen-peer-urls http://[2a01:*:3159:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:3159:0001]:2380 \
--initial-cluster-token axumrs-etcd-1 \
--initial-cluster 'axumrs-1=http://[2a01:*:3159:0001]:2380,axumrs-2=http://[2a01:*:3206:0001]:2380,axumrs-3=http://[2a01:*:31be:0001]:2380' \
--initial-cluster-state new

# 节点2
etcd --name axumrs-2 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:3206:0001]:2379,http://127.0.0.1:2379  \
--advertise-client-urls http://[2a01:*:3206:0001]:2379 \
--listen-peer-urls http://[2a01:*:3206:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:3206:0001]:2380 \
--initial-cluster-token axumrs-etcd-1 \
--initial-cluster 'axumrs-1=http://[2a01:*:3159:0001]:2380,axumrs-2=http://[2a01:*:3206:0001]:2380,axumrs-3=http://[2a01:*:31be:0001]:2380' \
--initial-cluster-state new

# 节点3
etcd --name axumrs-3 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:31be:0001]:2379,http://127.0.0.1:2379  \
--advertise-client-urls http://[2a01:*:31be:0001]:2379 \
--listen-peer-urls http://[2a01:*:31be:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:31be:0001]:2380 \
--initial-cluster-token axumrs-etcd-1 \
--initial-cluster 'axumrs-1=http://[2a01:*:3159:0001]:2380,axumrs-2=http://[2a01:*:3206:0001]:2380,axumrs-3=http://[2a01:*:31be:0001]:2380' \
--initial-cluster-state new

# 集群节点列表
etcdctl member list
# 5789559cc124f2e1, started, axumrs-3, http://[2a01:*:31be:0001]:2380, http://[2a01:*:31be:0001]:2379, false
# d9c662f0b70e1866, started, axumrs-1, http://[2a01:*:3159:0001]:2380, http://[2a01:*:3159:0001]:2379, false
# f03d1d47f9d679de, started, axumrs-2, http://[2a01:*:3206:0001]:2380, http://[2a01:*:3206:0001]:2379, false

更为灵活的是,使用公共的自动发现服务来部署,比如 etcd 官方提供的 https://discovery.etcd.io

# 获取专属的服务发现地址
# `size` 是预期的节点数
curl 'https://discovery.etcd.io/new?size=3'
# 结果:(请保存好,后面要用到)
# https://discovery.etcd.io/40c056239fc114368f2bd74f2660c809

# 节点1
etcd --name axumrs-1 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:3159:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:3159:0001]:2379 \
--listen-peer-urls http://[2a01:*:3159:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:3159:0001]:2380 \
--discovery https://discovery.etcd.io/40c056239fc114368f2bd74f2660c809

# 节点2
etcd --name axumrs-2 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:3206:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:3206:0001]:2379 \
--listen-peer-urls http://[2a01:*:3206:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:3206:0001]:2380 \
--discovery https://discovery.etcd.io/40c056239fc114368f2bd74f2660c809

# 节点3
etcd --name axumrs-3 --data-dir /var/etcd/axumrs \
--listen-client-urls http://[2a01:*:31be:0001]:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://[2a01:*:31be:0001]:2379 \
--listen-peer-urls http://[2a01:*:31be:0001]:2380 \
--initial-advertise-peer-urls http://[2a01:*:31be:0001]:2380 \
--discovery https://discovery.etcd.io/40c056239fc114368f2bd74f2660c809

# 集群节点列表
etcdctl member list
# 5789559cc124f2e1, started, axumrs-3, http://[2a01:*:31be:0001]:2380, http://[2a01:*:31be:0001]:2379, false
# d9c662f0b70e1866, started, axumrs-1, http://[2a01:*:3159:0001]:2380, http://[2a01:*:3159:0001]:2379, false
# f03d1d47f9d679de, started, axumrs-2, http://[2a01:*:3206:0001]:2380, http://[2a01:*:3206:0001]:2379, false

与Rust集成

我们使用 etcd-rs 来集成 etcd。我们模拟使用 etcd 管理配置:写入配置、载入配置以及配置的热加载

依赖

[dependencies]
tokio={version="1", features=["full"]}
etcd-rs = "1.0"
serde = { version="1", features = ["derive"] }
serde_json = "1"

配置

// src/config.rs

use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug)]
pub struct Config {
    pub machine_id: i32,
    pub node_id: i32,
    pub addr: String,
}

etcd 操作

// src/main.rs

use etcd_rs::{Client, ClientConfig, KeyRange, KeyValueOp, WatchOp};

mod config;

const KEY: &str = "/axum.rs";

#[tokio::main]
async fn main() {
    let cli = Client::connect(ClientConfig::new(["http://localhost:2379".into()]))
        .await
        .unwrap();
    // save_cfg(&cli).await;
    // load_cfg(&cli).await;
    hot_load(&cli).await;
}

#[allow(unused)]
async fn save_cfg(cli: &Client) {
    let cfg = config::Config {
        machine_id: 1,
        node_id: 1,
        addr: String::from("127.0.0.1:9527"),
    };
    let cfg_str = serde_json::to_string(&cfg).unwrap();
    cli.put((KEY, cfg_str)).await.unwrap();
}

#[allow(unused)]
async fn load_cfg(cli: &Client) {
    let res = cli.get(KEY).await.unwrap();
    let kvs = res.kvs;
    if kvs.is_empty() {
        println!("没有数据");
        return;
    }
    let cfg_str = kvs[0].value_str();
    let cfg: config::Config = serde_json::from_str(cfg_str).unwrap();
    println!("{:?}", cfg);
}

#[allow(unused)]
async fn hot_load(cli: &Client) {
    let (mut stream, cancel) = cli.watch(KeyRange::prefix(KEY)).await.unwrap();

    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
        cancel.cancel().await.unwrap();
    });

    loop {
        match stream.inbound().await {
            etcd_rs::WatchInbound::Ready(resp) => {
                let kv = &resp.events[0].kv;
                let cfg_str = kv.value_str();
                let cfg: config::Config = serde_json::from_str(cfg_str).unwrap();
                println!("配置发生变化:{:?}", cfg);
            }
            etcd_rs::WatchInbound::Interrupted(e) => {
                println!("发生错误:{:?}", e);
            }
            etcd_rs::WatchInbound::Closed => {
                println!("监视流已关闭");
                break;
            }
        }
    }
}

连接到etcd

let cli = Client::connect(ClientConfig::new(["http://localhost:2379".into()]))
        .await
        .unwrap();

将配置写入 etcd

async fn save_cfg(cli: &Client) {
    let cfg = config::Config {
        machine_id: 1,
        node_id: 1,
        addr: String::from("127.0.0.1:9527"),
    };
    let cfg_str = serde_json::to_string(&cfg).unwrap();
    cli.put((KEY, cfg_str)).await.unwrap();
}

从 etcd 加载配置

async fn load_cfg(cli: &Client) {
    let res = cli.get(KEY).await.unwrap();
    let kvs = res.kvs;
    if kvs.is_empty() {
        println!("没有数据");
        return;
    }
    let cfg_str = kvs[0].value_str();
    let cfg: config::Config = serde_json::from_str(cfg_str).unwrap();
    println!("{:?}", cfg);
}
  • 从 etcd 中读取配置的 JSON 字符串
  • 将其反序列化为 Config 配置实例
async fn hot_load(cli: &Client) {
    let (mut stream, cancel) = cli.watch(KeyRange::prefix(KEY)).await.unwrap();

    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
        cancel.cancel().await.unwrap();
    });

    loop {
        match stream.inbound().await {
            etcd_rs::WatchInbound::Ready(resp) => {
                let kv = &resp.events[0].kv;
                let cfg_str = kv.value_str();
                let cfg: config::Config = serde_json::from_str(cfg_str).unwrap();
                println!("配置发生变化:{:?}", cfg);
            }
            etcd_rs::WatchInbound::Interrupted(e) => {
                println!("发生错误:{:?}", e);
            }
            etcd_rs::WatchInbound::Closed => {
                println!("监视流已关闭");
                break;
            }
        }
    }
}
  • 热加载是通过 etcd 的 watch 来实现的
    • 注意,为了简化演示,本例假设监视的所有改变都是 PUT 操作
    • 实际上,PUTDELETE 操作都能监视到,实际开发中应该区分事件的类型
  • 为了演示取消监视功能,我们特意加了一个 tokio::spawn,让它休眠 30 秒后,取消监视
  • 注意,为了简化演示,本例假设监视的所有改变都是 PUT 操作
  • 实际上,PUTDELETE 操作都能监视到,实际开发中应该区分事件的类型
要查看完整内容,请先登录