Skip to content
On this page

MongoDB 聚合管道

聚合管道是MongoDB中用于处理数据并返回计算结果的强大工具。它允许对文档进行一系列转换操作,最终产生一个或多个结果文档。

聚合管道基础

聚合管道由一系列阶段(stages)组成,每个阶段对输入文档进行转换操作,然后将结果传递给下一个阶段。

基本语法

javascript
db.collection.aggregate([
  { $stage1: { ... } },
  { $stage2: { ... } },
  { $stage3: { ... } }
])

简单示例

javascript
// 计算每个状态的用户数量
db.users.aggregate([
  {
    $group: {
      _id: "$status",
      count: { $sum: 1 }
    }
  },
  {
    $sort: { count: -1 }
  }
])

常用聚合阶段

$match 阶段

$match用于过滤文档,类似于find()操作。

javascript
// 过滤活跃用户
db.users.aggregate([
  {
    $match: {
      status: "active",
      age: { $gte: 18 }
    }
  }
])

// 复杂匹配条件
db.orders.aggregate([
  {
    $match: {
      $and: [
        { orderDate: { $gte: new Date("2023-01-01") } },
        { status: { $in: ["completed", "shipped"] } },
        { totalAmount: { $gte: 100 } }
      ]
    }
  }
])

$project 阶段

$project用于选择输出文档中的字段。

javascript
// 只返回特定字段
db.users.aggregate([
  {
    $project: {
      name: 1,
      email: 1,
      age: 1,
      _id: 0  // 排除_id字段
    }
  }
])

// 字段重命名和计算
db.users.aggregate([
  {
    $project: {
      fullName: { $concat: ["$firstName", " ", "$lastName"] },
      age: 1,
      isAdult: { $gte: ["$age", 18] },
      email: 1
    }
  }
])

// 条件投影
db.users.aggregate([
  {
    $project: {
      name: 1,
      status: 1,
      displayName: {
        $cond: {
          if: { $eq: ["$status", "premium"] },
          then: { $concat: ["VIP ", "$name"] },
          else: "$name"
        }
      }
    }
  }
])

$group 阶段

$group用于将文档分组并执行聚合操作。

javascript
// 基本分组
db.orders.aggregate([
  {
    $group: {
      _id: "$customerId",
      totalOrders: { $sum: 1 },
      totalAmount: { $sum: "$amount" },
      avgAmount: { $avg: "$amount" },
      maxAmount: { $max: "$amount" },
      minAmount: { $min: "$amount" }
    }
  }
])

// 复合分组键
db.orders.aggregate([
  {
    $group: {
      _id: {
        customerId: "$customerId",
        month: { $month: "$orderDate" },
        year: { $year: "$orderDate" }
      },
      totalOrders: { $sum: 1 },
      totalAmount: { $sum: "$amount" }
    }
  }
])

// 分组并收集数组
db.orders.aggregate([
  {
    $group: {
      _id: "$customerId",
      orderIds: { $push: "$orderId" },
      amounts: { $push: "$amount" },
      uniqueProducts: { $addToSet: "$productId" }
    }
  }
])

$sort 阶段

$sort用于对文档进行排序。

javascript
// 基本排序
db.users.aggregate([
  {
    $sort: { age: -1, name: 1 }  // 按年龄降序,姓名升序
  }
])

// 在管道中间排序
db.orders.aggregate([
  {
    $match: { status: "completed" }
  },
  {
    $sort: { amount: -1 }
  },
  {
    $limit: 10
  }
])

$limit 和 $skip 阶段

用于分页操作。

javascript
// 分页查询
db.users.aggregate([
  { $sort: { createdAt: -1 } },
  { $skip: 10 },    // 跳过前10个
  { $limit: 10 }    // 返回10个
])

// 深度分页优化
db.users.aggregate([
  { $sort: { createdAt: -1 } },
  { $match: { createdAt: { $lt: lastDate } } },  // 使用范围查询替代skip
  { $limit: 10 }
])

$lookup 阶段

$lookup用于执行左外连接。

javascript
// 基本连接
db.orders.aggregate([
  {
    $lookup: {
      from: "customers",
      localField: "customerId",
      foreignField: "_id",
      as: "customerInfo"
    }
  }
])

// 复杂连接条件
db.orders.aggregate([
  {
    $lookup: {
      from: "products",
      let: { productIds: "$productIds" },
      pipeline: [
        {
          $match: {
            $expr: { $in: ["$_id", "$$productIds"] }
          }
        },
        {
          $project: {
            name: 1,
            price: 1,
            category: 1
          }
        }
      ],
      as: "products"
    }
  }
])

// 多连接
db.orders.aggregate([
  {
    $lookup: {
      from: "customers",
      localField: "customerId",
      foreignField: "_id",
      as: "customer"
    }
  },
  {
    $lookup: {
      from: "products",
      localField: "productId",
      foreignField: "_id",
      as: "product"
    }
  }
])

高级聚合操作

$unwind 阶段

$unwind用于展开数组字段。

javascript
// 展开数组
db.users.aggregate([
  {
    $unwind: "$tags"
  },
  {
    $group: {
      _id: "$tags",
      count: { $sum: 1 }
    }
  }
])

// 展开嵌套数组
db.users.aggregate([
  {
    $unwind: {
      path: "$hobbies",
      preserveNullAndEmptyArrays: true  // 保留null和空数组
    }
  }
])

// 展开多个数组
db.users.aggregate([
  {
    $unwind: "$orders"
  },
  {
    $unwind: "$orders.items"
  }
])

$addFields 阶段

$addFields用于添加新字段而不改变现有字段。

javascript
// 添加计算字段
db.orders.aggregate([
  {
    $addFields: {
      tax: { $multiply: ["$amount", 0.08] },
      total: { $add: ["$amount", { $multiply: ["$amount", 0.08] }] },
      isHighValue: { $gte: ["$amount", 1000] }
    }
  }
])

// 条件字段添加
db.users.aggregate([
  {
    $addFields: {
      category: {
        $switch: {
          branches: [
            { case: { $gte: ["$age", 65] }, then: "senior" },
            { case: { $gte: ["$age", 18] }, then: "adult" },
            { case: { $lt: ["$age", 18] }, then: "minor" }
          ],
          default: "unknown"
        }
      }
    }
  }
])

$facet 阶段

$falicit用于在一个阶段内执行多个聚合操作。

javascript
// 多维度分析
db.products.aggregate([
  {
    $facet: {
      priceStats: [
        {
          $group: {
            _id: null,
            avgPrice: { $avg: "$price" },
            minPrice: { $min: "$price" },
            maxPrice: { $max: "$price" }
          }
        }
      ],
      categoryDistribution: [
        {
          $group: {
            _id: "$category",
            count: { $sum: 1 }
          }
        },
        { $sort: { count: -1 } }
      ],
      topProducts: [
        { $sort: { sales: -1 } },
        { $limit: 10 }
      ]
    }
  }
])

$bucket 阶段

$bucket用于将文档分组到指定的桶中。

javascript
// 按价格分桶
db.products.aggregate([
  {
    $bucket: {
      groupBy: "$price",
      boundaries: [0, 100, 500, 1000, Infinity],
      default: "other",
      output: {
        count: { $sum: 1 },
        averagePrice: { $avg: "$price" },
        products: { $push: "$name" }
      }
    }
  }
])

// 按年龄分桶
db.users.aggregate([
  {
    $bucket: {
      groupBy: "$age",
      boundaries: [0, 18, 30, 50, 65, 100],
      output: {
        count: { $sum: 1 },
        avgIncome: { $avg: "$income" }
      }
    }
  }
])

表达式操作符

算术表达式

javascript
// 基本算术
db.orders.aggregate([
  {
    $addFields: {
      subtotal: { $add: ["$quantity", "$price"] },
      tax: { $multiply: ["$subtotal", 0.08] },
      total: { $add: ["$subtotal", "$tax"] },
      average: { $divide: ["$total", "$quantity"] },
      remainder: { $mod: ["$total", 10] }
    }
  }
])

日期表达式

javascript
// 日期操作
db.orders.aggregate([
  {
    $addFields: {
      orderYear: { $year: "$orderDate" },
      orderMonth: { $month: "$orderDate" },
      orderDay: { $dayOfMonth: "$orderDate" },
      orderHour: { $hour: "$orderDate" },
      daysSinceOrder: {
        $divide: [
          { $subtract: [new Date(), "$orderDate"] },
          1000 * 60 * 60 * 24
        ]
      }
    }
  }
])

// 日期格式化
db.orders.aggregate([
  {
    $addFields: {
      formattedDate: {
        $dateToString: {
          format: "%Y-%m-%d",
          date: "$orderDate"
        }
      }
    }
  }
])

字符串表达式

javascript
// 字符串操作
db.users.aggregate([
  {
    $addFields: {
      firstName: { $substr: ["$fullName", 0, { $indexOfBytes: ["$fullName", " "] }] },
      lastName: {
        $substr: [
          "$fullName",
          { $add: [{ $indexOfBytes: ["$fullName", " "], 1 }] },
          { $strLenBytes: "$fullName" }
        ]
      },
      emailDomain: {
        $arrayElemAt: [
          { $split: ["$email", "@"] },
          1
        ]
      },
      displayName: {
        $concat: [
          { $toUpper: { $substr: ["$name", 0, 1] } },
          { $substr: ["$name", 1, { $strLenBytes: "$name" }] }
        ]
      }
    }
  }
])

性能优化

索引使用

javascript
// 在聚合中利用索引
db.orders.aggregate([
  { $match: { orderDate: { $gte: new Date("2023-01-01") } } },  // 使用索引
  { $sort: { amount: -1 } },  // 使用索引
  { $limit: 100 }
])

// 创建适合聚合的索引
db.orders.createIndex({ orderDate: 1, amount: -1 })

阶段顺序优化

javascript
// 推荐的阶段顺序
db.orders.aggregate([
  { $match: { status: "completed" } },      // 尽早过滤
  { $lookup: { /* join */ } },              // 连接
  { $unwind: "$items" },                    // 展开数组
  { $group: { /* 分组 */ } },               // 分组聚合
  { $match: { total: { $gte: 100 } } },     // 过滤聚合结果
  { $sort: { total: -1 } },                 // 排序
  { $limit: 10 }                            // 限制结果
])

内存使用优化

javascript
// 对于大数据集使用allowDiskUse
db.largeCollection.aggregate([
  { $group: { /* ... */ } }
], { allowDiskUse: true })

// 分批处理大数据集
db.collection.aggregate([
  { $sort: { _id: 1 } },
  { $limit: 10000 },
  { $out: "temp_collection" }
])

实际应用案例

电商分析

javascript
// 电商销售分析
db.orders.aggregate([
  {
    $match: {
      orderDate: {
        $gte: new Date("2023-01-01"),
        $lt: new Date("2024-01-01")
      }
    }
  },
  {
    $lookup: {
      from: "customers",
      localField: "customerId",
      foreignField: "_id",
      as: "customer"
    }
  },
  {
    $unwind: "$customer"
  },
  {
    $addFields: {
      customerRegion: "$customer.region"
    }
  },
  {
    $group: {
      _id: {
        region: "$customerRegion",
        month: { $month: "$orderDate" }
      },
      totalSales: { $sum: "$amount" },
      orderCount: { $sum: 1 },
      avgOrderValue: { $avg: "$amount" },
      uniqueCustomers: { $addToSet: "$customerId" }
    }
  },
  {
    $project: {
      region: "$_id.region",
      month: "$_id.month",
      totalSales: 1,
      orderCount: 1,
      avgOrderValue: 1,
      customerCount: { $size: "$uniqueCustomers" }
    }
  },
  {
    $sort: { "region": 1, "month": 1 }
  }
])

用户行为分析

javascript
// 用户行为分析
db.pageViews.aggregate([
  {
    $match: {
      timestamp: {
        $gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)  // 最近7天
      }
    }
  },
  {
    $group: {
      _id: "$userId",
      pageViews: { $sum: 1 },
      uniquePages: { $addToSet: "$page" },
      sessionCount: { $sum: { $cond: [{ $eq: ["$isNewSession", true] }, 1, 0] } },
      avgSessionDuration: { $avg: "$sessionDuration" }
    }
  },
  {
    $addFields: {
      engagementScore: {
        $add: [
          { $multiply: ["$pageViews", 0.3] },
          { $multiply: [{ $size: "$uniquePages" }, 0.4] },
          { $multiply: ["$sessionCount", 0.2] },
          { $multiply: ["$avgSessionDuration", 0.1] }
        ]
      }
    }
  },
  {
    $sort: { engagementScore: -1 }
  },
  {
    $limit: 100
  }
])

实时仪表板数据

javascript
// 实时仪表板数据
db.aggregate([
  {
    $facet: {
      dailyStats: [
        {
          $match: {
            timestamp: { $gte: new Date(new Date().setHours(0, 0, 0, 0)) }
          }
        },
        {
          $group: {
            _id: null,
            newUsers: { $sum: 1 },
            totalRevenue: { $sum: "$amount" },
            avgOrderValue: { $avg: "$amount" }
          }
        }
      ],
      weeklyTrend: [
        {
          $match: {
            timestamp: {
              $gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
            }
          }
        },
        {
          $group: {
            _id: { $dayOfWeek: "$timestamp" },
            dailyTotal: { $sum: "$amount" },
            orderCount: { $sum: 1 }
          }
        }
      ],
      topProducts: [
        { $match: { /* 本周数据 */ } },
        { $group: { _id: "$productId", sales: { $sum: "$quantity" } } },
        { $sort: { sales: -1 } },
        { $limit: 10 }
      ]
    }
  }
])

最佳实践

1. 合理使用索引

  • 在$match和$sort阶段前使用索引
  • 为常用的聚合字段创建复合索引
  • 监控聚合查询的执行计划

2. 优化阶段顺序

  • 尽早使用$match减少数据量
  • 合理安排$lookup的位置
  • 在必要时使用$project减少数据传输

3. 内存管理

  • 监控聚合操作的内存使用
  • 对于大数据集使用allowDiskUse选项
  • 考虑分批处理大量数据

4. 性能测试

javascript
// 使用explain分析聚合性能
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: { _id: "$customerId", total: { $sum: "$amount" } } }
]).explain("executionStats")

总结

MongoDB聚合管道是一个强大的数据分析工具,通过合理使用各种阶段和操作符,可以实现复杂的数据处理和分析任务。关键是要理解各个阶段的作用,合理安排阶段顺序,并充分利用索引来优化性能。在实际应用中,聚合管道常用于商业智能、报表生成、数据挖掘等场景。