Appearance
MySQL与Node.js集成
MySQL是Node.js应用中最常用的数据库之一。本文档将详细介绍如何在Node.js环境中使用MySQL,包括连接管理、查询优化、ORM集成和最佳实践。
MySQL Node.js驱动
mysql2包
mysql2是Node.js中流行的MySQL客户端,提供了原生Promise支持和更佳的性能。
bash
npm install mysql2
基本连接
javascript
const mysql = require('mysql2');
// 创建连接
const connection = mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'mydb'
});
// 执行查询
connection.execute('SELECT * FROM users WHERE id = ?', [1], (err, results) => {
if (err) {
console.error('查询错误:', err);
return;
}
console.log(results);
});
connection.end();
使用Promise包装
javascript
const mysql = require('mysql2/promise');
async function getUser(userId) {
const connection = await mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'mydb'
});
const [rows] = await connection.execute(
'SELECT * FROM users WHERE id = ?',
[userId]
);
await connection.end();
return rows[0];
}
// 使用示例
getUser(1).then(user => console.log(user));
连接池管理
javascript
const mysql = require('mysql2/promise');
// 创建连接池
const pool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'mydb',
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
acquireTimeout: 60000,
timeout: 60000,
});
// 使用连接池查询
async function getUsers() {
try {
const [rows] = await pool.execute('SELECT * FROM users');
return rows;
} catch (error) {
console.error('查询失败:', error);
throw error;
}
}
// 事务处理
async function transferMoney(fromUserId, toUserId, amount) {
const connection = await pool.getConnection();
try {
await connection.beginTransaction();
// 检查余额
const [balanceRows] = await connection.execute(
'SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE',
[fromUserId]
);
if (balanceRows[0].balance < amount) {
throw new Error('余额不足');
}
// 扣款
await connection.execute(
'UPDATE accounts SET balance = balance - ? WHERE user_id = ?',
[amount, fromUserId]
);
// 入款
await connection.execute(
'UPDATE accounts SET balance = balance + ? WHERE user_id = ?',
[amount, toUserId]
);
// 记录交易
await connection.execute(
'INSERT INTO transactions (from_user, to_user, amount, created_at) VALUES (?, ?, ?, NOW())',
[fromUserId, toUserId, amount]
);
await connection.commit();
console.log('转账成功');
} catch (error) {
await connection.rollback();
console.error('转账失败:', error);
throw error;
} finally {
connection.release();
}
}
ORM集成
Sequelize ORM
Sequelize是Node.js中流行的ORM框架,支持多种数据库。
bash
npm install sequelize mysql2
基本配置
javascript
const { Sequelize, DataTypes } = require('sequelize');
// 创建Sequelize实例
const sequelize = new Sequelize('mydb', 'root', 'password', {
host: 'localhost',
dialect: 'mysql',
logging: console.log, // 生产环境中应设为false
pool: {
max: 10,
min: 0,
acquire: 30000,
idle: 10000
}
});
// 定义模型
const User = sequelize.define('User', {
id: {
type: DataTypes.INTEGER,
primaryKey: true,
autoIncrement: true
},
username: {
type: DataTypes.STRING(50),
allowNull: false,
unique: true
},
email: {
type: DataTypes.STRING(100),
allowNull: false,
unique: true,
validate: {
isEmail: true
}
},
age: {
type: DataTypes.INTEGER,
validate: {
min: 0,
max: 150
}
},
isActive: {
type: DataTypes.BOOLEAN,
defaultValue: true
}
}, {
tableName: 'users',
timestamps: true, // 自动添加createdAt和updatedAt
underscored: true // 使用下划线命名
});
// 同步模型到数据库
(async () => {
await sequelize.sync({ alter: true }); // alter: true会根据模型修改表结构
})();
CRUD操作
javascript
// 创建用户
async function createUser(userData) {
try {
const user = await User.create(userData);
return user.toJSON(); // 返回纯对象
} catch (error) {
console.error('创建用户失败:', error);
throw error;
}
}
// 查询用户
async function findUserById(id) {
try {
const user = await User.findByPk(id);
return user ? user.toJSON() : null;
} catch (error) {
console.error('查询用户失败:', error);
throw error;
}
}
// 更新用户
async function updateUser(id, updateData) {
try {
const [updatedRowsCount] = await User.update(updateData, {
where: { id: id }
});
return updatedRowsCount > 0;
} catch (error) {
console.error('更新用户失败:', error);
throw error;
}
}
// 删除用户
async function deleteUser(id) {
try {
const deletedRowsCount = await User.destroy({
where: { id: id }
});
return deletedRowsCount > 0;
} catch (error) {
console.error('删除用户失败:', error);
throw error;
}
}
// 批量查询
async function findActiveUsers(limit = 10, offset = 0) {
try {
const users = await User.findAll({
where: { isActive: true },
limit: limit,
offset: offset,
order: [['createdAt', 'DESC']]
});
return users.map(user => user.toJSON());
} catch (error) {
console.error('查询用户失败:', error);
throw error;
}
}
高级查询
javascript
// 关联查询
const Order = sequelize.define('Order', {
id: {
type: DataTypes.INTEGER,
primaryKey: true,
autoIncrement: true
},
userId: {
type: DataTypes.INTEGER,
allowNull: false,
references: {
model: User,
key: 'id'
}
},
totalAmount: {
type: DataTypes.DECIMAL(10, 2),
allowNull: false
},
status: {
type: DataTypes.ENUM('pending', 'confirmed', 'shipped', 'delivered', 'cancelled'),
defaultValue: 'pending'
}
}, {
tableName: 'orders'
});
// 定义关联关系
User.hasMany(Order, { foreignKey: 'userId', as: 'orders' });
Order.belongsTo(User, { foreignKey: 'userId', as: 'user' });
// 关联查询示例
async function getUserWithOrders(userId) {
try {
const user = await User.findOne({
where: { id: userId },
include: [{
model: Order,
as: 'orders',
attributes: ['id', 'totalAmount', 'status', 'createdAt'],
where: { status: 'delivered' },
required: false // LEFT JOIN
}]
});
return user ? user.toJSON() : null;
} catch (error) {
console.error('查询用户订单失败:', error);
throw error;
}
}
// 聚合查询
async function getUserStatistics() {
try {
const stats = await User.findAll({
attributes: [
'isActive',
[sequelize.fn('COUNT', sequelize.col('id')), 'count'],
[sequelize.fn('AVG', sequelize.col('age')), 'avgAge']
],
group: ['isActive'],
raw: true // 返回纯对象而不是Sequelize实例
});
return stats;
} catch (error) {
console.error('统计查询失败:', error);
throw error;
}
}
Prisma ORM
Prisma是现代化的数据库工具包。
bash
npm install prisma @prisma/client
npx prisma init
Prisma Schema
prisma
// prisma/schema.prisma
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "mysql"
url = env("DATABASE_URL")
}
model User {
id Int @id @default(autoincrement())
username String @unique
email String @unique
age Int?
isActive Boolean @default(true)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
orders Order[]
}
model Order {
id Int @id @default(autoincrement())
userId Int
totalAmount Float
status String @default("pending")
createdAt DateTime @default(now())
user User @relation(fields: [userId], references: [id])
}
Prisma使用
javascript
const { PrismaClient } = require('@prisma/client');
const prisma = new PrismaClient();
// 基本查询
async function getUserWithOrders(userId) {
const user = await prisma.user.findUnique({
where: { id: userId },
include: {
orders: {
where: { status: 'delivered' },
orderBy: { createdAt: 'desc' }
}
}
});
return user;
}
// 复杂查询
async function getTopUsersByOrderCount(limit = 10) {
const topUsers = await prisma.user.findMany({
take: limit,
include: {
_count: {
select: { orders: true }
}
},
orderBy: {
orders: {
_count: 'desc'
}
}
});
return topUsers;
}
// 事务处理
async function transferMoney(fromUserId, toUserId, amount) {
const result = await prisma.$transaction(async (tx) => {
// 检查余额
const fromAccount = await tx.account.findUnique({
where: { userId: fromUserId }
});
if (fromAccount.balance < amount) {
throw new Error('余额不足');
}
// 执行转账
await tx.account.update({
where: { userId: fromUserId },
data: { balance: { decrement: amount } }
});
await tx.account.update({
where: { userId: toUserId },
data: { balance: { increment: amount } }
});
// 记录交易
await tx.transaction.create({
data: {
fromUserId,
toUserId,
amount,
createdAt: new Date()
}
});
return { success: true };
});
return result;
}
性能优化
查询优化
javascript
// 使用索引优化查询
// 在数据库中创建复合索引
// CREATE INDEX idx_users_status_created ON users(status, created_at);
class UserService {
constructor(pool) {
this.pool = pool;
}
// 分页查询优化
async getUsersPaginated(page = 1, limit = 10, status = null) {
const offset = (page - 1) * limit;
let query = 'SELECT id, username, email, created_at FROM users';
const params = [];
if (status) {
query += ' WHERE status = ?';
params.push(status);
}
query += ' ORDER BY created_at DESC LIMIT ? OFFSET ?';
params.push(limit, offset);
const [rows] = await this.pool.execute(query, params);
return rows;
}
// 覆盖索引查询
async getUserIdsByEmailDomain(domain) {
// 假设有一个索引覆盖了email和id字段
const [rows] = await this.pool.execute(
'SELECT id FROM users WHERE email LIKE ?',
[`%@${domain}`]
);
return rows.map(row => row.id);
}
// 批量操作优化
async bulkUpdateUserStatus(userIds, newStatus) {
if (!userIds || userIds.length === 0) return 0;
const placeholders = userIds.map(() => '?').join(',');
const query = `UPDATE users SET status = ?, updated_at = NOW() WHERE id IN (${placeholders})`;
const params = [newStatus, ...userIds];
const [result] = await this.pool.execute(query, params);
return result.affectedRows;
}
}
连接池优化
javascript
const mysql = require('mysql2/promise');
class DatabaseManager {
constructor(config) {
this.pool = mysql.createPool({
host: config.host,
user: config.user,
password: config.password,
database: config.database,
charset: 'utf8mb4',
// 连接池配置
connectionLimit: config.connectionLimit || 20,
queueLimit: config.queueLimit || 0,
acquireTimeout: config.acquireTimeout || 60000,
timeout: config.timeout || 60000,
// 连接健康检查
reconnect: true,
insecureAuth: false,
// 性能优化
enableKeepAlive: true,
keepAliveInitialDelay: 0,
});
// 监听连接池事件
this.pool.on('connection', (connection) => {
console.log(`新连接建立: ${connection.threadId}`);
});
this.pool.on('acquire', (connection) => {
console.log(`连接被获取: ${connection.threadId}`);
});
this.pool.on('release', (connection) => {
console.log(`连接被释放: ${connection.threadId}`);
});
}
async query(sql, params) {
try {
const [rows] = await this.pool.execute(sql, params);
return rows;
} catch (error) {
console.error('数据库查询错误:', error);
throw error;
}
}
async transaction(callback) {
const connection = await this.pool.getConnection();
try {
await connection.beginTransaction();
const result = await callback(connection);
await connection.commit();
return result;
} catch (error) {
await connection.rollback();
throw error;
} finally {
connection.release();
}
}
async close() {
await this.pool.end();
}
}
错误处理和重试机制
javascript
class RobustDatabaseService {
constructor(pool) {
this.pool = pool;
}
async executeWithRetry(query, params, maxRetries = 3, delay = 1000) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const [rows] = await this.pool.execute(query, params);
return rows;
} catch (error) {
lastError = error;
// 只对特定错误进行重试
if (attempt < maxRetries && this.isRetryableError(error)) {
console.warn(`查询失败,第${attempt}次重试:`, error.message);
await this.sleep(delay * Math.pow(2, attempt - 1)); // 指数退避
} else {
break;
}
}
}
throw lastError;
}
isRetryableError(error) {
// 可重试的错误类型
const retryableCodes = [
'PROTOCOL_CONNECTION_LOST',
'ECONNRESET',
'ETIMEDOUT',
'EPIPE',
'ENOTFOUND',
'EPERM',
'EHOSTUNREACH'
];
return retryableCodes.includes(error.code) ||
error.errno === 1213 || // Deadlock found
error.errno === 1205; // Lock wait timeout
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async executeInTransaction(operation, maxRetries = 3) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
const connection = await this.pool.getConnection();
try {
await connection.beginTransaction();
const result = await operation(connection);
await connection.commit();
return result;
} catch (error) {
await connection.rollback();
if (attempt < maxRetries && error.errno === 1213) { // Deadlock
console.warn(`死锁检测到,第${attempt}次重试`);
await this.sleep(100 * attempt);
continue;
}
throw error;
} finally {
connection.release();
}
}
}
}
安全最佳实践
SQL注入防护
javascript
// ❌ 错误:容易受到SQL注入攻击
const badQuery = `SELECT * FROM users WHERE id = ${userId}`;
// ✅ 正确:使用参数化查询
const goodQuery = 'SELECT * FROM users WHERE id = ?';
const [rows] = await connection.execute(goodQuery, [userId]);
// 在ORM中同样使用参数化查询
const user = await User.findOne({
where: {
email: userInput, // ORM会自动处理转义
}
});
连接安全
javascript
const mysql = require('mysql2/promise');
const securePool = mysql.createPool({
host: 'localhost',
user: 'app_user',
password: process.env.DB_PASSWORD, // 从环境变量获取
database: 'mydb',
ssl: {
rejectUnauthorized: true // 验证SSL证书
},
charset: 'utf8mb4',
timezone: '+08:00', // 设置时区
compress: true, // 启用压缩
});
监控和日志
javascript
const winston = require('winston');
class MonitoredDatabaseService {
constructor(pool) {
this.pool = pool;
this.logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.File({ filename: 'database.log' }),
new winston.transports.Console()
]
});
}
async queryWithLogging(sql, params) {
const startTime = Date.now();
const queryId = Math.random().toString(36).substr(2, 9);
this.logger.info('Query started', {
queryId,
sql: this.maskSensitiveData(sql),
params: this.maskSensitiveParams(params)
});
try {
const [rows] = await this.pool.execute(sql, params);
const duration = Date.now() - startTime;
this.logger.info('Query completed', {
queryId,
duration: `${duration}ms`,
rowCount: Array.isArray(rows) ? rows.length : 0
});
return rows;
} catch (error) {
const duration = Date.now() - startTime;
this.logger.error('Query failed', {
queryId,
duration: `${duration}ms`,
error: error.message,
sql: this.maskSensitiveData(sql)
});
throw error;
}
}
maskSensitiveData(sql) {
// 简单的SQL掩码,实际使用时可能需要更复杂的逻辑
return sql.replace(/password\s*=\s*'[^']*'/gi, "password='***'");
}
maskSensitiveParams(params) {
if (!Array.isArray(params)) return params;
return params.map(param =>
typeof param === 'string' && param.toLowerCase().includes('password')
? '***'
: param
);
}
}
实际应用示例
用户服务类
javascript
class UserService {
constructor(databaseManager) {
this.db = databaseManager;
}
async createUser(userData) {
const { username, email, password, age } = userData;
return await this.db.transaction(async (connection) => {
// 检查用户是否已存在
const [existingUsers] = await connection.execute(
'SELECT id FROM users WHERE username = ? OR email = ?',
[username, email]
);
if (existingUsers.length > 0) {
throw new Error('用户名或邮箱已存在');
}
// 插入新用户
const [result] = await connection.execute(
'INSERT INTO users (username, email, password_hash, age) VALUES (?, ?, ?, ?)',
[username, email, this.hashPassword(password), age]
);
// 创建用户资料
await connection.execute(
'INSERT INTO user_profiles (user_id, created_at) VALUES (?, NOW())',
[result.insertId]
);
return result.insertId;
});
}
async getUserProfile(userId) {
const [rows] = await this.db.query(`
SELECT
u.id,
u.username,
u.email,
u.age,
up.bio,
up.avatar_url,
COUNT(o.id) as order_count,
COALESCE(SUM(o.total_amount), 0) as total_spent
FROM users u
LEFT JOIN user_profiles up ON u.id = up.user_id
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.id = ?
GROUP BY u.id, up.id
`, [userId]);
return rows[0] || null;
}
hashPassword(password) {
// 实际应用中使用bcrypt或其他安全的哈希算法
return require('crypto').createHash('sha256').update(password).digest('hex');
}
}
总结
MySQL与Node.js集成的最佳实践包括:
- 连接管理:使用连接池管理数据库连接
- 错误处理:实现适当的错误处理和重试机制
- 安全防护:防范SQL注入和其他安全威胁
- 性能优化:使用索引、优化查询和批处理操作
- 监控日志:记录查询性能和错误信息
- 事务管理:正确处理数据库事务确保数据一致性
通过遵循这些最佳实践,可以构建高效、安全、可维护的Node.js应用与MySQL数据库的集成方案。