Skip to content
On this page

Prisma 事务处理

Prisma 提供了强大的事务支持,允许您将多个数据库操作组合在一起,确保数据一致性和完整性。本指南将详细介绍如何在 Prisma 中使用事务。

事务基础概念

事务是一组数据库操作,它们作为一个整体执行。事务具有 ACID 特性:

  • 原子性 (Atomicity): 事务中的所有操作要么全部成功,要么全部失败
  • 一致性 (Consistency): 事务执行前后数据库都处于一致状态
  • 隔离性 (Isolation): 并发执行的事务彼此隔离
  • 持久性 (Durability): 事务提交后,更改永久保存

事务类型

1. 函数式事务 (Functional Transactions)

使用函数式方法处理事务:

typescript
import { PrismaClient } from '@prisma/client'
const prisma = new PrismaClient()

// 函数式事务 - 推荐方式
async function transferMoney(fromUserId: number, toUserId: number, amount: number) {
  try {
    const result = await prisma.$transaction(async (tx) => {
      // 从源用户扣除金额
      const fromUser = await tx.user.update({
        where: { id: fromUserId },
        data: {
          balance: {
            decrement: amount,
          },
        },
      })

      // 检查余额是否足够
      if (fromUser.balance < 0) {
        throw new Error('Insufficient funds')
      }

      // 向目标用户添加金额
      const toUser = await tx.user.update({
        where: { id: toUserId },
        data: {
          balance: {
            increment: amount,
          },
        },
      })

      // 创建转账记录
      const transactionRecord = await tx.transactionLog.create({
        data: {
          fromUserId,
          toUserId,
          amount,
          timestamp: new Date(),
        },
      })

      return { fromUser, toUser, transactionRecord }
    })

    console.log('Transfer successful:', result)
    return result
  } catch (error) {
    console.error('Transfer failed:', error)
    throw error
  }
}

2. 语句式事务 (Statement Transactions)

使用语句数组方式处理事务:

typescript
// 语句式事务
async function createUserWithProfileAndPosts() {
  try {
    const [user, profile, post] = await prisma.$transaction([
      // 创建用户
      prisma.user.create({
        data: {
          email: 'user@example.com',
          name: 'New User',
        },
      }),

      // 创建用户资料
      prisma.profile.create({
        data: {
          bio: 'New user bio',
          userId: 1, // 这里应该使用上面创建的用户ID,但在语句式事务中无法引用
        },
      }),

      // 创建初始帖子
      prisma.post.create({
        data: {
          title: 'First Post',
          content: 'Hello world!',
          authorId: 1, // 同样,这里也需要上面创建的用户ID
        },
      }),
    ])

    return { user, profile, post }
  } catch (error) {
    console.error('Transaction failed:', error)
    throw error
  }
}

注意:语句式事务不能在语句之间共享数据,因此对于需要依赖前面操作结果的场景,应使用函数式事务。

高级事务用例

1. 带隔离级别的事务

typescript
// 在支持的数据库上设置隔离级别
async function transactionWithIsolation() {
  try {
    const result = await prisma.$transaction(
      async (tx) => {
        // 事务内的操作
        const user = await tx.user.findUnique({
          where: { id: 1 },
        })

        // 更新操作
        const updatedUser = await tx.user.update({
          where: { id: 1 },
          data: { lastAccessed: new Date() },
        })

        return updatedUser
      },
      {
        isolationLevel: 'ReadCommitted', // 可选的隔离级别
      }
    )
    return result
  } catch (error) {
    console.error('Transaction failed:', error)
    throw error
  }
}

2. 复杂业务逻辑事务

typescript
// 订单处理事务
async function processOrder(orderData: OrderInput) {
  try {
    const result = await prisma.$transaction(
      async (tx) => {
        // 1. 创建订单
        const order = await tx.order.create({
          data: {
            userId: orderData.userId,
            totalAmount: orderData.items.reduce((sum, item) => sum + item.price * item.quantity, 0),
            status: 'PENDING',
          },
        })

        // 2. 检查并更新库存
        for (const item of orderData.items) {
          const product = await tx.product.findUnique({
            where: { id: item.productId },
          })

          if (!product || product.stock < item.quantity) {
            throw new Error(`Insufficient stock for product ${item.productId}`)
          }

          await tx.product.update({
            where: { id: item.productId },
            data: {
              stock: {
                decrement: item.quantity,
              },
              salesCount: {
                increment: item.quantity,
              },
            },
          })
        }

        // 3. 创建订单项
        const orderItems = await Promise.all(
          orderData.items.map((item) =>
            tx.orderItem.create({
              data: {
                orderId: order.id,
                productId: item.productId,
                quantity: item.quantity,
                price: item.price,
              },
            })
          )
        )

        // 4. 扣减用户余额或处理支付
        const user = await tx.user.findUnique({
          where: { id: orderData.userId },
        })

        if (user!.balance < order.totalAmount) {
          throw new Error('Insufficient balance')
        }

        await tx.user.update({
          where: { id: orderData.userId },
          data: {
            balance: {
              decrement: order.totalAmount,
            },
          },
        })

        // 5. 更新订单状态为已完成
        const completedOrder = await tx.order.update({
          where: { id: order.id },
          data: { status: 'COMPLETED' },
        })

        return {
          order: completedOrder,
          orderItems,
        }
      },
      {
        maxWait: 5000, // 等待获取锁的最大时间(毫秒)
        timeout: 10000, // 事务超时时间(毫秒)
      }
    )

    return result
  } catch (error) {
    console.error('Order processing failed:', error)
    throw error
  }
}

3. 带重试机制的事务

typescript
// 带重试的事务
async function transactionWithRetry<T>(
  fn: (tx: any) => Promise<T>,
  maxRetries: number = 3
): Promise<T> {
  let lastError: Error

  for (let i = 0; i < maxRetries; i++) {
    try {
      return await prisma.$transaction(fn)
    } catch (error) {
      lastError = error as Error
      
      // 如果是死锁或其他可重试错误,则重试
      if (
        error instanceof Error &&
        (error.message.includes('deadlock') ||
          error.message.includes('serialization') ||
          error.message.includes('40001'))
      ) {
        console.log(`Transaction attempt ${i + 1} failed, retrying...`)
        await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 100)) // 指数退避
        continue
      }

      // 其他错误不重试
      throw error
    }
  }

  throw lastError!
}

// 使用带重试的事务
async function updateUserBalanceWithRetry(userId: number, amount: number) {
  return await transactionWithRetry(async (tx) => {
    const user = await tx.user.findUnique({
      where: { id: userId },
    })

    if (user!.balance + amount < 0) {
      throw new Error('Insufficient funds')
    }

    return await tx.user.update({
      where: { id: userId },
      data: {
        balance: {
          increment: amount,
        },
      },
    })
  })
}

事务中的错误处理

1. 事务回滚

typescript
// 事务中的错误会导致自动回滚
async function transactionWithErrorHandling() {
  try {
    const result = await prisma.$transaction(async (tx) => {
      // 第一步:创建用户
      const user = await tx.user.create({
        data: {
          email: 'test@example.com',
          name: 'Test User',
        },
      })

      // 第二步:创建相关记录
      await tx.profile.create({
        data: {
          bio: 'Test bio',
          userId: user.id,
        },
      })

      // 模拟错误 - 这会导致整个事务回滚
      if (user.name === 'Test User') {
        throw new Error('Invalid user name for this operation')
      }

      // 这行代码不会执行,因为上面抛出了错误
      return user
    })

    return result
  } catch (error) {
    console.error('Transaction rolled back due to error:', error)
    // 在这里可以执行清理操作或记录日志
    throw error
  }
}

2. 自定义错误类型

typescript
// 自定义事务错误
class InsufficientFundsError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'InsufficientFundsError'
  }
}

class ProductOutOfStockError extends Error {
  constructor(productId: number) {
    super(`Product ${productId} is out of stock`)
    this.name = 'ProductOutOfStockError'
  }
}

// 使用自定义错误的事务
async function purchaseProduct(userId: number, productId: number, quantity: number) {
  try {
    const result = await prisma.$transaction(async (tx) => {
      // 检查产品库存
      const product = await tx.product.findUnique({
        where: { id: productId },
      })

      if (!product || product.stock < quantity) {
        throw new ProductOutOfStockError(productId)
      }

      // 检查用户余额
      const user = await tx.user.findUnique({
        where: { id: userId },
      })

      if (user!.balance < product.price * quantity) {
        throw new InsufficientFundsError('Not enough balance to complete purchase')
      }

      // 更新库存
      await tx.product.update({
        where: { id: productId },
        data: {
          stock: {
            decrement: quantity,
          },
        },
      })

      // 扣减用户余额
      await tx.user.update({
        where: { id: userId },
        data: {
          balance: {
            decrement: product.price * quantity,
          },
        },
      })

      // 创建购买记录
      const purchase = await tx.purchase.create({
        data: {
          userId,
          productId,
          quantity,
          totalPrice: product.price * quantity,
        },
      })

      return purchase
    })

    return result
  } catch (error) {
    if (error instanceof ProductOutOfStockError) {
      console.error('Purchase failed - product out of stock:', error.message)
    } else if (error instanceof InsufficientFundsError) {
      console.error('Purchase failed - insufficient funds:', error.message)
    } else {
      console.error('Purchase failed with unexpected error:', error)
    }
    throw error
  }
}

事务性能优化

1. 事务范围最小化

typescript
// 不好的做法 - 事务范围过大
async function badTransactionExample(userId: number) {
  // 这些操作不需要在事务中
  const externalData = await fetchExternalApi() // 外部 API 调用
  const processedData = processSomeData(externalData) // 数据处理

  // 应该只把数据库操作放在事务中
  const result = await prisma.$transaction(async (tx) => {
    return await tx.user.update({
      where: { id: userId },
      data: { lastActivity: new Date() },
    })
  })

  return result
}

// 更好的做法 - 只将必要的数据库操作放在事务中
async function goodTransactionExample(userId: number) {
  // 外部操作在事务外执行
  const externalData = await fetchExternalApi()
  const processedData = processSomeData(externalData)

  // 事务只包含数据库操作
  const result = await prisma.$transaction(async (tx) => {
    return await tx.user.update({
      where: { id: userId },
      data: {
        lastActivity: new Date(),
        externalDataRef: processedData.id,
      },
    })
  })

  return result
}

2. 避免长事务

typescript
// 避免长时间持有锁
async function avoidLongTransactions() {
  // 不好的做法 - 长时间运行的操作在事务中
  await prisma.$transaction(async (tx) => {
    await tx.user.update({ where: { id: 1 }, data: { status: 'processing' } })
    
    // 模拟长时间运行的操作 - 不要在事务中做这个
    await new Promise(resolve => setTimeout(resolve, 5000)) // 5秒延迟
    
    await tx.user.update({ where: { id: 1 }, data: { status: 'completed' } })
  })

  // 更好的做法 - 长时间操作在事务外
  await prisma.$transaction(async (tx) => {
    await tx.user.update({ where: { id: 1 }, data: { status: 'processing' } })
  })

  // 长时间运行的操作
  await new Promise(resolve => setTimeout(resolve, 5000))

  await prisma.$transaction(async (tx) => {
    await tx.user.update({ where: { id: 1 }, data: { status: 'completed' } })
  })
}

事务监控和日志

1. 事务日志记录

typescript
// 带日志的事务包装器
async function loggedTransaction<T>(
  operationName: string,
  fn: (tx: any) => Promise<T>
): Promise<T> {
  const startTime = Date.now()
  console.log(`Starting transaction: ${operationName}`)

  try {
    const result = await prisma.$transaction(async (tx) => {
      return await fn(tx)
    })

    const duration = Date.now() - startTime
    console.log(`Completed transaction: ${operationName} in ${duration}ms`)
    
    return result
  } catch (error) {
    const duration = Date.now() - startTime
    console.error(`Failed transaction: ${operationName} after ${duration}ms`, error)
    throw error
  }
}

// 使用带日志的事务
async function createUserWithLogging(userData: any) {
  return await loggedTransaction('createUser', async (tx) => {
    const user = await tx.user.create({
      data: userData,
    })

    await tx.auditLog.create({
      data: {
        action: 'USER_CREATED',
        userId: user.id,
        timestamp: new Date(),
      },
    })

    return user
  })
}

通过正确使用 Prisma 的事务功能,您可以确保数据的一致性和完整性,同时处理复杂的业务逻辑。