Appearance
NestJS 微服务 (Microservices)
NestJS 微服务提供了一种构建分布式系统的强大方式,支持多种传输策略,如 TCP、NATS、MQTT、Redis、RabbitMQ、Kafka 等。微服务架构允许将应用程序拆分为多个小型、独立的服务,每个服务都可以独立开发、部署和扩展。
基础概念
NestJS 微服务架构包含:
- 微服务应用 - 运行在特定传输协议上的服务
- 客户端 - 调用微服务的客户端
- 消息模式 - 请求-响应和事件驱动模式
- 传输层 - 处理服务间通信的协议
创建微服务应用
基本微服务应用
typescript
// main.ts
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
});
app.listen(() => console.log('Microservice is listening'));
}
bootstrap();
控制器中的微服务模式
typescript
// math.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
@Controller()
export class MathController {
@MessagePattern({ cmd: 'sum' })
sum(@Payload() data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
@MessagePattern({ cmd: 'multiply' })
multiply(@Payload() data: { a: number; b: number }): number {
return data.a * data.b;
}
}
传输策略
TCP 传输
TCP 是默认的传输策略:
typescript
// 服务端
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.TCP,
options: {
host: '127.0.0.1',
port: 8877,
},
});
// 客户端
const client = ClientProxyFactory.create({
transport: Transport.TCP,
options: {
host: '127.0.0.1',
port: 8877,
},
});
Redis 传输
typescript
// 服务端
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.REDIS,
options: {
url: 'redis://localhost:6379',
},
});
// 客户端
const client = ClientProxyFactory.create({
transport: Transport.REDIS,
options: {
url: 'redis://localhost:6379',
},
});
NATS 传输
typescript
// 服务端
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
},
});
// 客户端
const client = ClientProxyFactory.create({
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
},
});
MQTT 传输
typescript
// 服务端
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
});
// 客户端
const client = ClientProxyFactory.create({
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
});
客户端通信
注入客户端
typescript
// client.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
@Injectable()
export class ClientService {
private client: ClientProxy;
constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
});
}
async callSum(data: number[]): Promise<number> {
return this.client.send<number>({ cmd: 'sum' }, data).toPromise();
}
async emitEvent(data: any): Promise<void> {
this.client.emit('user_created', data);
}
}
使用装饰器注入客户端
typescript
// app.service.ts
import { Injectable } from '@nestjs/common';
import { Client, ClientProxy, Transport } from '@nestjs/microservices';
@Injectable()
export class AppService {
@Client({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
})
client: ClientProxy;
async getHello(): Promise<string> {
return this.client.send<string>({ cmd: 'hello' }, {}).toPromise();
}
}
消息模式
请求-响应模式
typescript
// 微服务控制器
@Controller()
export class UserService {
@MessagePattern({ cmd: 'get_user' })
getUser(@Payload() userId: number) {
// 模拟从数据库获取用户
return {
id: userId,
name: 'John Doe',
email: 'john@example.com',
};
}
@MessagePattern({ cmd: 'create_user' })
createUser(@Payload() userData: any) {
// 模拟创建用户
return {
id: Date.now(),
...userData,
createdAt: new Date(),
};
}
}
// 客户端使用
@Injectable()
export class ClientService {
@Client({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
})
client: ClientProxy;
async getUser(id: number) {
return this.client.send({ cmd: 'get_user' }, id).toPromise();
}
async createUser(userData: any) {
return this.client.send({ cmd: 'create_user' }, userData).toPromise();
}
}
事件驱动模式
typescript
// 事件发布者
@Injectable()
export class EventPublisherService {
@Client({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
})
client: ClientProxy;
publishUserCreated(user: any) {
this.client.emit('user_created', user);
}
publishUserUpdated(user: any) {
this.client.emit('user_updated', user);
}
}
// 事件订阅者
@Controller()
export class EventSubscriberService {
@EventPattern('user_created')
handleUserCreated(@Payload() data: any) {
console.log('User created:', data);
// 处理用户创建事件
}
@EventPattern('user_updated')
handleUserUpdated(@Payload() data: any) {
console.log('User updated:', data);
// 处理用户更新事件
}
}
微服务客户端
ClientProxyFactory
typescript
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
@Injectable()
export class MathService implements OnModuleInit {
private client: ClientProxy;
constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
});
}
async onModuleInit() {
await this.client.connect();
}
async accumulate(data: number[]): Promise<number> {
return this.client.send<number>({ cmd: 'sum' }, data).toPromise();
}
emitData(data: any) {
this.client.emit('data', data);
}
}
连接生命周期
typescript
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Injectable()
export class MicroserviceClient implements OnModuleInit, OnModuleDestroy {
constructor(@Inject('ClientService') private readonly client: ClientProxy) {}
async onModuleInit() {
await this.client.connect();
}
async onModuleDestroy() {
await this.client.close();
}
async call(pattern: any, data: any) {
return this.client.send(pattern, data).toPromise();
}
}
错误处理
微服务错误处理
typescript
@Controller()
export class ErrorHandlingController {
@MessagePattern({ cmd: 'divide' })
async divide(@Payload() data: { a: number; b: number }) {
if (data.b === 0) {
throw new BadRequestException('Division by zero');
}
return data.a / data.b;
}
@MessagePattern({ cmd: 'process_data' })
async processData(@Payload() data: any) {
try {
// 处理数据
return await this.processDataInternal(data);
} catch (error) {
// 记录错误并重新抛出
console.error('Error processing data:', error);
throw new InternalServerErrorException('Processing failed');
}
}
private async processDataInternal(data: any) {
// 实际处理逻辑
return data;
}
}
客户端错误处理
typescript
@Injectable()
export class SafeClientService {
@Client({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
})
client: ClientProxy;
async safeCall(pattern: any, data: any) {
try {
const result = await this.client.send(pattern, data).toPromise();
return result;
} catch (error) {
console.error('Microservice call failed:', error);
throw new ServiceUnavailableException('Service temporarily unavailable');
}
}
}
超时和重试
配置超时
typescript
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
timeout: 5000, // 5秒超时
},
});
客户端重试策略
typescript
@Injectable()
export class RetryClientService {
@Client({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
retryAttempts: 3, // 重试次数
retryDelay: 1000, // 重试延迟(毫秒)
},
})
client: ClientProxy;
async callWithRetry(pattern: any, data: any) {
return this.client.send(pattern, data).toPromise();
}
}
微服务模块
微服务模块配置
typescript
// math.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { MathController } from './math.controller';
import { ClientService } from './client.service';
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
},
]),
],
controllers: [MathController],
providers: [ClientService],
})
export class MathModule {}
使用命名客户端
typescript
// client.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { Inject } from '@nestjs/common';
@Injectable()
export class ClientService {
constructor(@Inject('MATH_SERVICE') private readonly client: ClientProxy) {}
async calculateSum(data: number[]): Promise<number> {
return this.client.send({ cmd: 'sum' }, data).toPromise();
}
}
消息序列化
自定义序列化器
typescript
import { Serializer } from '@nestjs/microservices';
class CustomSerializer implements Serializer {
serialize(value: any): any {
return JSON.stringify(value);
}
}
// 使用自定义序列化器
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
serializer: new CustomSerializer(),
});
负载均衡和容错
多个微服务实例
typescript
// 客户端配置多个服务实例
const client = ClientProxyFactory.create({
transport: Transport.NATS,
options: {
servers: [
'nats://server1:4222',
'nats://server2:4222',
'nats://server3:4222',
],
},
});
微服务最佳实践
1. 服务发现
typescript
// 使用 Consul 进行服务发现
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.NATS,
options: {
servers: await discoverServices('my-service'),
},
});
async function discoverServices(serviceName: string): Promise<string[]> {
// 实现服务发现逻辑
return [`nats://${serviceName}:4222`];
}
2. 配置管理
typescript
// config/microservice.config.ts
import { Transport } from '@nestjs/microservices';
export const microserviceConfig = {
transport: Transport.TCP,
options: {
host: process.env.MICROSERVICE_HOST || 'localhost',
port: parseInt(process.env.MICROSERVICE_PORT, 10) || 8877,
timeout: parseInt(process.env.MICROSERVICE_TIMEOUT, 10) || 5000,
},
};
// 使用配置
const app = await NestFactory.createMicroservice(AppModule, microserviceConfig);
3. 健康检查
typescript
@Controller('health')
export class HealthController {
@Get()
getHealth() {
return { status: 'ok', timestamp: new Date().toISOString() };
}
}
// 微服务健康检查
@Controller()
export class MicroserviceHealthController {
@EventPattern('health_check')
handleHealthCheck() {
return { status: 'healthy', timestamp: new Date().toISOString() };
}
}
4. 监控和日志
typescript
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const start = Date.now();
return next.handle().pipe(
tap(() => {
const end = Date.now();
console.log(`Request took ${end - start}ms`);
}),
);
}
}
// 在微服务中使用
@Controller()
export class MonitoredController {
@MessagePattern({ cmd: 'process' })
@UseInterceptors(LoggingInterceptor)
process(@Payload() data: any) {
return { processed: true, data };
}
}
微服务测试
微服务单元测试
typescript
// math.service.spec.ts
import { Test } from '@nestjs/testing';
import { MathController } from './math.controller';
describe('MathController', () => {
let mathController: MathController;
beforeEach(async () => {
const moduleRef = await Test.createTestingModule({
controllers: [MathController],
}).compile();
mathController = moduleRef.get<MathController>(MathController);
});
describe('sum', () => {
it('should sum an array of numbers', () => {
const result = mathController.sum([1, 2, 3, 4, 5]);
expect(result).toBe(15);
});
});
});
微服务集成测试
typescript
// microservice.e2e-spec.ts
import { Test } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { AppModule } from '../src/app.module';
describe('Microservice', () => {
let app: INestMicroservice;
beforeAll(async () => {
app = await Test.createTestingModule({
imports: [AppModule],
})
.compile()
.then(module => module.createNestMicroservice({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877,
},
}));
await app.listen();
});
afterAll(async () => {
await app.close();
});
it('should handle sum command', async () => {
const result = await app
.getClientByPattern({ cmd: 'sum' })
.send({ cmd: 'sum' }, [1, 2, 3])
.toPromise();
expect(result).toBe(6);
});
});
安全性
认证和授权
typescript
// 认证守卫
@Injectable()
export class AuthGuard implements CanActivate {
canActivate(context: ExecutionContext): boolean {
const pattern = context.switchToRpc().getData();
// 实现认证逻辑
return true;
}
}
// 在微服务中使用
@Controller()
@UseGuards(AuthGuard)
export class SecureController {
@MessagePattern({ cmd: 'secure_data' })
getSecureData(@Payload() user: any) {
return { data: 'secure', user: user.id };
}
}
性能优化
消息批处理
typescript
@Controller()
export class BatchController {
@MessagePattern({ cmd: 'batch_process' })
async batchProcess(@Payload() payloads: any[]) {
// 批量处理消息
const results = await Promise.all(
payloads.map(payload => this.processIndividual(payload)),
);
return results;
}
private async processIndividual(payload: any) {
// 处理单个负载
return { processed: true, payload };
}
}
连接池
typescript
@Injectable()
export class PooledClientService {
private clients: ClientProxy[] = [];
private currentIndex = 0;
constructor() {
// 创建多个客户端连接
for (let i = 0; i < 5; i++) {
const client = ClientProxyFactory.create({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 8877 + i, // 不同端口
},
});
this.clients.push(client);
}
}
async call(pattern: any, data: any) {
const client = this.clients[this.currentIndex % this.clients.length];
this.currentIndex++;
return client.send(pattern, data).toPromise();
}
}
总结
NestJS 微服务提供了构建分布式系统的强大工具,支持多种传输协议和通信模式。通过合理的架构设计和最佳实践,可以构建可扩展、可维护的微服务应用程序。
微服务的主要特点:
- 支持多种传输策略
- 提供请求-响应和事件驱动模式
- 支持多种消息队列系统
- 包含错误处理和重试机制
- 支持负载均衡和容错
- 提供测试工具
- 支持配置管理
- 包含监控和日志功能