Skip to content
On this page

Prisma 性能优化

Prisma 提供了多种性能优化策略,帮助您构建高效的数据库应用程序。本指南将详细介绍如何优化 Prisma 查询、减少数据库负载并提升应用性能。

查询优化

使用 Select 和 Include 优化

typescript
import { PrismaClient } from '@prisma/client'

const prisma = new PrismaClient()

// 不推荐:获取所有字段
async function getAllUserDataInefficient() {
  const users = await prisma.user.findMany()
  // 返回所有字段,包括可能不需要的大字段
  return users
}

// 推荐:只选择需要的字段
async function getUserDataEfficient() {
  const users = await prisma.user.findMany({
    select: {
      id: true,
      email: true,
      name: true,
      // 不选择大型字段如 avatar, bio 等
    },
  })
  return users
}

// 智能关联加载
async function getUserWithSelectedPosts() {
  const users = await prisma.user.findMany({
    include: {
      posts: {
        select: {
          id: true,
          title: true,
          published: true,
          // 只选择需要的字段
        },
        where: {
          published: true,
        },
        orderBy: {
          createdAt: 'desc',
        },
        take: 5, // 限制关联数据数量
      },
    },
  })
  return users
}

避免 N+1 查询问题

typescript
// N+1 问题示例(不推荐)
async function getPostsWithNPlusOneProblem() {
  const posts = await prisma.post.findMany()
  
  // 这将导致 N 次额外查询
  for (const post of posts) {
    post.author = await prisma.user.findUnique({
      where: { id: post.authorId }
    }) // 为每个帖子执行一次查询
  }
  
  return posts
}

// 解决 N+1 问题(推荐)
async function getPostsWithoutNPlusOne() {
  const posts = await prisma.post.findMany({
    include: {
      author: {
        select: {
          id: true,
          name: true,
          email: true,
        }
      }
    }
  })
  // 只有一次查询就获取了所有数据
  return posts
}

// 使用 groupBy 避免重复查询
async function getOptimizedGroupedData() {
  const posts = await prisma.post.findMany({
    include: {
      author: true,
      _count: {
        select: { comments: true } // 使用聚合而不是单独查询
      }
    }
  })
  return posts
}

索引优化

在 Schema 中定义索引

prisma
// schema.prisma
model User {
  id    Int     @id @default(autoincrement())
  email String  @unique
  name  String?
  role  String?
  
  // 单列索引
  @@index([role])
  
  // 复合索引
  @@index([email, role])
  
  // 唯一复合索引
  @@unique([name, email])
  
  // 指定索引名称
  @@index([name], map: "idx_user_name")
}

model Post {
  id        Int      @id @default(autoincrement())
  title     String
  content   String?
  published Boolean  @default(false)
  authorId  Int
  author    User     @relation(fields: [authorId], references: [id])
  createdAt DateTime @default(now())
  
  // 按时间查询的索引
  @@index([createdAt])
  
  // 复合索引,适用于常见查询模式
  @@index([published, createdAt])
  
  // 全文搜索索引(数据库支持时)
  @@index([title], type: Fulltext)
}

索引友好查询

typescript
// 利用索引的查询
async function indexOptimizedQueries() {
  // 使用 email 索引
  const user = await prisma.user.findUnique({
    where: { email: 'test@example.com' }
  })

  // 使用复合索引 [email, role]
  const users = await prisma.user.findMany({
    where: {
      email: 'test@example.com',
      role: 'admin' // 使用复合索引的所有部分
    }
  })

  // 使用 [published, createdAt] 索引
  const recentPublishedPosts = await prisma.post.findMany({
    where: {
      published: true, // 索引的第一部分
      createdAt: {
        gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) // 一周内
      }
    },
    orderBy: {
      createdAt: 'desc'
    }
  })

  return { user, users, recentPublishedPosts }
}

分页优化

基础分页

typescript
// 基础偏移分页(适用于小数据集)
async function offsetPagination(page: number, pageSize: number = 10) {
  const skip = (page - 1) * pageSize
  
  const [items, totalCount] = await Promise.all([
    prisma.user.findMany({
      skip,
      take: pageSize,
      orderBy: { id: 'asc' }
    }),
    prisma.user.count()
  ])

  return {
    items,
    totalCount,
    totalPages: Math.ceil(totalCount / pageSize),
    currentPage: page
  }
}

// 游标分页(推荐用于大数据集)
async function cursorPagination(afterCursor?: number, first: number = 10) {
  const where = afterCursor 
    ? { id: { gt: afterCursor } } 
    : {}

  const items = await prisma.user.findMany({
    where,
    take: first,
    orderBy: { id: 'asc' }
  })

  return {
    items,
    hasNextPage: items.length === first,
    endCursor: items.length > 0 ? items[items.length - 1].id : null
  }
}

高效的分页实现

typescript
// 结合索引的高效分页
async function efficientPagination({
  cursor,
  limit = 10,
  orderDirection = 'asc',
  searchTerm = ''
}: {
  cursor?: number
  limit?: number
  orderDirection?: 'asc' | 'desc'
  searchTerm?: string
}) {
  const where: any = {}
  
  if (searchTerm) {
    where.OR = [
      { name: { contains: searchTerm, mode: 'insensitive' } },
      { email: { contains: searchTerm, mode: 'insensitive' } }
    ]
  }

  if (cursor) {
    where.id = {
      [orderDirection === 'asc' ? 'gt' : 'lt']: cursor
    }
  }

  const items = await prisma.user.findMany({
    where,
    take: limit,
    orderBy: { id: orderDirection }
  })

  const hasMore = items.length === limit

  return {
    items,
    nextCursor: hasMore ? items[limit - 1].id : undefined,
    hasMore
  }
}

批量操作优化

批量创建

typescript
// 高效的批量创建
async function batchCreateUsers(usersData: Array<{ email: string; name: string }>) {
  // 使用 createMany 进行批量插入
  const result = await prisma.user.createMany({
    data: usersData,
    skipDuplicates: true // 跳过重复项
  })
  
  return result
}

// 批量更新
async function batchUpdateUsers(userIds: number[], updateData: any) {
  const result = await prisma.user.updateMany({
    where: {
      id: { in: userIds }
    },
    data: updateData
  })
  
  return result
}

// 批量删除
async function batchDeleteUsers(userIds: number[]) {
  const result = await prisma.user.deleteMany({
    where: {
      id: { in: userIds }
    }
  })
  
  return result
}

使用事务优化批量操作

typescript
// 批量操作的事务处理
async function batchOperationsWithTransaction() {
  try {
    const result = await prisma.$transaction([
      // 批量创建
      prisma.user.createMany({
        data: [
          { email: 'user1@example.com', name: 'User 1' },
          { email: 'user2@example.com', name: 'User 2' },
        ],
        skipDuplicates: true
      }),
      
      // 批量更新
      prisma.post.updateMany({
        where: { published: false },
        data: { published: true }
      }),
      
      // 批量删除
      prisma.comment.deleteMany({
        where: { spam: true }
      })
    ], {
      maxWait: 5000, // 等待获取锁的最大时间
      timeout: 10000 // 整个事务的超时时间
    })
    
    return result
  } catch (error) {
    console.error('Batch operation failed:', error)
    throw error
  }
}

连接池和查询优化

连接池配置

typescript
// 优化的 Prisma 客户端配置
const optimizedPrisma = new PrismaClient({
  datasources: {
    db: {
      url: process.env.DATABASE_URL,
    }
  },
  // 连接池配置
  __internal: {
    // 这些是内部选项,在未来版本中可能变化
  },
  // 查询日志(生产环境建议关闭)
  log: process.env.NODE_ENV === 'development' 
    ? ['warn', 'error'] 
    : ['error']
})

// 自定义连接管理
class PrismaConnectionManager {
  private static instance: PrismaClient
  
  static getInstance(): PrismaClient {
    if (!this.instance) {
      this.instance = new PrismaClient({
        datasources: {
          db: { url: process.env.DATABASE_URL }
        }
      })
    }
    return this.instance
  }
  
  static async disconnect() {
    if (this.instance) {
      await this.instance.$disconnect()
    }
  }
}

查询缓存

typescript
// 实现查询缓存
class QueryCache {
  private cache = new Map<string, { data: any; timestamp: number; ttl: number }>()
  
  async getOrSet<T>(
    key: string, 
    fetchFn: () => Promise<T>, 
    ttl: number = 5 * 60 * 1000 // 5分钟默认TTL
  ): Promise<T> {
    const cached = this.cache.get(key)
    
    if (cached && (Date.now() - cached.timestamp) < cached.ttl) {
      console.log(`Cache hit for key: ${key}`)
      return cached.data as T
    }
    
    console.log(`Cache miss for key: ${key}`)
    const data = await fetchFn()
    
    this.cache.set(key, {
      data,
      timestamp: Date.now(),
      ttl
    })
    
    return data as T
  }
  
  invalidate(key: string) {
    this.cache.delete(key)
  }
  
  clear() {
    this.cache.clear()
  }
}

const queryCache = new QueryCache()

// 使用缓存的查询
async function getCachedPopularPosts() {
  return await queryCache.getOrSet(
    'popular-posts',
    async () => {
      return await prisma.post.findMany({
        where: { 
          published: true,
          views: { gte: 1000 }
        },
        orderBy: { views: 'desc' },
        take: 10
      })
    },
    10 * 60 * 1000 // 10分钟TTL
  )
}

性能监控和分析

查询性能监控

typescript
// 查询性能监控
class QueryPerformanceMonitor {
  static async monitorQuery<T>(
    queryName: string, 
    queryFn: () => Promise<T>
  ): Promise<{ result: T; duration: number }> {
    const startTime = performance.now()
    
    try {
      const result = await queryFn()
      const duration = performance.now() - startTime
      
      // 记录慢查询
      if (duration > 1000) { // 超过1秒
        console.warn(`Slow query detected: ${queryName}, Duration: ${duration}ms`)
      }
      
      console.log(`Query ${queryName} completed in ${duration}ms`)
      
      return { result, duration }
    } catch (error) {
      const duration = performance.now() - startTime
      console.error(`Query ${queryName} failed after ${duration}ms:`, error)
      throw error
    }
  }
}

// 使用性能监控
async function monitoredUserQuery(userId: number) {
  const { result, duration } = await QueryPerformanceMonitor.monitorQuery(
    `getUser-${userId}`,
    async () => {
      return await prisma.user.findUnique({
        where: { id: userId },
        include: { posts: { take: 5 } }
      })
    }
  )
  
  return result
}

数据库查询分析

typescript
// 查询分析工具
class QueryAnalyzer {
  // 分析查询模式
  static analyzeQueryPattern(model: string, action: string, args: any) {
    const analysis = {
      model,
      action,
      hasWhere: !!args?.where,
      hasInclude: !!args?.include,
      hasSelect: !!args?.select,
      hasOrderBy: !!args?.orderBy,
      hasLimit: !!args?.take,
      complexity: this.calculateComplexity(args)
    }
    
    return analysis
  }
  
  private static calculateComplexity(args: any): number {
    let complexity = 1
    
    if (args?.include) complexity += 2
    if (args?.where) complexity += this.analyzeWhereClause(args.where)
    if (args?.orderBy) complexity += 1
    if (args?.take && args.take > 100) complexity += 1
    
    return complexity
  }
  
  private static analyzeWhereClause(where: any): number {
    if (!where) return 0
    
    let complexity = 0
    const keys = Object.keys(where)
    
    for (const key of keys) {
      if (['AND', 'OR', 'NOT'].includes(key)) {
        complexity += 2
      } else {
        complexity += 1
      }
    }
    
    return complexity
  }
}

// 应用查询分析中间件
const queryAnalysisMiddleware: Prisma.Middleware = async (params, next) => {
  const analysis = QueryAnalyzer.analyzeQueryPattern(
    params.model,
    params.action,
    params.args
  )
  
  if (analysis.complexity > 5) {
    console.warn('High complexity query detected:', analysis)
  }
  
  return next(params)
}

prisma.$use(queryAnalysisMiddleware)

特定场景优化

大数据集优化

typescript
// 处理大数据集的优化策略
class LargeDataSetHandler {
  // 分批处理大数据集
  static async processLargeDataSet<T>(
    processChunk: (chunk: T[]) => Promise<void>,
    batchSize: number = 1000
  ) {
    let cursor: number | undefined = undefined
    let hasMore = true
    
    while (hasMore) {
      const chunk = await prisma.user.findMany({
        take: batchSize,
        ...(cursor && { cursor: { id: cursor } }),
        orderBy: { id: 'asc' }
      })
      
      if (chunk.length === 0) {
        hasMore = false
        break
      }
      
      await processChunk(chunk)
      
      if (chunk.length < batchSize) {
        hasMore = false
      } else {
        cursor = chunk[chunk.length - 1].id
      }
    }
  }
  
  // 高效的数据导出
  static async exportDataEfficiently(where: any, outputStream: any) {
    const batchSize = 1000
    let cursor: number | undefined = undefined
    let batchCount = 0
    
    do {
      const batch = await prisma.user.findMany({
        where,
        take: batchSize,
        ...(cursor && { cursor: { id: cursor } }),
        orderBy: { id: 'asc' }
      })
      
      if (batch.length === 0) break
      
      // 将批次写入输出流
      outputStream.write(JSON.stringify(batch) + '\n')
      
      if (batch.length < batchSize) break
      cursor = batch[batch.length - 1].id
      batchCount++
      
      // 可选:在处理过程中释放内存
      if (batchCount % 10 === 0) {
        await new Promise(resolve => setImmediate(resolve))
      }
    } while (true)
  }
}

内存优化

typescript
// 内存优化查询
async function memoryEfficientQueries() {
  // 对于只需要计数的场景,避免获取所有数据
  const userCount = await prisma.user.count({
    where: { active: true }
  })
  
  // 使用聚合查询而不是获取所有数据后计算
  const stats = await prisma.post.aggregate({
    where: { published: true },
    _avg: { views: true },
    _count: { _all: true },
    _sum: { views: true }
  })
  
  // 分批处理而不是一次性加载所有数据
  const processBatch = async (offset: number, limit: number) => {
    const batch = await prisma.user.findMany({
      skip: offset,
      take: limit,
      select: {
        id: true,
        email: true,
        // 只选择需要的字段
      }
    })
    
    // 处理批次
    return batch
  }
  
  return { userCount, stats, processBatch }
}

通过实施这些性能优化策略,您可以显著提高 Prisma 应用程序的性能,减少数据库负载,并提供更好的用户体验。