使用sqlx的事务实现转账

581
2023/06/04 14:32:05

本章我们将通过用户之间转账来讨论 sqlx 的事务。为了保证转账的完整性、正确性,我们必须使用事务来处理。

事务的常用操作

  • 开启事务:conn.begin().await
  • 提交事务:tx.commit().await
  • 回滚事务:tx.rollback().await

实现转账

// src/db/member.rs

pub async fn tran(conn: &sqlx::MySqlPool, t: &model::member::Tran) -> Result<(u64, u64)> {
    let mut tx = conn.begin().await.map_err(Error::from)?;

    let from_aff =
        match sqlx::query("UPDATE member SET balance=balance-? WHERE name=? AND balance>=?")
            .bind(&t.amount)
            .bind(&t.from_member)
            .bind(&t.amount)
            .execute(&mut tx)
            .await
        {
            Ok(r) => r.rows_affected(),
            Err(err) => {
                tx.rollback().await.map_err(Error::from)?;
                return Err(Error::from(err));
            }
        };

    if from_aff < 1 {
        tx.rollback().await.map_err(Error::from)?;
        return Err(Error::tran("转账失败,请检查转出账户是否有足够余额"));
    }

    let to_aff = match sqlx::query("UPDATE member SET balance=balance+? WHERE name=?")
        .bind(&t.amount)
        .bind(&t.to_member)
        .execute(&mut tx)
        .await
    {
        Ok(r) => r.rows_affected(),
        Err(err) => {
            tx.rollback().await.map_err(Error::from)?;
            return Err(Error::from(err));
        }
    };

    tx.commit().await.map_err(Error::from)?;

    Ok((from_aff, to_aff))
}

开启事务

  • 事务必须由 mut 修饰
  • &sqlx::MySqlPoolbegin() 方法可以开启一个事务

转出账户扣款

let from_aff =
    match sqlx::query("UPDATE member SET balance=balance-? WHERE name=? AND balance>=?")
        .bind(&t.amount)
        .bind(&t.from_member)
        .bind(&t.amount)
        .execute(&mut tx)
        .await
    {
        Ok(r) => r.rows_affected(),
        Err(err) => {
            tx.rollback().await.map_err(Error::from)?;
            return Err(Error::from(err));
        }
    };

if from_aff < 1 {
    tx.rollback().await.map_err(Error::from)?;
    return Err(Error::tran("转账失败,请检查转出账户是否有足够余额"));
}
  • 为了确保转出账户有足够的余额,我们的 SQL 语句的 WHERE 条件是:name=? AND balance>=?,即转出账户的名称等于传入的名称、账户余额大于等于转账金额
  • 通过 match 来处理不同的情况
    • 成功时,返回受影响的行数
    • 失败时,回滚事务,并返回错误
  • 注意,execute()的参数不再是 conn ,而是 &mut tx,即事务:execute(&mut tx)
  • if from_aff < 1 :如果受影响的行数小于1,说明扣款失败,这时应该回滚事务。扣款失败可能的原因有:
    • 账户余额不足
    • 输入的会员名称不存在

增加转入账户的余额

let to_aff = match sqlx::query("UPDATE member SET balance=balance+? WHERE name=?")
    .bind(&t.amount)
    .bind(&t.to_member)
    .execute(&mut tx)
    .await
{
    Ok(r) => r.rows_affected(),
    Err(err) => {
        tx.rollback().await.map_err(Error::from)?;
        return Err(Error::from(err));
    }
};
  • 增加转入账户的余额
  • 通过 match 来处理不同的情况,逻辑和转出扣款一致,不再重复
  • 同样注意 execute() 应该使用事务,而不是连接池
  • 你也可以增加 if to_aff < 1 的处理逻辑,这里就当是作业留给你完成

提交事务

tx.commit().await.map_err(Error::from)?;

只有提交了事务,修改的数据才会真实提交到数据库里。