Skip to content
On this page

NestJS 微服务 (Microservices)

NestJS 微服务提供了一种构建分布式系统的强大方式,支持多种传输策略,如 TCP、NATS、MQTT、Redis、RabbitMQ、Kafka 等。微服务架构允许将应用程序拆分为多个小型、独立的服务,每个服务都可以独立开发、部署和扩展。

基础概念

NestJS 微服务架构包含:

  1. 微服务应用 - 运行在特定传输协议上的服务
  2. 客户端 - 调用微服务的客户端
  3. 消息模式 - 请求-响应和事件驱动模式
  4. 传输层 - 处理服务间通信的协议

创建微服务应用

基本微服务应用

typescript
// main.ts
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

/**
 * Boots the application as a standalone TCP microservice on localhost:8877.
 *
 * `listen()` is awaited so the log line is printed only after the transport
 * has started, and so a startup failure rejects the returned promise instead
 * of being lost.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.TCP,
    options: {
      host: 'localhost',
      port: 8877,
    },
  });
  await app.listen();
  console.log('Microservice is listening');
}

// Report startup failures explicitly rather than leaving an unhandled rejection.
bootstrap().catch((err: unknown) => {
  console.error('Microservice failed to start:', err);
  process.exitCode = 1;
});

控制器中的微服务模式

typescript
// math.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';

/**
 * Message handlers for basic arithmetic requests.
 */
@Controller()
export class MathController {
  /**
   * Handles the `{ cmd: 'sum' }` pattern.
   *
   * @param data Numbers to add; a missing payload is treated as an empty list.
   * @returns The total of all numbers, or 0 when there is nothing to add.
   */
  @MessagePattern({ cmd: 'sum' })
  sum(@Payload() data: number[]): number {
    // The explicit 0 seed keeps reduce() from throwing on an empty array.
    return (data || []).reduce((total, value) => total + value, 0);
  }

  /**
   * Handles the `{ cmd: 'multiply' }` pattern.
   *
   * @param data The two factors.
   * @returns The product `a * b`.
   */
  @MessagePattern({ cmd: 'multiply' })
  multiply(@Payload() data: { a: number; b: number }): number {
    return data.a * data.b;
  }
}

传输策略

TCP 传输

TCP 是默认的传输策略:

typescript
// Server side: create a microservice that uses the TCP transport on 127.0.0.1:8877.
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.TCP,
  options: {
    host: '127.0.0.1',
    port: 8877,
  },
});

// Client side: create a proxy that uses the TCP transport toward 127.0.0.1:8877,
// i.e. the same host and port the server above is configured with.
const client = ClientProxyFactory.create({
  transport: Transport.TCP,
  options: {
    host: '127.0.0.1',
    port: 8877,
  },
});

Redis 传输

typescript
// Server side: create a microservice that uses the Redis transport.
// NOTE(review): newer NestJS releases appear to configure Redis with
// `host`/`port` instead of `url` — confirm against the installed version.
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.REDIS,
  options: {
    url: 'redis://localhost:6379',
  },
});

// Client side: create a proxy that points at the same Redis URL as the server.
const client = ClientProxyFactory.create({
  transport: Transport.REDIS,
  options: {
    url: 'redis://localhost:6379',
  },
});

NATS 传输

typescript
// Server side: create a microservice that uses the NATS transport.
// `servers` is a list; this example lists a single local NATS URL.
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.NATS,
  options: {
    servers: ['nats://localhost:4222'],
  },
});

// Client side: create a proxy configured with the same NATS server list.
const client = ClientProxyFactory.create({
  transport: Transport.NATS,
  options: {
    servers: ['nats://localhost:4222'],
  },
});

MQTT 传输

typescript
// Server side: create a microservice that uses the MQTT transport.
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
  },
});

// Client side: create a proxy that points at the same MQTT URL as the server.
const client = ClientProxyFactory.create({
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
  },
});

客户端通信

注入客户端

typescript
// client.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Service that owns a TCP client proxy aimed at localhost:8877 and exposes
 * one request method and one event method on top of it.
 */
@Injectable()
export class ClientService {
  private client: ClientProxy;

  constructor() {
    // The proxy is built by hand here rather than injected.
    const target = { host: 'localhost', port: 8877 };
    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: target,
    });
  }

  /**
   * Sends `data` with the `{ cmd: 'sum' }` pattern.
   *
   * @param data Numbers to send.
   * @returns A promise obtained from the reply stream via `toPromise()`.
   */
  async callSum(data: number[]): Promise<number> {
    const reply$ = this.client.send<number>({ cmd: 'sum' }, data);
    return reply$.toPromise();
  }

  /**
   * Emits a `user_created` event carrying `data`; the result of `emit()` is
   * not awaited.
   */
  async emitEvent(data: any): Promise<void> {
    const eventName = 'user_created';
    this.client.emit(eventName, data);
  }
}

使用装饰器注入客户端

typescript
// app.service.ts
import { Injectable } from '@nestjs/common';
import { Client, ClientProxy, Transport } from '@nestjs/microservices';

/**
 * Service whose client proxy is supplied through the `@Client()` property
 * decorator (TCP, localhost:8877).
 */
@Injectable()
export class AppService {
  @Client({
    transport: Transport.TCP,
    options: {
      host: 'localhost',
      port: 8877,
    },
  })
  client: ClientProxy;

  /**
   * Sends an empty object with the `{ cmd: 'hello' }` pattern.
   *
   * @returns A promise obtained from the reply stream via `toPromise()`.
   */
  async getHello(): Promise<string> {
    const pattern = { cmd: 'hello' };
    const reply$ = this.client.send<string>(pattern, {});
    return reply$.toPromise();
  }
}

消息模式

请求-响应模式

typescript
// Microservice controller
/**
 * Request-response handlers for user lookups and creation. Both handlers
 * return canned data; no database is involved.
 */
@Controller()
export class UserService {
  /**
   * Handles `{ cmd: 'get_user' }`.
   *
   * @param userId Identifier echoed back in the result.
   * @returns A fixed sample user carrying the requested id.
   */
  @MessagePattern({ cmd: 'get_user' })
  getUser(@Payload() userId: number) {
    // Simulates fetching the user from a database.
    return {
      id: userId,
      name: 'John Doe',
      email: 'john@example.com',
    };
  }

  /**
   * Handles `{ cmd: 'create_user' }`.
   *
   * @param userData Caller-supplied user fields.
   * @returns The supplied fields plus a generated `id` and `createdAt`.
   */
  @MessagePattern({ cmd: 'create_user' })
  createUser(@Payload() userData: any) {
    // Simulates creating the user. The payload is spread FIRST so that the
    // generated `id` and `createdAt` cannot be overridden by caller input.
    return {
      ...userData,
      id: Date.now(),
      createdAt: new Date(),
    };
  }
}

// Client-side usage
/**
 * Client for the user handlers; each method sends one command over the
 * decorated TCP proxy (localhost:8877).
 */
@Injectable()
export class ClientService {
  @Client({
    transport: Transport.TCP,
    options: {
      host: 'localhost',
      port: 8877,
    },
  })
  client: ClientProxy;

  /** Sends `id` with the `{ cmd: 'get_user' }` pattern. */
  async getUser(id: number) {
    return this.request('get_user', id);
  }

  /** Sends `userData` with the `{ cmd: 'create_user' }` pattern. */
  async createUser(userData: any) {
    return this.request('create_user', userData);
  }

  /** Sends `payload` under `{ cmd }` and converts the reply stream to a promise. */
  private request(cmd: string, payload: any) {
    return this.client.send({ cmd }, payload).toPromise();
  }
}

事件驱动模式

typescript
// Event publisher
/**
 * Emits user lifecycle events over the decorated TCP proxy (localhost:8877).
 * Neither method waits on the result of `emit()`.
 */
@Injectable()
export class EventPublisherService {
  @Client({
    transport: Transport.TCP,
    options: {
      host: 'localhost',
      port: 8877,
    },
  })
  client: ClientProxy;

  /** Emits `user_created` with the given user as payload. */
  publishUserCreated(user: any) {
    this.publish('user_created', user);
  }

  /** Emits `user_updated` with the given user as payload. */
  publishUserUpdated(user: any) {
    this.publish('user_updated', user);
  }

  /** Shared emit path for both event kinds. */
  private publish(eventName: string, payload: any): void {
    this.client.emit(eventName, payload);
  }
}

// Event subscriber
/**
 * Event handlers for user lifecycle events; each one only logs the payload.
 */
@Controller()
export class EventSubscriberService {
  /** Handles the `user_created` event. */
  @EventPattern('user_created')
  handleUserCreated(@Payload() data: any) {
    // Place user-created handling here.
    this.report('User created:', data);
  }

  /** Handles the `user_updated` event. */
  @EventPattern('user_updated')
  handleUserUpdated(@Payload() data: any) {
    // Place user-updated handling here.
    this.report('User updated:', data);
  }

  /** Writes the label followed by the payload to the console. */
  private report(label: string, payload: any): void {
    console.log(label, payload);
  }
}

微服务客户端

ClientProxyFactory

typescript
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Client-side service that builds its own TCP proxy (localhost:8877) and
 * connects it when the module initialises.
 */
@Injectable()
export class MathService implements OnModuleInit {
  private client: ClientProxy;

  constructor() {
    const target = { host: 'localhost', port: 8877 };
    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: target,
    });
  }

  /** Connects the proxy as part of module initialisation. */
  async onModuleInit() {
    const client = this.client;
    await client.connect();
  }

  /**
   * Sends `data` with the `{ cmd: 'sum' }` pattern.
   *
   * @returns A promise obtained from the reply stream via `toPromise()`.
   */
  async accumulate(data: number[]): Promise<number> {
    const reply$ = this.client.send<number>({ cmd: 'sum' }, data);
    return reply$.toPromise();
  }

  /** Emits a `data` event carrying the given payload. */
  emitData(data: any) {
    const eventName = 'data';
    this.client.emit(eventName, data);
  }
}

连接生命周期

typescript
import { Inject, Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

/**
 * Ties an injected client proxy (token 'ClientService') to the module
 * lifecycle: connect on init, close on destroy.
 */
@Injectable()
export class MicroserviceClient implements OnModuleInit, OnModuleDestroy {
  constructor(@Inject('ClientService') private readonly client: ClientProxy) {}

  /** Opens the connection when the module initialises. */
  async onModuleInit() {
    const proxy = this.client;
    await proxy.connect();
  }

  /** Closes the connection when the module is destroyed. */
  async onModuleDestroy() {
    const proxy = this.client;
    await proxy.close();
  }

  /**
   * Sends `data` under `pattern`.
   *
   * @returns A promise obtained from the reply stream via `toPromise()`.
   */
  async call(pattern: any, data: any) {
    const reply$ = this.client.send(pattern, data);
    return reply$.toPromise();
  }
}

错误处理

微服务错误处理

typescript
/**
 * Demonstrates raising errors from message handlers.
 *
 * Errors are thrown as `RpcException` (exported by '@nestjs/microservices'),
 * the exception type NestJS documents for microservice handlers, instead of
 * the HTTP exception classes.
 */
@Controller()
export class ErrorHandlingController {
  /**
   * Handles `{ cmd: 'divide' }`.
   *
   * @param data Dividend `a` and divisor `b`.
   * @returns `a / b`.
   * @throws RpcException when `b` is 0.
   */
  @MessagePattern({ cmd: 'divide' })
  async divide(@Payload() data: { a: number; b: number }) {
    if (data.b === 0) {
      throw new RpcException('Division by zero');
    }
    return data.a / data.b;
  }

  /**
   * Handles `{ cmd: 'process_data' }`.
   *
   * @param data Payload handed to the internal processing step.
   * @returns Whatever the internal processing step returns.
   * @throws RpcException with a generic message when processing fails.
   */
  @MessagePattern({ cmd: 'process_data' })
  async processData(@Payload() data: any) {
    try {
      return await this.processDataInternal(data);
    } catch (error) {
      // Log the original failure, then rethrow a generic error for the caller.
      console.error('Error processing data:', error);
      throw new RpcException('Processing failed');
    }
  }

  /** Placeholder for the real processing logic; currently returns its input. */
  private async processDataInternal(data: any) {
    return data;
  }
}

客户端错误处理

typescript
/**
 * Client wrapper that converts any failed microservice call into a
 * `ServiceUnavailableException`.
 */
@Injectable()
export class SafeClientService {
  @Client({
    transport: Transport.TCP,
    options: {
      host: 'localhost',
      port: 8877,
    },
  })
  client: ClientProxy;

  /**
   * Sends `data` under `pattern` and waits for the reply.
   *
   * @returns The awaited reply.
   * @throws ServiceUnavailableException when the call rejects; the original
   *   error is logged first.
   */
  async safeCall(pattern: any, data: any) {
    const reply$ = this.client.send(pattern, data);
    try {
      // Awaited inside the try so a rejection reaches the catch below.
      return await reply$.toPromise();
    } catch (error) {
      console.error('Microservice call failed:', error);
      throw new ServiceUnavailableException('Service temporarily unavailable');
    }
  }
}

超时和重试

配置超时

typescript
// Creates a TCP microservice on localhost:8877 and passes a `timeout` value
// in the transport options.
// NOTE(review): `timeout` does not appear among the TCP options documented
// by NestJS; request timeouts are usually applied on the client with the
// RxJS `timeout` operator — confirm that this setting has any effect.
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.TCP,
  options: {
    host: 'localhost',
    port: 8877,
    timeout: 5000, // 5-second timeout
  },
});

客户端重试策略

typescript
/**
 * Client whose decorated TCP proxy (localhost:8877) is configured with
 * `retryAttempts` and `retryDelay`.
 *
 * NOTE(review): whether these two options apply to client proxies, and to
 * which operation, depends on the NestJS version — confirm.
 */
@Injectable()
export class RetryClientService {
  @Client({
    transport: Transport.TCP,
    options: {
      host: 'localhost',
      port: 8877,
      retryAttempts: 3, // number of retries
      retryDelay: 1000, // delay between retries (milliseconds)
    },
  })
  client: ClientProxy;

  /**
   * Sends `data` under `pattern`; the method itself adds no retry logic.
   *
   * @returns A promise obtained from the reply stream via `toPromise()`.
   */
  async callWithRetry(pattern: any, data: any) {
    const reply$ = this.client.send(pattern, data);
    return reply$.toPromise();
  }
}

微服务模块

微服务模块配置

typescript
// math.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { MathController } from './math.controller';
import { ClientService } from './client.service';

/**
 * Module for the math feature.
 *
 * Registers one TCP client (localhost:8877) through `ClientsModule.register`
 * under the name 'MATH_SERVICE', declares `MathController`, and provides
 * `ClientService`.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        // Name under which this client is registered.
        name: 'MATH_SERVICE',
        transport: Transport.TCP,
        options: {
          host: 'localhost',
          port: 8877,
        },
      },
    ]),
  ],
  controllers: [MathController],
  providers: [ClientService],
})
export class MathModule {}

使用命名客户端

typescript
// client.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { Inject } from '@nestjs/common';

/**
 * Service that receives the client proxy registered under the
 * 'MATH_SERVICE' token through constructor injection.
 */
@Injectable()
export class ClientService {
  constructor(@Inject('MATH_SERVICE') private readonly client: ClientProxy) {}

  /**
   * Sends `data` with the `{ cmd: 'sum' }` pattern.
   *
   * @returns A promise obtained from the reply stream via `toPromise()`.
   */
  async calculateSum(data: number[]): Promise<number> {
    const pattern = { cmd: 'sum' };
    const reply$ = this.client.send(pattern, data);
    return reply$.toPromise();
  }
}

消息序列化

自定义序列化器

typescript
import { Serializer } from '@nestjs/microservices';

/**
 * Serializer that encodes every value as a JSON string.
 */
class CustomSerializer implements Serializer {
  /**
   * @param value Any value to encode.
   * @returns The result of `JSON.stringify(value)`.
   */
  serialize(value: any): any {
    return JSON.stringify(value);
  }
}

// Use the custom serializer. It is passed inside `options`, which is where
// the transport reads `serializer` from; a top-level `serializer` key next to
// `transport` is not picked up.
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.TCP,
  options: {
    host: 'localhost',
    port: 8877,
    serializer: new CustomSerializer(),
  },
});

负载均衡和容错

多个微服务实例

typescript
// Client configured with several server instances: the NATS transport is
// given a list of three server URLs.
const client = ClientProxyFactory.create({
  transport: Transport.NATS,
  options: {
    servers: [
      'nats://server1:4222',
      'nats://server2:4222',
      'nats://server3:4222',
    ],
  },
});

微服务最佳实践

1. 服务发现

typescript
// Service discovery (the original note mentions Consul): the NATS server list
// is obtained by awaiting discoverServices('my-service') before the
// microservice is created.
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.NATS,
  options: {
    servers: await discoverServices('my-service'),
  },
});

/**
 * Placeholder service-discovery lookup.
 *
 * No registry is queried: the service name itself is used as the host of a
 * single NATS URL on port 4222.
 *
 * @param serviceName Name of the service to resolve.
 * @returns A one-element list containing `nats://<serviceName>:4222`.
 */
async function discoverServices(serviceName: string): Promise<string[]> {
  // Real discovery logic belongs here.
  const endpoint = 'nats://' + serviceName + ':4222';
  return [endpoint];
}

2. 配置管理

typescript
// config/microservice.config.ts
import { Transport } from '@nestjs/microservices';

/**
 * Microservice options built from environment variables.
 *
 * - MICROSERVICE_HOST: host, default 'localhost'.
 * - MICROSERVICE_PORT: port, default 8877.
 * - MICROSERVICE_TIMEOUT: timeout value, default 5000.
 *
 * An unset variable is parsed as an empty string, which yields NaN and so
 * falls through to the default; this also keeps `parseInt` well-typed under
 * `strictNullChecks`, where `process.env.X` is `string | undefined`.
 *
 * NOTE(review): `timeout` does not appear among the TCP options documented
 * by NestJS — confirm that the transport reads it.
 */
export const microserviceConfig = {
  transport: Transport.TCP,
  options: {
    host: process.env.MICROSERVICE_HOST || 'localhost',
    port: parseInt(process.env.MICROSERVICE_PORT ?? '', 10) || 8877,
    timeout: parseInt(process.env.MICROSERVICE_TIMEOUT ?? '', 10) || 5000,
  },
};

// Use the configuration
const app = await NestFactory.createMicroservice(AppModule, microserviceConfig);

3. 健康检查

typescript
/**
 * Health endpoint: `@Get()` handler on the 'health' controller route.
 */
@Controller('health')
export class HealthController {
  /**
   * @returns `{ status: 'ok', timestamp }`, where `timestamp` is the current
   *   time as an ISO-8601 string.
   */
  @Get()
  getHealth() {
    const timestamp = new Date().toISOString();
    return { status: 'ok', timestamp };
  }
}

// Microservice health check
/**
 * Health check exposed over the microservice transport.
 *
 * Registered with `@MessagePattern` (request-response) rather than
 * `@EventPattern`: the handler returns a status object, and event handlers
 * are fire-and-forget, so their return value is not meant to reach a caller.
 */
@Controller()
export class MicroserviceHealthController {
  /**
   * Handles the `health_check` pattern.
   *
   * @returns `{ status: 'healthy', timestamp }`, where `timestamp` is the
   *   current time as an ISO-8601 string.
   */
  @MessagePattern('health_check')
  handleHealthCheck() {
    return { status: 'healthy', timestamp: new Date().toISOString() };
  }
}

4. 监控和日志

typescript
/**
 * Interceptor that logs the time between entering `intercept` and each value
 * emitted by the handler stream.
 */
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
  /**
   * @param context Execution context (not used here).
   * @param next Handler whose stream is passed through unchanged.
   * @returns The handler stream with a logging side effect attached.
   */
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const startedAt = Date.now();
    const logElapsed = () => {
      const finishedAt = Date.now();
      console.log(`Request took ${finishedAt - startedAt}ms`);
    };
    return next.handle().pipe(tap(logElapsed));
  }
}

// Usage inside a microservice
/**
 * Controller whose `process` handler is wrapped by `LoggingInterceptor`.
 */
@Controller()
export class MonitoredController {
  /**
   * Handles `{ cmd: 'process' }`.
   *
   * @param data Incoming payload.
   * @returns `{ processed: true, data }` with the payload echoed back.
   */
  @MessagePattern({ cmd: 'process' })
  @UseInterceptors(LoggingInterceptor)
  process(@Payload() data: any) {
    const outcome = { processed: true, data };
    return outcome;
  }
}

微服务测试

微服务单元测试

typescript
// math.controller.spec.ts
import { Test } from '@nestjs/testing';
import { MathController } from './math.controller';

// Unit test for MathController: a testing module that declares only the
// controller is compiled before each test, then `sum` is called directly.
describe('MathController', () => {
  let controller: MathController;

  beforeEach(async () => {
    const builder = Test.createTestingModule({
      controllers: [MathController],
    });
    const testingModule = await builder.compile();

    controller = testingModule.get<MathController>(MathController);
  });

  describe('sum', () => {
    it('should sum an array of numbers', () => {
      const input = [1, 2, 3, 4, 5];
      expect(controller.sum(input)).toBe(15);
    });
  });
});

微服务集成测试

typescript
// microservice.e2e-spec.ts
import { Test } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { AppModule } from '../src/app.module';

// End-to-end test: starts the real microservice over TCP and talks to it
// through a real client proxy. `INestMicroservice` has no
// `getClientByPattern` method, so the client is created explicitly with
// `ClientProxyFactory` using the same host and port as the server.
describe('Microservice', () => {
  const tcpOptions = { host: 'localhost', port: 8877 };
  let app: INestMicroservice;
  let client: ClientProxy;

  beforeAll(async () => {
    const moduleRef = await Test.createTestingModule({
      imports: [AppModule],
    }).compile();

    app = moduleRef.createNestMicroservice({
      transport: Transport.TCP,
      options: tcpOptions,
    });
    await app.listen();

    client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: tcpOptions,
    });
    await client.connect();
  });

  afterAll(async () => {
    // Optional chaining keeps teardown from throwing if setup failed part-way.
    await client?.close();
    await app?.close();
  });

  it('should handle sum command', async () => {
    const result = await client
      .send<number>({ cmd: 'sum' }, [1, 2, 3])
      .toPromise();

    expect(result).toBe(6);
  });
});

安全性

认证和授权

typescript
// Authentication guard
/**
 * Guard skeleton for RPC handlers. It reads the RPC data from the execution
 * context but performs no check yet and lets every call through.
 */
@Injectable()
export class AuthGuard implements CanActivate {
  /**
   * @param context Execution context of the current call.
   * @returns Always `true`.
   */
  canActivate(context: ExecutionContext): boolean {
    const rpcContext = context.switchToRpc();
    // getData() yields the data of the RPC call; it is not used yet.
    const payload = rpcContext.getData();
    // Authentication logic belongs here.
    return true;
  }
}

// Usage inside a microservice
/**
 * Controller whose handlers all run behind `AuthGuard`.
 */
@Controller()
@UseGuards(AuthGuard)
export class SecureController {
  /**
   * Handles `{ cmd: 'secure_data' }`.
   *
   * @param user Payload; its `id` property is echoed in the result.
   * @returns `{ data: 'secure', user: <payload id> }`.
   */
  @MessagePattern({ cmd: 'secure_data' })
  getSecureData(@Payload() user: any) {
    const response = { data: 'secure', user: user.id };
    return response;
  }
}

性能优化

消息批处理

typescript
/**
 * Handles a batch of payloads in a single message.
 */
@Controller()
export class BatchController {
  /**
   * Handles `{ cmd: 'batch_process' }`.
   *
   * @param payloads Items to process; all are started together and awaited
   *   with `Promise.all`.
   * @returns One result per payload, in input order.
   */
  @MessagePattern({ cmd: 'batch_process' })
  async batchProcess(@Payload() payloads: any[]) {
    const pending = payloads.map(item => this.processIndividual(item));
    return await Promise.all(pending);
  }

  /** Processes one payload; currently wraps it as `{ processed: true, payload }`. */
  private async processIndividual(payload: any) {
    const outcome = { processed: true, payload };
    return outcome;
  }
}

连接池

typescript
/**
 * Holds five TCP client proxies and rotates through them, one per call.
 * Each proxy targets localhost on its own port (8877 through 8881).
 */
@Injectable()
export class PooledClientService {
  private clients: ClientProxy[] = [];
  private currentIndex = 0;

  constructor() {
    // Build the pool: one proxy per consecutive port.
    let offset = 0;
    while (offset < 5) {
      this.clients.push(
        ClientProxyFactory.create({
          transport: Transport.TCP,
          options: {
            host: 'localhost',
            port: 8877 + offset, // a different port for each proxy
          },
        }),
      );
      offset += 1;
    }
  }

  /**
   * Sends `data` under `pattern` using the next proxy in rotation.
   *
   * @returns A promise obtained from the reply stream via `toPromise()`.
   */
  async call(pattern: any, data: any) {
    const slot = this.currentIndex % this.clients.length;
    const chosen = this.clients[slot];
    this.currentIndex += 1;
    return chosen.send(pattern, data).toPromise();
  }
}

总结

NestJS 微服务提供了构建分布式系统的强大工具,支持多种传输协议和通信模式。通过合理的架构设计和最佳实践,可以构建可扩展、可维护的微服务应用程序。

微服务的主要特点:

  • 支持多种传输策略
  • 提供请求-响应和事件驱动模式
  • 支持多种消息队列系统
  • 包含错误处理和重试机制
  • 支持负载均衡和容错
  • 提供测试工具
  • 支持配置管理
  • 包含监控和日志功能