本章将讨论 Consul 的安装、部署、API操作以及实现一个简单的 Rust 集成 Consul API。

安装 Consul

Consul 可以在各大主流系统中安装运行。本章主要讨论在 Linux 中安装 Consul。Mac 和 Windows 的安装请查看我们提供的示例脚本

在 Linux 中,也可以使用多种方式安装,比如使用包管理器(apt/yum等)安装、源码安装、docker、下载预编译的二进制文件安装等。下面介绍通过预编译的二进制文件安装,APT 安装也请通过上面的示例脚本查看。

wget https://releases.hashicorp.com/consul/1.13.2/consul_1.13.2_linux_amd64.zip
unzip consul_1.13.2_linux_amd64.zip

Consul 是用 Go 语言开发的,所以它的安装文件只有一个,非常简单方便。

我们提供了 Linux 下的一键安装脚本,可供参考:

DST_DIR=/usr/local/consul/
CONSUL_VERION=1.13.2
TMP_NAME="/tmp/consul.${CONSUL_VERION}.zip"
RC_FILE=~/.bashrc
wget -O $TMP_NAME  "https://releases.hashicorp.com/consul/${CONSUL_VERION}/consul_${CONSUL_VERION}_linux_amd64.zip" && \
unzip $TMP_NAME -d $DST_DIR && \
chmod a+x "${DST_DIR}/consul" && \
echo "export PATH=\$PATH:${DST_DIR}" >> $RC_FILE && \
source $RC_FILE && \
rm -rf $TMP_NAME

Consul 中的角色

Consul 的可执行文件只有一个,那么如何作为服务器提供服务,以及作为客户端进行消费呢?这就涉及到 Consul 的角色。

  • 客户端(client):无状态,将 HTTP 和 DNS 接口请求转发给局域网内的服务端集群。
  • 服务端(server):保存配置信息,高可用集群,每个数据中心的 server 数量推荐为 3 个或 5 个
  • 代理(agent):客户端和服务端统称为 agent

Consul 通过参数来决定是哪个角色,比如:

consul agent -server # 服务端
consul agent -client # 客户端

部署 Consul

Consul Web 的控制面板

Consul 提供了一个 web 版的控制面板,通过它可以直观的对 Consul 进行维护。它的访问地址是:

http://ip:8500

比如对于本地开发而言,它的地址是:

http://127.0.0.1:8500

单节点部署(开发模式)

对于本地开发来说,部署本地单节点的 Consul 是最方便的。 使用 Consul 的开发模式(-dev) 即可:

consul agent -dev -client=0.0.0.0
  • -dev:开发模式
  • -client=0.0.0.0:客户端绑定的IP,这里绑定的是全网卡地址,就是说,其它电脑也可以访问到。

对于 ipv6 来说,可以使用下面的命令进行单节点开发模式的部署:

consul agent -dev -client=[::]

集群部署

非常重要⚠️

官方文档

  • 由于 Raft 算法的要求,服务器的数量一定要是单数,以便胜利进行仲裁。
  • 集群中,服务器数量最少应该是3台
  • 集群中,服务器的数量最好不要超过7台,最好是5台。以减轻节点之间数据同步的负担

下面是对官方描述的简单引述:

下表显示了各种集群大小的仲裁大小和容错性。 推荐的部署是 3 或 5 台服务器。 非常不鼓励单服务器部署,因为在故障情况下数据丢失是不可避免的。

服务器数量仲裁大小容错性
110
220
321
431
532
642
743

服务端部署

# 节点名称
NODE_NAME="server-01" && \
# 数据保存目录
DATA_DIR="/var/consul/data/${NODE_NAME}" && \
# 集群中节点数量
BOOTSTRAP_EXPECT=3 && \
mkdir -p $DATA_DIR && \
consul agent -server \
	-bind=0.0.0.0 \
	-bootstrap-expect=$BOOTSTRAP_EXPECT \
	-data-dir=$DATA_DIR \
	-node=$NODE_NAME
  • -server:服务端
  • -bind:绑定的地址
  • -bootstrap-expect:集群中,服务端节点的数量
  • -data-dir:数据保存目录
  • -node:本节点的名称

这里设置了3台服务器节点,其它两台按相同的方法部署,注意更改NODE_NAME的值。

客户端部署

# 节点名称
NODE_NAME="client-01" && \
# 数据保存目录
DATA_DIR="/var/consul/data/${NODE_NAME}" && \
mkdir -p $DATA_DIR && \
consul agent \
	-client=0.0.0.0 \
	-bind=0.0.0.0 \
	-data-dir=$DATA_DIR \
	-node=$NODE_NAME

加入到服务端主节点

按以上方法部署之后,并没有形成集群。需要:

  • 将其中一个 Server 节点选定为主节点
  • 其它 Server 节点和 Client 加入该主节点

Consul 的原理很简单,主节点什么都不用做,其它节点加入到这个节点,它自动就成为了主节点。

加入主节点的命令很简单:

consul join '主节点IP/主机名/域名'

查询节点

你可以在任意节点上查询节点,以便验证是否成功加入到集群中:

常用的 Consul API

对于集群部署,只需要操作 client 节点的 RESTFul API

Consul API 的 URL 前缀

Consul API 的前缀是 /v1/agent,比如 http://127.0.0.1:8500/v1/agent,希望你要注意的是,Consul 的官方文档里并没有将这个前缀写全,你在看文档的时候不要忘了。

服务注册

 curl -X PUT 'http://127.0.0.1:8500/v1/agent/service/register' -H 'content-type:application/json' -d '{"Name":"axum.rs","ID":"axum.rs","Address":"127.0.0.1","Port":54321}'
  • method: PUT
  • Name:要注册的服务名称
  • ID:要注册的服务ID,如果省略将使用 Name 的值
  • Address:要注册的服务的地址
  • Port:要注册的服务的端口
  • Tags:要注册的服务的标签
  • Check:健康检查
  • 官方文档

服务注册时,Consul 会覆盖相同ID的服务,重复注册不会报错。

服务列表

 curl -X GET 'http://127.0.0.1:8500/v1/agent/services' -H 'content-type:application/json'

服务过滤

接口和服务列表一样,只是增加一个 Query 参数。

 curl -X GET 'http://127.0.0.1:8500/v1/agent/services?filter=Service%20%3D%3D%20axum.rs' -H 'content-type:application/json'

取消已注册的服务

curl -X PUT 'http://127.0.0.1:8500/v1/agent/service/deregister/axum.rs' -H 'content-type:application/json'

健康检查

如果在注册服务的时候指定了健康检查,Consul 会定时对其进行健康检查,当健康检查失败时,该服务会自动被 Consul 移除。

如果没有指定健康检查,Consul 将其视为永远健康。

Consul 对 HTTP 和 gRPC 都提供了健康检查。

在 Rust 中集成 Consul API

我们将使用 reqwest 发起 Consul RESTFul 的 HTTP 请求,来将 Consul API 集成到我们的项目中。

数据结构的定义

src/consul_api/model.rs

ConsulOption - Consul RESTFul API 配置

#[derive(Serialize, Deserialize)]
pub struct ConsulOption {
    pub addr: String,
    pub timeout_sec: u64,
    pub protocol: String,
}
impl Default for ConsulOption {
    fn default() -> Self {
        Self {
            addr: String::from("127.0.0.1:8500"),
            timeout_sec: 1u64,
            protocol: "http".to_string(),
        }
    }
}
  • addr:Consul API 地址
  • timeout_sec:HTTP 请求超时时间
  • protocol:Consul API 使用的协议(HTTPHTTPS

Registration - 服务的注册信息

#[derive(Default, Serialize, Deserialize)]
pub struct Registration {
    pub name: String,
    pub id: String,
    pub tags: Vec<String>,
    pub address: String,
    pub port: i32,
}

impl Registration {
    pub fn new(name: &str, id: &str, tags: Vec<&str>, addr: &str, port: i32) -> Self {
        Self {
            name: name.to_string(),
            id: id.to_string(),
            tags: tags.iter().map(|t| t.to_string()).collect(),
            address: addr.to_string(),
            port,
        }
    }
    pub fn simple_with_tags(name: &str, tags: Vec<&str>, addr: &str, port: i32) -> Self {
        Self::new(name, name, tags, addr, port)
    }
    pub fn simple(name: &str, addr: &str, port: i32) -> Self {
        Self::simple_with_tags(name, vec![], addr, port)
    }
}
  • name:要注册的服务的名称
  • id:要注册的服务的ID
  • tags:要注册的服务的标签
  • address:要注册的服务的地址
  • port:要注册的服务的端口

Service - Consul 返回的服务的信息

#[derive(Default, Serialize, Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct Service {
    #[serde(rename = "ID")]
    pub id: String,
    pub service: String,
    pub tags: Vec<String>,
    pub address: String,
    pub port: i32,
    pub datacenter: String,
}
  • id:服务的ID
  • service:服务的名称
  • tags:服务的标签
  • address:服务的地址
  • port:服务的端口
  • datacenter:Consule 数据中心

Services - Consul 返回的服务列表

pub type Services = HashMap<String, Service>;

注意,它是复数形式,不要跟上面的单个服务搞混了

Filter - 过滤

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub enum Filter {
    Service(String),
    ID(String),
}
  • Service(String):按服务的名称来过滤
  • ID(String):按服务的ID来过滤

实现

src/consul_api/mod.rs

结构定义

pub struct Consul {
    option: ConsulOption,
    client: reqwest::Client,
}
  • option : Consul API 配置
  • client:reqwest 客户端

new()

pub fn new(option: ConsulOption) -> Result<Self, reqwest::Error> {
  let client = reqwest::Client::builder()
  .timeout(Duration::from_secs(option.timeout_sec))
  .build()?;
  Ok(Self { option, client })
}
  • 根据配置创建 reqwest 客户端
  • 保存配置

api_url()

fn api_url(&self, api_name: &str) -> String {
  format!(
    "{}://{}/v1/agent/{}",
    &self.option.protocol, &self.option.addr, api_name
  )
}

生成目标 API 的地址。

register() - 服务注册

pub async fn register(&self, registration: &Registration) -> Result<(), reqwest::Error> {
  self.client
  .put(self.api_url("service/register"))
  .json(registration)
  .send()
  .await?;
  Ok(())
}
测试服务注册
#[tokio::test]
async fn test_register_service() {
  let opt = ConsulOption::default();
  let cs = Consul::new(opt);
  assert!(cs.is_ok());
  let cs = cs.unwrap();
  let registration = Registration::simple_with_tags(
    "axum.rs",
    vec!["axum", "tokio", "grpc", "tonic"],
    "127.0.0.1",
    12345,
  );
  let r = cs.register(&registration).await;
  assert!(r.is_ok());
}

deregister() - 取消注册的服务

pub async fn deregister(&self, service_id: &str) -> Result<(), reqwest::Error> {
  let deregister_api = format!("service/deregister/{}", service_id);
  self.client
  .put(self.api_url(&deregister_api))
  .json(&())
  .send()
  .await?;
  Ok(())
}

services() - 服务列表

pub async fn services(&self) -> Result<Services, reqwest::Error> {
  let list: Services = self
  .client
  .get(self.api_url("services"))
  .send()
  .await?
  .json()
  .await?;
  Ok(list)
}

get_service() - 服务发现

pub async fn get_service(&self, filter: &Filter) -> Result<Option<Service>, reqwest::Error> {
  let list = self.services().await?;
  for (_, s) in list {
    let has = match &filter {
      &Filter::ID(id) => id == &s.id,
      &Filter::Service(srv) => srv == &s.service,
    };
    if has {
      return Ok(Some(s));
    }
  }
  Ok(None)
}
测试服务发现
#[tokio::test]
async fn test_get_services() {
  let opt = ConsulOption::default();
  let cs = Consul::new(opt);
  assert!(cs.is_ok());
  let cs = cs.unwrap();
  let filter = Filter::ID("axum.rs".to_string());
  let srv = cs.get_service(&filter).await;
  assert!(srv.is_ok());
  let srv = srv.unwrap();
  assert!(srv.is_some());
  let srv = srv.unwrap();
  println!("{:?}", srv);
}

小结

本章讨论了 Consul 的安装、部署、常用API以及 Rust 集成 Consul 的简单实现。请完成以下作业

  • 在自己电脑上安装 Consul
  • 在自己电脑上部署本地节点开发模式的 Consul
  • 我们的 get_service()【服务发现】没有使用到 Consul 的 Filter 参数,而是通过遍历所有服务来实现。请用 Consul 的 Filter 来改写这个方法。
  • 请自行了解并实现健康检查