Skip to content
On this page

MySQL与Node.js集成

MySQL是Node.js应用中最常用的数据库之一。本文档将详细介绍如何在Node.js环境中使用MySQL,包括连接管理、查询优化、ORM集成和最佳实践。

MySQL Node.js驱动

mysql2包

mysql2是Node.js中流行的MySQL客户端,提供了原生Promise支持和更佳的性能。

bash
npm install mysql2

基本连接

javascript
const mysql = require('mysql2');

// Connection settings for the example database.
const connectionSettings = {
  host: 'localhost',
  user: 'root',
  password: 'password',
  database: 'mydb'
};

// Open a single (non-pooled) connection.
const connection = mysql.createConnection(connectionSettings);

// Callback for the query below: log the error if there is one, otherwise
// print the result rows.
const handleQueryResult = (err, results) => {
  if (!err) {
    console.log(results);
    return;
  }
  console.error('查询错误:', err);
};

// Run a parameterized query; the `?` placeholder is bound to the value 1.
connection.execute('SELECT * FROM users WHERE id = ?', [1], handleQueryResult);

connection.end();

使用Promise API(mysql2/promise)

javascript
const mysql = require('mysql2/promise');

/**
 * Fetch a single user row by id over a short-lived connection.
 *
 * @param {number} userId - Value bound to the `id = ?` placeholder.
 * @returns {Promise<object|undefined>} The first matching row, or
 *   `undefined` when the query returns no rows.
 * @throws Rejects with the driver error when connecting or querying fails.
 */
async function getUser(userId) {
  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'mydb'
  });

  try {
    const [rows] = await connection.execute(
      'SELECT * FROM users WHERE id = ?',
      [userId]
    );
    return rows[0];
  } finally {
    // Close the connection on both success and failure; without this a
    // rejected query would leave the connection open.
    await connection.end();
  }
}

// Usage example: print the user, and report a failure instead of leaving
// the promise rejection unhandled.
getUser(1)
  .then(user => console.log(user))
  .catch(error => console.error('查询失败:', error));

连接池管理

javascript
const mysql = require('mysql2/promise');

// Create the shared connection pool.
// Only options that mysql2 recognises are passed here. `acquireTimeout` and
// `timeout` belong to the older `mysql` driver; mysql2 ignores them with an
// "invalid configuration option" warning, so they are omitted.
const pool = mysql.createPool({
  host: 'localhost',
  user: 'root',
  password: 'password',
  database: 'mydb',
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
});

/**
 * Read every row of the `users` table through the shared pool.
 *
 * @returns {Promise<Array<object>>} The result rows.
 * @throws Re-throws the query error after logging it.
 */
async function getUsers() {
  try {
    const queryResult = await pool.execute('SELECT * FROM users');
    const [userRows] = queryResult;
    return userRows;
  } catch (queryError) {
    console.error('查询失败:', queryError);
    throw queryError;
  }
}

/**
 * Move `amount` from one user's account to another inside one transaction.
 *
 * The sender's row is locked with `FOR UPDATE`, the balance is checked, both
 * accounts are updated and a row is written to `transactions`. Any failure
 * rolls the transaction back.
 *
 * @param {number} fromUserId - `user_id` of the account to debit.
 * @param {number} toUserId - `user_id` of the account to credit.
 * @param {number|string} amount - Positive amount to transfer.
 * @returns {Promise<void>}
 * @throws {Error} When the amount is not a positive number, an account does
 *   not exist, the balance is insufficient, or a query fails.
 */
async function transferMoney(fromUserId, toUserId, amount) {
  // Reject invalid amounts before touching the database: a negative amount
  // would otherwise move money in the opposite direction.
  const transferAmount = Number(amount);
  if (!Number.isFinite(transferAmount) || transferAmount <= 0) {
    throw new Error('转账金额必须为正数');
  }

  const connection = await pool.getConnection();

  try {
    await connection.beginTransaction();

    // Lock the sender's row and read its balance.
    const [balanceRows] = await connection.execute(
      'SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE',
      [fromUserId]
    );

    if (balanceRows.length === 0) {
      throw new Error('转出账户不存在');
    }

    // Compare numerically: the driver may hand back DECIMAL values as
    // strings, and string-vs-string `<` would compare lexicographically.
    if (Number(balanceRows[0].balance) < transferAmount) {
      throw new Error('余额不足');
    }

    // Debit the sender.
    await connection.execute(
      'UPDATE accounts SET balance = balance - ? WHERE user_id = ?',
      [amount, fromUserId]
    );

    // Credit the recipient; zero affected rows means the account is missing
    // and the debited money would otherwise vanish.
    const [creditResult] = await connection.execute(
      'UPDATE accounts SET balance = balance + ? WHERE user_id = ?',
      [amount, toUserId]
    );

    if (creditResult.affectedRows === 0) {
      throw new Error('转入账户不存在');
    }

    // Record the transfer.
    await connection.execute(
      'INSERT INTO transactions (from_user, to_user, amount, created_at) VALUES (?, ?, ?, NOW())',
      [fromUserId, toUserId, amount]
    );

    await connection.commit();
    console.log('转账成功');
  } catch (error) {
    // A failing rollback must not replace the error that caused it.
    try {
      await connection.rollback();
    } catch (rollbackError) {
      console.error('回滚失败:', rollbackError);
    }
    console.error('转账失败:', error);
    throw error;
  } finally {
    connection.release();
  }
}

ORM集成

Sequelize ORM

Sequelize是Node.js中流行的ORM框架,支持多种数据库。

bash
npm install sequelize mysql2

基本配置

javascript
const { Sequelize, DataTypes } = require('sequelize');

// Create the Sequelize instance (database name, user, password, options).
const sequelize = new Sequelize('mydb', 'root', 'password', {
  host: 'localhost',
  dialect: 'mysql',
  // Print generated SQL while developing; turn logging off in production.
  logging: process.env.NODE_ENV === 'production' ? false : console.log,
  // Connection pool limits (timing values are in milliseconds).
  pool: {
    max: 10,
    min: 0,
    acquire: 30000,
    idle: 10000
  }
});

// Define the User model, mapped to the `users` table.
const User = sequelize.define('User', {
  // Auto-incrementing integer primary key.
  id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true
  },
  // Required, unique, at most 50 characters.
  username: {
    type: DataTypes.STRING(50),
    allowNull: false,
    unique: true
  },
  // Required, unique, validated as an e-mail address.
  email: {
    type: DataTypes.STRING(100),
    allowNull: false,
    unique: true,
    validate: {
      isEmail: true
    }
  },
  // Optional age, validated to the range 0-150.
  age: {
    type: DataTypes.INTEGER,
    validate: {
      min: 0,
      max: 150
    }
  },
  // Defaults to true for new users.
  isActive: {
    type: DataTypes.BOOLEAN,
    defaultValue: true
  }
}, {
  tableName: 'users',
  timestamps: true, // automatically adds createdAt and updatedAt
  underscored: true // use snake_case naming
});

// Synchronize the models with the database.
// `alter: true` changes existing tables to match the model definitions.
(async () => {
  try {
    await sequelize.sync({ alter: true });
  } catch (error) {
    // Report the failure instead of leaving an unhandled promise rejection,
    // and mark the process as failed.
    console.error('同步模型失败:', error);
    process.exitCode = 1;
  }
})();

CRUD操作

javascript
/**
 * Insert a new user through the `User` model.
 *
 * @param {object} userData - Attribute values for the new user.
 * @returns {Promise<object>} The created user as a plain object.
 * @throws Re-throws the underlying error after logging it.
 */
async function createUser(userData) {
  try {
    const createdUser = await User.create(userData);
    const plainUser = createdUser.toJSON();
    return plainUser;
  } catch (creationError) {
    console.error('创建用户失败:', creationError);
    throw creationError;
  }
}

/**
 * Look a user up by primary key.
 *
 * @param {number} id - Primary key value.
 * @returns {Promise<object|null>} The user as a plain object, or `null`
 *   when no user was found.
 * @throws Re-throws the underlying error after logging it.
 */
async function findUserById(id) {
  try {
    const record = await User.findByPk(id);
    if (!record) {
      return null;
    }
    return record.toJSON();
  } catch (lookupError) {
    console.error('查询用户失败:', lookupError);
    throw lookupError;
  }
}

/**
 * Update the user with the given id.
 *
 * @param {number} id - Primary key of the user to update.
 * @param {object} updateData - Attribute values to write.
 * @returns {Promise<boolean>} `true` when the reported updated-row count
 *   is greater than zero.
 * @throws Re-throws the underlying error after logging it.
 */
async function updateUser(id, updateData) {
  try {
    const updateResult = await User.update(updateData, { where: { id } });
    const [affectedCount] = updateResult;
    return affectedCount > 0;
  } catch (updateError) {
    console.error('更新用户失败:', updateError);
    throw updateError;
  }
}

/**
 * Delete the user with the given id.
 *
 * @param {number} id - Primary key of the user to delete.
 * @returns {Promise<boolean>} `true` when the reported deleted-row count
 *   is greater than zero.
 * @throws Re-throws the underlying error after logging it.
 */
async function deleteUser(id) {
  const filter = { where: { id } };
  try {
    const removedCount = await User.destroy(filter);
    return removedCount > 0;
  } catch (deleteError) {
    console.error('删除用户失败:', deleteError);
    throw deleteError;
  }
}

/**
 * List active users, newest first, one page at a time.
 *
 * @param {number} [limit=10] - Maximum number of users to return.
 * @param {number} [offset=0] - Number of users to skip.
 * @returns {Promise<Array<object>>} The users as plain objects.
 * @throws Re-throws the underlying error after logging it.
 */
async function findActiveUsers(limit = 10, offset = 0) {
  const queryOptions = {
    where: { isActive: true },
    limit,
    offset,
    order: [['createdAt', 'DESC']]
  };
  const toPlainObject = (instance) => instance.toJSON();

  try {
    const activeUsers = await User.findAll(queryOptions);
    return activeUsers.map(toPlainObject);
  } catch (queryError) {
    console.error('查询用户失败:', queryError);
    throw queryError;
  }
}

高级查询

javascript
// Association queries: define the Order model, mapped to the `orders` table.
const Order = sequelize.define('Order', {
  // Auto-incrementing integer primary key.
  id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true
  },
  // Required reference to the `id` of the User model.
  userId: {
    type: DataTypes.INTEGER,
    allowNull: false,
    references: {
      model: User,
      key: 'id'
    }
  },
  // Required order total, stored as DECIMAL(10, 2).
  totalAmount: {
    type: DataTypes.DECIMAL(10, 2),
    allowNull: false
  },
  // Order state; one of the listed values, 'pending' by default.
  status: {
    type: DataTypes.ENUM('pending', 'confirmed', 'shipped', 'delivered', 'cancelled'),
    defaultValue: 'pending'
  }
}, {
  tableName: 'orders'
});

// Define the associations: a user has many orders (alias `orders`) and each
// order belongs to one user (alias `user`), both through `userId`.
User.hasMany(Order, { foreignKey: 'userId', as: 'orders' });
Order.belongsTo(User, { foreignKey: 'userId', as: 'user' });

/**
 * Load one user together with that user's delivered orders.
 *
 * @param {number} userId - Primary key of the user.
 * @returns {Promise<object|null>} The user (with an `orders` include) as a
 *   plain object, or `null` when no user was found.
 * @throws Re-throws the underlying error after logging it.
 */
async function getUserWithOrders(userId) {
  try {
    // `required: false` keeps the user in the result even when no order
    // matches the filter (LEFT JOIN).
    const deliveredOrders = {
      model: Order,
      as: 'orders',
      attributes: ['id', 'totalAmount', 'status', 'createdAt'],
      where: { status: 'delivered' },
      required: false
    };

    const record = await User.findOne({
      where: { id: userId },
      include: [deliveredOrders]
    });

    if (!record) {
      return null;
    }
    return record.toJSON();
  } catch (queryError) {
    console.error('查询用户订单失败:', queryError);
    throw queryError;
  }
}

/**
 * Aggregate users by `isActive`: row count and average age per group.
 *
 * @returns {Promise<Array<object>>} Plain result rows (`raw: true`), each
 *   with `isActive`, `count` and `avgAge`.
 * @throws Re-throws the underlying error after logging it.
 */
async function getUserStatistics() {
  try {
    const userCount = [sequelize.fn('COUNT', sequelize.col('id')), 'count'];
    const averageAge = [sequelize.fn('AVG', sequelize.col('age')), 'avgAge'];

    return await User.findAll({
      attributes: ['isActive', userCount, averageAge],
      group: ['isActive'],
      raw: true // plain objects instead of Sequelize instances
    });
  } catch (statsError) {
    console.error('统计查询失败:', statsError);
    throw statsError;
  }
}

Prisma ORM

Prisma是现代化的数据库工具包。

bash
npm install prisma @prisma/client
npx prisma init

Prisma Schema

prisma
// prisma/schema.prisma
// Client generator: emits the JavaScript Prisma client.
generator client {
  provider = "prisma-client-js"
}

// MySQL datasource; the connection string is read from DATABASE_URL.
datasource db {
  provider = "mysql"
  url      = env("DATABASE_URL")
}

// A user; `username` and `email` are unique, `age` is optional.
model User {
  id        Int      @id @default(autoincrement())
  username  String   @unique
  email     String   @unique
  age       Int?
  isActive  Boolean  @default(true)
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
  
  // One-to-many relation to Order.
  orders    Order[]
}

// An order that belongs to one user through `userId`.
// NOTE(review): `totalAmount` is a Float here, while the Sequelize Order
// model in this document uses DECIMAL(10, 2); confirm whether a Decimal
// column is intended for monetary values.
// NOTE(review): the Prisma transfer example in this document uses `account`
// and `transaction` models that are not declared in this schema.
model Order {
  id          Int      @id @default(autoincrement())
  userId      Int
  totalAmount Float
  status      String   @default("pending")
  createdAt   DateTime @default(now())
  
  user        User     @relation(fields: [userId], references: [id])
}

Prisma使用

javascript
const { PrismaClient } = require('@prisma/client');
const prisma = new PrismaClient();

/**
 * Basic query: load one user with that user's delivered orders, newest
 * order first.
 *
 * @param {number} userId - Primary key of the user.
 * @returns {Promise<object|null>} Whatever `prisma.user.findUnique`
 *   resolves to for this id.
 */
async function getUserWithOrders(userId) {
  const deliveredOrders = {
    where: { status: 'delivered' },
    orderBy: { createdAt: 'desc' }
  };

  return await prisma.user.findUnique({
    where: { id: userId },
    include: { orders: deliveredOrders }
  });
}

/**
 * Complex query: list users ordered by how many orders they have, most
 * orders first, including each user's order count.
 *
 * @param {number} [limit=10] - Maximum number of users to return.
 * @returns {Promise<Array<object>>} Whatever `prisma.user.findMany`
 *   resolves to for these options.
 */
async function getTopUsersByOrderCount(limit = 10) {
  const withOrderCount = { _count: { select: { orders: true } } };
  const byOrderCountDesc = { orders: { _count: 'desc' } };

  return await prisma.user.findMany({
    take: limit,
    include: withOrderCount,
    orderBy: byOrderCountDesc
  });
}

/**
 * Transfer `amount` between two accounts inside a Prisma interactive
 * transaction and record the transfer.
 *
 * @param {number} fromUserId - `userId` of the account to debit.
 * @param {number} toUserId - `userId` of the account to credit.
 * @param {number} amount - Positive amount to transfer.
 * @returns {Promise<{success: boolean}>} `{ success: true }` once committed.
 * @throws {Error} When the amount is not a positive number, the sender
 *   account does not exist, the balance is insufficient, or a query fails.
 */
async function transferMoney(fromUserId, toUserId, amount) {
  // Reject invalid amounts up front: a negative amount would reverse the
  // direction of the transfer.
  const transferAmount = Number(amount);
  if (!Number.isFinite(transferAmount) || transferAmount <= 0) {
    throw new Error('转账金额必须为正数');
  }

  const result = await prisma.$transaction(async (tx) => {
    // Check that the sender exists and has enough balance.
    const fromAccount = await tx.account.findUnique({
      where: { userId: fromUserId }
    });

    if (!fromAccount) {
      throw new Error('转出账户不存在');
    }

    if (fromAccount.balance < amount) {
      throw new Error('余额不足');
    }

    // Debit only if the balance is still sufficient at write time. The read
    // above takes no row lock, so a concurrent transfer could have drained
    // the account in between; the conditional update closes that gap.
    const debit = await tx.account.updateMany({
      where: { userId: fromUserId, balance: { gte: amount } },
      data: { balance: { decrement: amount } }
    });

    if (debit.count === 0) {
      throw new Error('余额不足');
    }

    // Credit the recipient.
    await tx.account.update({
      where: { userId: toUserId },
      data: { balance: { increment: amount } }
    });

    // Record the transfer.
    await tx.transaction.create({
      data: {
        fromUserId,
        toUserId,
        amount,
        createdAt: new Date()
      }
    });

    return { success: true };
  });

  return result;
}

性能优化

查询优化

javascript
// Use indexes to speed up the queries below.
// Create a composite index in the database:
// CREATE INDEX idx_users_status_created ON users(status, created_at);

/**
 * Read/update helpers for the `users` table, built on a mysql2 promise pool.
 */
class UserService {
  /**
   * @param {object} pool - Pool exposing `query(sql, params)` and
   *   `execute(sql, params)`, each resolving to `[rows, fields]`.
   */
  constructor(pool) {
    this.pool = pool;
  }

  /**
   * Return one page of users, newest first, optionally filtered by status.
   *
   * @param {number} [page=1] - 1-based page number.
   * @param {number} [limit=10] - Page size.
   * @param {?string} [status=null] - When truthy, only users with this status.
   * @returns {Promise<Array<object>>} The rows of the requested page.
   * @throws {RangeError} When `page` or `limit` is not a positive integer.
   */
  async getUsersPaginated(page = 1, limit = 10, status = null) {
    const pageNumber = Number(page);
    const pageSize = Number(limit);

    if (!Number.isInteger(pageNumber) || pageNumber < 1) {
      throw new RangeError('page必须为正整数');
    }
    if (!Number.isInteger(pageSize) || pageSize < 1) {
      throw new RangeError('limit必须为正整数');
    }

    const offset = (pageNumber - 1) * pageSize;
    let query = 'SELECT id, username, email, created_at FROM users';
    const params = [];

    if (status) {
      query += ' WHERE status = ?';
      params.push(status);
    }

    query += ' ORDER BY created_at DESC LIMIT ? OFFSET ?';
    params.push(pageSize, offset);

    // Use query() rather than execute() here: binding JavaScript numbers to
    // LIMIT/OFFSET placeholders of a server-side prepared statement fails on
    // MySQL 8.0.22+ with "Incorrect arguments to mysqld_stmt_execute".
    // query() still escapes every parameter, and both numbers were validated
    // as integers above.
    const [rows] = await this.pool.query(query, params);
    return rows;
  }

  /**
   * Return the ids of users whose e-mail address ends with `@<domain>`.
   *
   * @param {string} domain - E-mail domain, matched literally.
   * @returns {Promise<Array<number>>} Matching user ids.
   */
  async getUserIdsByEmailDomain(domain) {
    // Escape LIKE wildcards so `%` and `_` in the input match literally
    // (backslash is MySQL's default LIKE escape character).
    const escapedDomain = String(domain).replace(/[\\%_]/g, '\\$&');

    // Assumes an index covers the email and id columns. NOTE(review): the
    // pattern starts with a wildcard, so the index can be scanned but not
    // used for a range lookup.
    const [rows] = await this.pool.execute(
      'SELECT id FROM users WHERE email LIKE ?',
      [`%@${escapedDomain}`]
    );
    return rows.map(row => row.id);
  }

  /**
   * Set the status of many users with a single UPDATE statement.
   *
   * @param {Array<number>} userIds - Ids of the users to update.
   * @param {string} newStatus - Status value to write.
   * @returns {Promise<number>} Number of affected rows; 0 when `userIds` is
   *   empty or missing.
   */
  async bulkUpdateUserStatus(userIds, newStatus) {
    if (!userIds || userIds.length === 0) return 0;

    // One placeholder per id keeps the statement fully parameterized.
    const placeholders = userIds.map(() => '?').join(',');
    const query = `UPDATE users SET status = ?, updated_at = NOW() WHERE id IN (${placeholders})`;
    const params = [newStatus, ...userIds];

    const [result] = await this.pool.execute(query, params);
    return result.affectedRows;
  }
}

连接池优化

javascript
const mysql = require('mysql2/promise');

/**
 * Owns a mysql2 promise pool and exposes query, transaction and shutdown
 * helpers on top of it.
 */
class DatabaseManager {
  /**
   * @param {object} config - `host`, `user`, `password`, `database`, and
   *   optionally `connectionLimit` (default 20) and `queueLimit` (default 0).
   */
  constructor(config) {
    // Only options that mysql2 recognises are passed. `acquireTimeout`,
    // `timeout` and `reconnect` are not mysql2 options (the driver ignores
    // them with an "invalid configuration option" warning), so they are not
    // forwarded even if present on `config`.
    this.pool = mysql.createPool({
      host: config.host,
      user: config.user,
      password: config.password,
      database: config.database,
      charset: 'utf8mb4',

      // Pool sizing.
      connectionLimit: config.connectionLimit || 20,
      queueLimit: config.queueLimit || 0,

      insecureAuth: false,

      // Keep-alive on the underlying sockets.
      enableKeepAlive: true,
      keepAliveInitialDelay: 0,
    });

    // Log pool lifecycle events.
    this.pool.on('connection', (connection) => {
      console.log(`新连接建立: ${connection.threadId}`);
    });

    this.pool.on('acquire', (connection) => {
      console.log(`连接被获取: ${connection.threadId}`);
    });

    this.pool.on('release', (connection) => {
      console.log(`连接被释放: ${connection.threadId}`);
    });
  }

  /**
   * Run a parameterized statement on the pool.
   *
   * @param {string} sql - SQL text with `?` placeholders.
   * @param {Array} params - Values for the placeholders.
   * @returns {Promise<*>} The first element of the driver result (the rows).
   * @throws Re-throws the driver error after logging it.
   */
  async query(sql, params) {
    try {
      const [rows] = await this.pool.execute(sql, params);
      return rows;
    } catch (error) {
      console.error('数据库查询错误:', error);
      throw error;
    }
  }

  /**
   * Run `callback` inside a transaction on a dedicated connection.
   * Commits when the callback resolves, rolls back when it rejects, and
   * always releases the connection.
   *
   * @param {function(object): Promise<*>} callback - Receives the connection.
   * @returns {Promise<*>} The callback's result.
   * @throws Re-throws the error that caused the rollback.
   */
  async transaction(callback) {
    const connection = await this.pool.getConnection();

    try {
      await connection.beginTransaction();
      const result = await callback(connection);
      await connection.commit();
      return result;
    } catch (error) {
      // A failing rollback (for example on a dead connection) must not hide
      // the error that caused it.
      try {
        await connection.rollback();
      } catch (rollbackError) {
        console.error('事务回滚失败:', rollbackError);
      }
      throw error;
    } finally {
      connection.release();
    }
  }

  /** Close the pool and all of its connections. */
  async close() {
    await this.pool.end();
  }
}

错误处理和重试机制

javascript
/**
 * Query helpers that retry transient failures with a back-off delay.
 */
class RobustDatabaseService {
  /**
   * @param {object} pool - Pool exposing `execute(sql, params)` and
   *   `getConnection()`.
   */
  constructor(pool) {
    this.pool = pool;
  }

  /**
   * Run a statement, retrying retryable errors with exponential back-off.
   *
   * @param {string} query - SQL text with `?` placeholders.
   * @param {Array} params - Values for the placeholders.
   * @param {number} [maxRetries=3] - Maximum number of attempts; values
   *   that are not positive integers are treated as 1.
   * @param {number} [delay=1000] - Base delay in milliseconds, doubled
   *   after every failed attempt.
   * @returns {Promise<*>} The first element of the driver result (the rows).
   * @throws The last error once attempts are exhausted or the error is not
   *   retryable.
   */
  async executeWithRetry(query, params, maxRetries = 3, delay = 1000) {
    // Always make at least one attempt; otherwise the loop would be skipped
    // and `undefined` would be thrown.
    const attempts = Number.isInteger(maxRetries) && maxRetries > 0 ? maxRetries : 1;
    let lastError;

    for (let attempt = 1; attempt <= attempts; attempt++) {
      try {
        const [rows] = await this.pool.execute(query, params);
        return rows;
      } catch (error) {
        lastError = error;

        // Retry only the errors listed in isRetryableError().
        if (attempt < attempts && this.isRetryableError(error)) {
          console.warn(`查询失败,第${attempt}次重试:`, error.message);
          await this.sleep(delay * Math.pow(2, attempt - 1)); // exponential back-off
        } else {
          break;
        }
      }
    }

    throw lastError;
  }

  /**
   * Decide whether an error is worth retrying.
   *
   * @param {Error} error - Error carrying `code` and/or `errno`.
   * @returns {boolean} `true` for the listed error codes and for MySQL
   *   errno 1213 and 1205.
   */
  isRetryableError(error) {
    const retryableCodes = [
      'PROTOCOL_CONNECTION_LOST',
      'ECONNRESET',
      'ETIMEDOUT',
      'EPIPE',
      'ENOTFOUND',
      'EPERM',
      'EHOSTUNREACH'
    ];

    return retryableCodes.includes(error.code) ||
           error.errno === 1213 || // Deadlock found
           error.errno === 1205;   // Lock wait timeout
  }

  /**
   * @param {number} ms - Milliseconds to wait.
   * @returns {Promise<void>} Resolves after `ms` milliseconds.
   */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Run `operation` inside a transaction and re-run the whole transaction
   * when it fails with a deadlock (errno 1213).
   *
   * @param {function(object): Promise<*>} operation - Receives the connection.
   * @param {number} [maxRetries=3] - Maximum number of attempts; values
   *   that are not positive integers are treated as 1.
   * @returns {Promise<*>} The operation's result.
   * @throws The error of the last attempt.
   */
  async executeInTransaction(operation, maxRetries = 3) {
    // Always make at least one attempt; otherwise the operation would be
    // skipped silently and `undefined` returned.
    const attempts = Number.isInteger(maxRetries) && maxRetries > 0 ? maxRetries : 1;

    for (let attempt = 1; attempt <= attempts; attempt++) {
      const connection = await this.pool.getConnection();

      try {
        await connection.beginTransaction();
        const result = await operation(connection);
        await connection.commit();
        return result;
      } catch (error) {
        // A failing rollback must not hide the error that caused it.
        try {
          await connection.rollback();
        } catch (rollbackError) {
          console.error('事务回滚失败:', rollbackError);
        }

        if (attempt < attempts && error.errno === 1213) { // Deadlock
          console.warn(`死锁检测到,第${attempt}次重试`);
          await this.sleep(100 * attempt);
          continue;
        }

        throw error;
      } finally {
        connection.release();
      }
    }
  }
}

安全最佳实践

SQL注入防护

javascript
// ❌ Wrong: vulnerable to SQL injection, because the value is interpolated
// directly into the SQL text.
const badQuery = `SELECT * FROM users WHERE id = ${userId}`;

// ✅ Right: use a parameterized query; the value is passed separately from
// the SQL text.
const goodQuery = 'SELECT * FROM users WHERE id = ?';
const [rows] = await connection.execute(goodQuery, [userId]);

// Use parameterized queries with the ORM as well.
const user = await User.findOne({
  where: {
    email: userInput, // the ORM handles escaping automatically
  }
});

连接安全

javascript
const mysql = require('mysql2/promise');

// Pool configured for a dedicated application user, with the password taken
// from the environment and TLS certificate verification enabled.
const securePool = mysql.createPool({
  host: 'localhost',
  user: 'app_user',
  password: process.env.DB_PASSWORD, // read from an environment variable
  database: 'mydb',
  ssl: {
    rejectUnauthorized: true // verify the SSL certificate
  },
  charset: 'utf8mb4',
  timezone: '+08:00', // set the time zone
  compress: true, // enable compression
});

监控和日志

javascript
const winston = require('winston');

/**
 * Runs statements on a pool and logs start, completion, duration and
 * failures through a winston logger.
 */
class MonitoredDatabaseService {
  /**
   * @param {object} pool - Pool exposing `execute(sql, params)`.
   */
  constructor(pool) {
    this.pool = pool;
    // JSON log lines with timestamps, written to database.log and the console.
    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      ),
      transports: [
        new winston.transports.File({ filename: 'database.log' }),
        new winston.transports.Console()
      ]
    });
  }

  /**
   * Execute a statement and log its lifecycle under a random query id.
   *
   * @param {string} sql - SQL text with `?` placeholders.
   * @param {Array} params - Values for the placeholders.
   * @returns {Promise<*>} The first element of the driver result (the rows).
   * @throws Re-throws the driver error after logging it.
   */
  async queryWithLogging(sql, params) {
    const startTime = Date.now();
    // Short random id used to correlate the log entries of one query.
    // slice(2, 11) takes the same characters as the deprecated substr(2, 9).
    const queryId = Math.random().toString(36).slice(2, 11);

    this.logger.info('Query started', {
      queryId,
      sql: this.maskSensitiveData(sql),
      params: this.maskSensitiveParams(params)
    });

    try {
      const [rows] = await this.pool.execute(sql, params);
      const duration = Date.now() - startTime;

      this.logger.info('Query completed', {
        queryId,
        duration: `${duration}ms`,
        rowCount: Array.isArray(rows) ? rows.length : 0
      });

      return rows;
    } catch (error) {
      const duration = Date.now() - startTime;

      this.logger.error('Query failed', {
        queryId,
        duration: `${duration}ms`,
        error: error.message,
        sql: this.maskSensitiveData(sql)
      });

      throw error;
    }
  }

  /**
   * Replace inline password literals (`password = '...'` or
   * `password = "..."`) in SQL text with `password='***'`.
   *
   * @param {*} sql - SQL text; non-string values are returned unchanged.
   * @returns {*} The masked SQL text.
   */
  maskSensitiveData(sql) {
    // Simple SQL masking; real-world use may need more elaborate logic.
    if (typeof sql !== 'string') return sql;
    return sql.replace(/password\s*=\s*(?:'[^']*'|"[^"]*")/gi, "password='***'");
  }

  /**
   * Replace string parameters that contain the word "password" with `***`.
   *
   * NOTE(review): this inspects the parameter value, not the column it is
   * bound to, so an actual password value is still logged unless it happens
   * to contain "password"; confirm whether parameters should be logged at all.
   *
   * @param {*} params - Parameter list; non-arrays are returned unchanged.
   * @returns {*} The masked parameter list.
   */
  maskSensitiveParams(params) {
    if (!Array.isArray(params)) return params;
    return params.map(param =>
      typeof param === 'string' && param.toLowerCase().includes('password')
        ? '***'
        : param
    );
  }
}

实际应用示例

用户服务类

javascript
/**
 * Application-level user operations on top of a database manager that
 * exposes `query(sql, params)` (resolving to the rows) and
 * `transaction(callback)`.
 */
class UserService {
  /**
   * @param {object} databaseManager - Object providing `query` and
   *   `transaction`.
   */
  constructor(databaseManager) {
    this.db = databaseManager;
  }

  /**
   * Create a user and an empty profile row in one transaction.
   *
   * @param {{username: string, email: string, password: string, age: number}} userData
   * @returns {Promise<number>} The id of the inserted user.
   * @throws {Error} When the username or e-mail is already taken, or a
   *   query fails.
   */
  async createUser(userData) {
    const { username, email, password, age } = userData;

    return await this.db.transaction(async (connection) => {
      // Check whether the user already exists.
      const [existingUsers] = await connection.execute(
        'SELECT id FROM users WHERE username = ? OR email = ?',
        [username, email]
      );

      if (existingUsers.length > 0) {
        throw new Error('用户名或邮箱已存在');
      }

      // Insert the new user.
      const [result] = await connection.execute(
        'INSERT INTO users (username, email, password_hash, age) VALUES (?, ?, ?, ?)',
        [username, email, this.hashPassword(password), age]
      );

      // Create the matching profile row.
      await connection.execute(
        'INSERT INTO user_profiles (user_id, created_at) VALUES (?, NOW())',
        [result.insertId]
      );

      return result.insertId;
    });
  }

  /**
   * Load a user's profile together with order count and total spent.
   *
   * @param {number} userId - Id of the user.
   * @returns {Promise<object|null>} The profile row, or `null` when the
   *   user does not exist.
   */
  async getUserProfile(userId) {
    // `db.query` resolves to the rows array itself, so it must not be
    // destructured again: `const [rows] = ...` would pick the first row and
    // `rows[0]` would then always be undefined.
    const rows = await this.db.query(`
      SELECT 
        u.id,
        u.username,
        u.email,
        u.age,
        up.bio,
        up.avatar_url,
        COUNT(o.id) as order_count,
        COALESCE(SUM(o.total_amount), 0) as total_spent
      FROM users u
      LEFT JOIN user_profiles up ON u.id = up.user_id
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.id = ?
      GROUP BY u.id, up.id
    `, [userId]);

    return rows[0] || null;
  }

  /**
   * Hash a password with SHA-256 and return the hex digest.
   *
   * NOTE(review): an unsalted, fast hash is not suitable for storing
   * passwords; real applications should use bcrypt or another dedicated
   * password-hashing algorithm. Left unchanged here because switching
   * algorithms changes the stored hash format.
   *
   * @param {string} password - Plain-text password.
   * @returns {string} Hex-encoded SHA-256 digest.
   */
  hashPassword(password) {
    return require('crypto').createHash('sha256').update(password).digest('hex');
  }
}

总结

MySQL与Node.js集成的最佳实践包括:

  1. 连接管理:使用连接池管理数据库连接
  2. 错误处理:实现适当的错误处理和重试机制
  3. 安全防护:防范SQL注入和其他安全威胁
  4. 性能优化:使用索引、优化查询和批处理操作
  5. 监控日志:记录查询性能和错误信息
  6. 事务管理:正确处理数据库事务确保数据一致性

通过遵循这些最佳实践,可以构建高效、安全、可维护的Node.js应用与MySQL数据库的集成方案。