framwork/NestJs

[NestJS] Queue 사용하기 (Redis)

닉네임없음ㅎ 2024. 12. 26. 22:49

nestjs Queue사용하기

프레임워크 : NestJS
언어 : Typescript
저장소 : Redis

큐를 사용하는 이유는 여러가지 경우가 있겠지만, 
오래 걸리거나 당장 응답이 필요하지 않은 작업들을 Queue에 넣고, 워커 노드들이 큐에서 
작업을 가져가는 구조는 매우 유용하다. 

동작과정
1. 클라이언트 요청 : 클라이언트가 Rest API 서버에 요청을 보낸다. 
2. API 응답 및 작업 큐 삽입 : Rest API 가 클라이언트에 즉시 응답을 보내고, 작업은 Queue에 넣는다.
3. 워커 노드의 처리 : Worker Node가 Queue에서 작업을 가져와 처리를 완료한다. 

 

이 과정에서 Rest API는 Producer(생산자),
Redis는 큐저장소,
Worker는 Consumer(작업처리자)의 역할을 한다. 

 


*Redis 설치 
(로컬로 설치해도 되지만 Redis Cloud를 이용하였음)

위와 같이 설정하면 무료로 30MB의 데이터베이스를 사용가능하다. 

 

데이터베이스가 생성되고나면 

 

 

Edit 를 클릭하고 
Durability에서

Data eviction policy를 no eviction으로 변경한뒤 save database를 눌러야함 

-> 변경하는 이유 ? 
Bull MQ를 사용해서 Queue를 사용할건데 bull mq에서 이렇게 해주라고 적혀있음. 

 

 

데이터베이스를 생성하고나면 출력되는 퍼블릭엔드포인트 사용 ! 
끝의 5자리가 포트이고 앞의 문자들이 호스트임 
redis-~~~~~~.com:16654

사용자는 default 

 

 

패키지 설치 

pnpm i @nestjs/bullmq bullmq

 

queue를 사용하고자 하는 모듈에서 bullModule 세팅을 해준다. 

import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    TypeOrmModule.forFeature([]),
    /**
     * 1. 작업들을 올려둘 queue를 redis로 사용할건데 redis endpoint 세팅을 해야함.
     * 2. queue 등록
     */
    BullModule.forRoot({
      connection: {
        host: 'redisd.com',
        port: 16654,
        username: 'default',
        password: 'QNx4zvAPR',
      },
    }),
    BullModule.registerQueue({
      name: 'thumbnail-generation', // 실제 작업의 이름
    }),
  ],
  controllers: [],
  providers: [],
  exports: [],
})
export class CommonModule {}

 

 

컨트롤러에서 큐에  메세지를 발행하는 producer의 역할을 하는 애를 만들어주고, 

import {
  BadRequestException,
  Controller,
  Post,
  UploadedFile,
  UseInterceptors,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { ApiBearerAuth } from '@nestjs/swagger';
import { CommonService } from './common.service';
import { Queue } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';

@Controller('common')
@ApiBearerAuth()
export class CommonController {
  constructor(
  	// 대기열 큐 주입 - BullModule.registerQueue에 지정한 이름과 동일해야함
    @InjectQueue('thumbnail-generation')		
    private readonly thumbnailQueue: Queue,
  ) {}

  @Post('video')
  @UseInterceptors(
    FileInterceptor('video', {
      limits: {
        fileSize: 200000000,
      },
      fileFilter(req, file, callback) {
        console.log(file);
        if (file.mimetype !== 'video/mp4') {
          return callback(
            new BadRequestException('MP4 타입만 업로드 가능합니다.'),
            false,
          );
        }
        return callback(null, true);
      },
    }),
  )
  async createVideo(@UploadedFile() video: Express.Multer.File) {
    await this.thumbnailQueue.add(
      'thumbnail', 	// 작업의 이름
      {
        videoId: video.filename,
        videoPath: video.path,
      },
      {
      	// add 메서드의 3번째 파라미터로 옵션을 지정할 수 있음
        priority: 1, // 숫자가 낮을수록 높은 우선순위
        delay: 100, // 100msec 기다렸다가 작업하는 것
        attempts: 3, // 작업이 실패하는 경우 몇번 다시 시도할지
        lifo: true, // last in first out -> true 로 설정하는 경우 스택처럼 사용가능,
        removeOnComplete: false, // 성공한경우 redis에서 삭제
        removeOnFail: true, // 실패한경우 redis에서 삭제
      },
    );
    return {
      fileName: video.filename,
    };
  }
}

 

 

consumer 역할을 하는 worker를 생성해준다. 

import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { join } from 'path';
import { cwd } from 'process';
import * as ffmpegFluet from 'fluent-ffmpeg';

@Processor('thumbnail-generation') // 큐의 이름 
export class ThumbnailGenerationProcess extends WorkerHost {

  async process(job: Job, token?: string): Promise<any> {
    const { videoPath, videoId } = job.data;

    const outputDirectory = join(cwd(), 'public', 'thumbnail');
    console.log(`영상 트랜스 코딩중 Id : ${videoId}`);

    ffmpegFluet(videoPath)
      .screenshots({
        count: 1,
        filename: `${videoId}.png`,
        folder: outputDirectory,
        size: '320x240',
      })
      .on('end', () => {
        console.log(`썸네일 생성완료 Id : ${videoId}`);
      })
      .on('error', () => {
        console.log(`썸네일 생성실패 Id : ${videoId}`);
      });
  }
}

@Processor('thumbnail-generation')
-> thumbnail-generation 이라는 이름의 BullMQ 작업 대기열을 처리하는 작업자임을 선언함 ! 
이 작업자는 특정 대기열(thumbnail-generationn)에 작업이 추가될 때 해당 작업을 처리함.

process 메서드
-> 작업 대기열에서 전달된 작업 데이터를 받아 실제 작업을 수행함. 

 

 

Worker 모듈을 생성하고, 

providers로 ThumbnailGenerationProcess를 입력한다. 

import { Module } from '@nestjs/common';
import { ThumbnailGenerationProcess } from './thumbnail-generation.worker';

@Module({
  providers: [ThumbnailGenerationProcess],
})
export class WorkerModule {}

 

app.module.ts 에 WorkerModule을 import 한다. 

 

 

Redis Insight를 다운받아 삽입된 대기열을 확인할 수 있다. 

(Redis CLI로도 가능함)