nestjs Queue사용하기
프레임워크 : NestJS
언어 : Typescript
저장소 : Redis
큐를 사용하는 이유는 여러가지 경우가 있겠지만,
오래 걸리거나 당장 응답이 필요하지 않은 작업들을 Queue에 넣고, 워커 노드들이 큐에서
작업을 가져가는 구조는 매우 유용하다.
동작과정
1. 클라이언트 요청 : 클라이언트가 Rest API 서버에 요청을 보낸다.
2. API 응답 및 작업 큐 삽입 : Rest API 가 클라이언트에 즉시 응답을 보내고, 작업은 Queue에 넣는다.
3. 워커 노드의 처리 : Worker Node가 Queue에서 작업을 가져와 처리를 완료한다.
이 과정에서 Rest API는 Producer(생산자),
Redis는 큐저장소,
Worker는 Consumer(작업처리자)의 역할을 한다.
*Redis 설치
(로컬로 설치해도 되지만 Redis Cloud를 이용하였음)
위와 같이 설정하면 무료로 30MB의 데이터베이스를 사용가능하다.
데이터베이스가 생성되고나면
Edit 를 클릭하고
Durability에서
Data eviction policy를 no eviction으로 변경한뒤 save database를 눌러야함
-> 변경하는 이유 ?
Bull MQ를 사용해서 Queue를 사용할건데 bull mq에서 이렇게 해주라고 적혀있음.
데이터베이스를 생성하고나면 출력되는 퍼블릭엔드포인트 사용 !
끝의 5자리가 포트이고 앞의 문자들이 호스트임
redis-~~~~~~.com:16654
사용자는 default
패키지 설치
pnpm i @nestjs/bullmq bullmq
queue를 사용하고자 하는 모듈에서 bullModule 세팅을 해준다.
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
TypeOrmModule.forFeature([]),
/**
* 1. 작업들을 올려둘 queue를 redis로 사용할건데 redis endpoint 세팅을 해야함.
* 2. queue 등록
*/
BullModule.forRoot({
connection: {
host: 'redisd.com',
port: 16654,
username: 'default',
password: 'QNx4zvAPR',
},
}),
BullModule.registerQueue({
name: 'thumbnail-generation', // 실제 작업의 이름
}),
],
controllers: [],
providers: [],
exports: [],
})
export class CommonModule {}
컨트롤러에서 큐에 메세지를 발행하는 producer의 역할을 하는 애를 만들어주고,
import {
BadRequestException,
Controller,
Post,
UploadedFile,
UseInterceptors,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { ApiBearerAuth } from '@nestjs/swagger';
import { CommonService } from './common.service';
import { Queue } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';
@Controller('common')
@ApiBearerAuth()
export class CommonController {
constructor(
// 대기열 큐 주입 - BullModule.registerQueue에 지정한 이름과 동일해야함
@InjectQueue('thumbnail-generation')
private readonly thumbnailQueue: Queue,
) {}
@Post('video')
@UseInterceptors(
FileInterceptor('video', {
limits: {
fileSize: 200000000,
},
fileFilter(req, file, callback) {
console.log(file);
if (file.mimetype !== 'video/mp4') {
return callback(
new BadRequestException('MP4 타입만 업로드 가능합니다.'),
false,
);
}
return callback(null, true);
},
}),
)
async createVideo(@UploadedFile() video: Express.Multer.File) {
await this.thumbnailQueue.add(
'thumbnail', // 작업의 이름
{
videoId: video.filename,
videoPath: video.path,
},
{
// add 메서드의 3번째 파라미터로 옵션을 지정할 수 있음
priority: 1, // 숫자가 낮을수록 높은 우선순위
delay: 100, // 100msec 기다렸다가 작업하는 것
attempts: 3, // 작업이 실패하는 경우 몇번 다시 시도할지
lifo: true, // last in first out -> true 로 설정하는 경우 스택처럼 사용가능,
removeOnComplete: false, // 성공한경우 redis에서 삭제
removeOnFail: true, // 실패한경우 redis에서 삭제
},
);
return {
fileName: video.filename,
};
}
}
consumer 역할을 하는 worker를 생성해준다.
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { join } from 'path';
import { cwd } from 'process';
import * as ffmpegFluet from 'fluent-ffmpeg';
@Processor('thumbnail-generation') // 큐의 이름
export class ThumbnailGenerationProcess extends WorkerHost {
async process(job: Job, token?: string): Promise<any> {
const { videoPath, videoId } = job.data;
const outputDirectory = join(cwd(), 'public', 'thumbnail');
console.log(`영상 트랜스 코딩중 Id : ${videoId}`);
ffmpegFluet(videoPath)
.screenshots({
count: 1,
filename: `${videoId}.png`,
folder: outputDirectory,
size: '320x240',
})
.on('end', () => {
console.log(`썸네일 생성완료 Id : ${videoId}`);
})
.on('error', () => {
console.log(`썸네일 생성실패 Id : ${videoId}`);
});
}
}
@Processor('thumbnail-generation')
-> thumbnail-generation 이라는 이름의 BullMQ 작업 대기열을 처리하는 작업자임을 선언함 !
이 작업자는 특정 대기열(thumbnail-generationn)에 작업이 추가될 때 해당 작업을 처리함.
process 메서드
-> 작업 대기열에서 전달된 작업 데이터를 받아 실제 작업을 수행함.
Worker 모듈을 생성하고,
providers로 ThumbnailGenerationProcess를 입력한다.
import { Module } from '@nestjs/common';
import { ThumbnailGenerationProcess } from './thumbnail-generation.worker';
@Module({
providers: [ThumbnailGenerationProcess],
})
export class WorkerModule {}
app.module.ts 에 WorkerModule을 import 한다.
Redis Insight를 다운받아 삽입된 대기열을 확인할 수 있다.
(Redis CLI로도 가능함)
'framwork > NestJs' 카테고리의 다른 글
[Nestjs] Custom Validator 데코레이터 적용하기 (1) | 2024.11.22 |
---|---|
[Nestjs] multer로 이미지 파일 업로드하기 (2) | 2024.11.19 |
[NestJS] LifeCycle (생명주기) (1) | 2024.07.02 |
[NestJS] Circular dependency (순환참조) 문제 해결 (0) | 2024.02.07 |