Appearance
MongoDB 聚合管道
聚合管道是MongoDB中用于处理数据并返回计算结果的强大工具。它允许对文档进行一系列转换操作,最终产生一个或多个结果文档。
聚合管道基础
聚合管道由一系列阶段(stages)组成,每个阶段对输入文档进行转换操作,然后将结果传递给下一个阶段。
基本语法
javascript
db.collection.aggregate([
{ $stage1: { ... } },
{ $stage2: { ... } },
{ $stage3: { ... } }
])
简单示例
javascript
// 计算每个状态的用户数量
db.users.aggregate([
{
$group: {
_id: "$status",
count: { $sum: 1 }
}
},
{
$sort: { count: -1 }
}
])
常用聚合阶段
$match 阶段
$match用于过滤文档,类似于find()操作。
javascript
// 过滤活跃用户
db.users.aggregate([
{
$match: {
status: "active",
age: { $gte: 18 }
}
}
])
// 复杂匹配条件
db.orders.aggregate([
{
$match: {
$and: [
{ orderDate: { $gte: new Date("2023-01-01") } },
{ status: { $in: ["completed", "shipped"] } },
{ totalAmount: { $gte: 100 } }
]
}
}
])
$project 阶段
$project用于选择输出文档中的字段。
javascript
// 只返回特定字段
db.users.aggregate([
{
$project: {
name: 1,
email: 1,
age: 1,
_id: 0 // 排除_id字段
}
}
])
// 字段重命名和计算
db.users.aggregate([
{
$project: {
fullName: { $concat: ["$firstName", " ", "$lastName"] },
age: 1,
isAdult: { $gte: ["$age", 18] },
email: 1
}
}
])
// 条件投影
db.users.aggregate([
{
$project: {
name: 1,
status: 1,
displayName: {
$cond: {
if: { $eq: ["$status", "premium"] },
then: { $concat: ["VIP ", "$name"] },
else: "$name"
}
}
}
}
])
$group 阶段
$group用于将文档分组并执行聚合操作。
javascript
// 基本分组
db.orders.aggregate([
{
$group: {
_id: "$customerId",
totalOrders: { $sum: 1 },
totalAmount: { $sum: "$amount" },
avgAmount: { $avg: "$amount" },
maxAmount: { $max: "$amount" },
minAmount: { $min: "$amount" }
}
}
])
// 复合分组键
db.orders.aggregate([
{
$group: {
_id: {
customerId: "$customerId",
month: { $month: "$orderDate" },
year: { $year: "$orderDate" }
},
totalOrders: { $sum: 1 },
totalAmount: { $sum: "$amount" }
}
}
])
// 分组并收集数组
db.orders.aggregate([
{
$group: {
_id: "$customerId",
orderIds: { $push: "$orderId" },
amounts: { $push: "$amount" },
uniqueProducts: { $addToSet: "$productId" }
}
}
])
$sort 阶段
$sort用于对文档进行排序。
javascript
// 基本排序
db.users.aggregate([
{
$sort: { age: -1, name: 1 } // 按年龄降序,姓名升序
}
])
// 在管道中间排序
db.orders.aggregate([
{
$match: { status: "completed" }
},
{
$sort: { amount: -1 }
},
{
$limit: 10
}
])
$limit 和 $skip 阶段
用于分页操作。
javascript
// 分页查询
db.users.aggregate([
{ $sort: { createdAt: -1 } },
{ $skip: 10 }, // 跳过前10个
{ $limit: 10 } // 返回10个
])
// 深度分页优化
db.users.aggregate([
{ $sort: { createdAt: -1 } },
{ $match: { createdAt: { $lt: lastDate } } }, // 使用范围查询替代skip
{ $limit: 10 }
])
$lookup 阶段
$lookup用于执行左外连接。
javascript
// 基本连接
db.orders.aggregate([
{
$lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customerInfo"
}
}
])
// 复杂连接条件
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { productIds: "$productIds" },
pipeline: [
{
$match: {
$expr: { $in: ["$_id", "$$productIds"] }
}
},
{
$project: {
name: 1,
price: 1,
category: 1
}
}
],
as: "products"
}
}
])
// 多连接
db.orders.aggregate([
{
$lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customer"
}
},
{
$lookup: {
from: "products",
localField: "productId",
foreignField: "_id",
as: "product"
}
}
])
高级聚合操作
$unwind 阶段
$unwind用于展开数组字段。
javascript
// 展开数组
db.users.aggregate([
{
$unwind: "$tags"
},
{
$group: {
_id: "$tags",
count: { $sum: 1 }
}
}
])
// 展开嵌套数组
db.users.aggregate([
{
$unwind: {
path: "$hobbies",
preserveNullAndEmptyArrays: true // 保留null和空数组
}
}
])
// 展开多个数组
db.users.aggregate([
{
$unwind: "$orders"
},
{
$unwind: "$orders.items"
}
])
$addFields 阶段
$addFields用于添加新字段而不改变现有字段。
javascript
// 添加计算字段
db.orders.aggregate([
{
$addFields: {
tax: { $multiply: ["$amount", 0.08] },
total: { $add: ["$amount", { $multiply: ["$amount", 0.08] }] },
isHighValue: { $gte: ["$amount", 1000] }
}
}
])
// 条件字段添加
db.users.aggregate([
{
$addFields: {
category: {
$switch: {
branches: [
{ case: { $gte: ["$age", 65] }, then: "senior" },
{ case: { $gte: ["$age", 18] }, then: "adult" },
{ case: { $lt: ["$age", 18] }, then: "minor" }
],
default: "unknown"
}
}
}
}
])
$facet 阶段
$falicit用于在一个阶段内执行多个聚合操作。
javascript
// 多维度分析
db.products.aggregate([
{
$facet: {
priceStats: [
{
$group: {
_id: null,
avgPrice: { $avg: "$price" },
minPrice: { $min: "$price" },
maxPrice: { $max: "$price" }
}
}
],
categoryDistribution: [
{
$group: {
_id: "$category",
count: { $sum: 1 }
}
},
{ $sort: { count: -1 } }
],
topProducts: [
{ $sort: { sales: -1 } },
{ $limit: 10 }
]
}
}
])
$bucket 阶段
$bucket用于将文档分组到指定的桶中。
javascript
// 按价格分桶
db.products.aggregate([
{
$bucket: {
groupBy: "$price",
boundaries: [0, 100, 500, 1000, Infinity],
default: "other",
output: {
count: { $sum: 1 },
averagePrice: { $avg: "$price" },
products: { $push: "$name" }
}
}
}
])
// 按年龄分桶
db.users.aggregate([
{
$bucket: {
groupBy: "$age",
boundaries: [0, 18, 30, 50, 65, 100],
output: {
count: { $sum: 1 },
avgIncome: { $avg: "$income" }
}
}
}
])
表达式操作符
算术表达式
javascript
// 基本算术
db.orders.aggregate([
{
$addFields: {
subtotal: { $add: ["$quantity", "$price"] },
tax: { $multiply: ["$subtotal", 0.08] },
total: { $add: ["$subtotal", "$tax"] },
average: { $divide: ["$total", "$quantity"] },
remainder: { $mod: ["$total", 10] }
}
}
])
日期表达式
javascript
// 日期操作
db.orders.aggregate([
{
$addFields: {
orderYear: { $year: "$orderDate" },
orderMonth: { $month: "$orderDate" },
orderDay: { $dayOfMonth: "$orderDate" },
orderHour: { $hour: "$orderDate" },
daysSinceOrder: {
$divide: [
{ $subtract: [new Date(), "$orderDate"] },
1000 * 60 * 60 * 24
]
}
}
}
])
// 日期格式化
db.orders.aggregate([
{
$addFields: {
formattedDate: {
$dateToString: {
format: "%Y-%m-%d",
date: "$orderDate"
}
}
}
}
])
字符串表达式
javascript
// 字符串操作
db.users.aggregate([
{
$addFields: {
firstName: { $substr: ["$fullName", 0, { $indexOfBytes: ["$fullName", " "] }] },
lastName: {
$substr: [
"$fullName",
{ $add: [{ $indexOfBytes: ["$fullName", " "], 1 }] },
{ $strLenBytes: "$fullName" }
]
},
emailDomain: {
$arrayElemAt: [
{ $split: ["$email", "@"] },
1
]
},
displayName: {
$concat: [
{ $toUpper: { $substr: ["$name", 0, 1] } },
{ $substr: ["$name", 1, { $strLenBytes: "$name" }] }
]
}
}
}
])
性能优化
索引使用
javascript
// 在聚合中利用索引
db.orders.aggregate([
{ $match: { orderDate: { $gte: new Date("2023-01-01") } } }, // 使用索引
{ $sort: { amount: -1 } }, // 使用索引
{ $limit: 100 }
])
// 创建适合聚合的索引
db.orders.createIndex({ orderDate: 1, amount: -1 })
阶段顺序优化
javascript
// 推荐的阶段顺序
db.orders.aggregate([
{ $match: { status: "completed" } }, // 尽早过滤
{ $lookup: { /* join */ } }, // 连接
{ $unwind: "$items" }, // 展开数组
{ $group: { /* 分组 */ } }, // 分组聚合
{ $match: { total: { $gte: 100 } } }, // 过滤聚合结果
{ $sort: { total: -1 } }, // 排序
{ $limit: 10 } // 限制结果
])
内存使用优化
javascript
// 对于大数据集使用allowDiskUse
db.largeCollection.aggregate([
{ $group: { /* ... */ } }
], { allowDiskUse: true })
// 分批处理大数据集
db.collection.aggregate([
{ $sort: { _id: 1 } },
{ $limit: 10000 },
{ $out: "temp_collection" }
])
实际应用案例
电商分析
javascript
// 电商销售分析
db.orders.aggregate([
{
$match: {
orderDate: {
$gte: new Date("2023-01-01"),
$lt: new Date("2024-01-01")
}
}
},
{
$lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customer"
}
},
{
$unwind: "$customer"
},
{
$addFields: {
customerRegion: "$customer.region"
}
},
{
$group: {
_id: {
region: "$customerRegion",
month: { $month: "$orderDate" }
},
totalSales: { $sum: "$amount" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$amount" },
uniqueCustomers: { $addToSet: "$customerId" }
}
},
{
$project: {
region: "$_id.region",
month: "$_id.month",
totalSales: 1,
orderCount: 1,
avgOrderValue: 1,
customerCount: { $size: "$uniqueCustomers" }
}
},
{
$sort: { "region": 1, "month": 1 }
}
])
用户行为分析
javascript
// 用户行为分析
db.pageViews.aggregate([
{
$match: {
timestamp: {
$gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) // 最近7天
}
}
},
{
$group: {
_id: "$userId",
pageViews: { $sum: 1 },
uniquePages: { $addToSet: "$page" },
sessionCount: { $sum: { $cond: [{ $eq: ["$isNewSession", true] }, 1, 0] } },
avgSessionDuration: { $avg: "$sessionDuration" }
}
},
{
$addFields: {
engagementScore: {
$add: [
{ $multiply: ["$pageViews", 0.3] },
{ $multiply: [{ $size: "$uniquePages" }, 0.4] },
{ $multiply: ["$sessionCount", 0.2] },
{ $multiply: ["$avgSessionDuration", 0.1] }
]
}
}
},
{
$sort: { engagementScore: -1 }
},
{
$limit: 100
}
])
实时仪表板数据
javascript
// 实时仪表板数据
db.aggregate([
{
$facet: {
dailyStats: [
{
$match: {
timestamp: { $gte: new Date(new Date().setHours(0, 0, 0, 0)) }
}
},
{
$group: {
_id: null,
newUsers: { $sum: 1 },
totalRevenue: { $sum: "$amount" },
avgOrderValue: { $avg: "$amount" }
}
}
],
weeklyTrend: [
{
$match: {
timestamp: {
$gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
}
}
},
{
$group: {
_id: { $dayOfWeek: "$timestamp" },
dailyTotal: { $sum: "$amount" },
orderCount: { $sum: 1 }
}
}
],
topProducts: [
{ $match: { /* 本周数据 */ } },
{ $group: { _id: "$productId", sales: { $sum: "$quantity" } } },
{ $sort: { sales: -1 } },
{ $limit: 10 }
]
}
}
])
最佳实践
1. 合理使用索引
- 在$match和$sort阶段前使用索引
- 为常用的聚合字段创建复合索引
- 监控聚合查询的执行计划
2. 优化阶段顺序
- 尽早使用$match减少数据量
- 合理安排$lookup的位置
- 在必要时使用$project减少数据传输
3. 内存管理
- 监控聚合操作的内存使用
- 对于大数据集使用allowDiskUse选项
- 考虑分批处理大量数据
4. 性能测试
javascript
// 使用explain分析聚合性能
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$customerId", total: { $sum: "$amount" } } }
]).explain("executionStats")
总结
MongoDB聚合管道是一个强大的数据分析工具,通过合理使用各种阶段和操作符,可以实现复杂的数据处理和分析任务。关键是要理解各个阶段的作用,合理安排阶段顺序,并充分利用索引来优化性能。在实际应用中,聚合管道常用于商业智能、报表生成、数据挖掘等场景。