큐를 활용한 배치 시스템을 분석하고 개선해보자 (3)

1. 들어가며

이전 글에서는 기존 배치 시스템의 문제점을 분석하고, 배치 방식과 즉시 적재 방식을 시뮬레이션해보았습니다. 실험 결과 배치 방식은 메모리 효율성과 API 효율성은 좋았지만 사용자 대기 시간 문제가 있었고, 즉시 적재 방식은 사용자 경험은 좋았지만 API 호출이 너무 많아 비효율적이었습니다.

결국 스케줄러를 걷어내고 두 방식을 최대한 만족시키는 이벤트 기반의 큐 시스템으로 전환하기로 결정했고, Redis 버퍼에 전체 데이터 대신 ID만 저장하는 방식으로 메모리를 최적화하기로 했습니다.

이번 글에서는 이러한 결정을 바탕으로 새롭게 설계한 부분에 대해 기록해보고자 합니다.

2. 전체 설계 개요

새로운 설계는 이벤트 기반으로 동작하며, 기존의 1분마다 도는 스케줄러를 제거하고 데이터가 들어오는 즉시 Job을 생성하거나 기존 Job을 조정하는 방식으로 개선했습니다. 주요 구성 요소는 다음과 같습니다.

Redis Key 구조

Redis에 저장되는 버퍼의 Key 구조는 다음과 같습니다.

submission_answer_sheet_flush:buffer:{sheetCreateUserId}:{submissionId}:{spreadSheetId}:{sheetId}
  • 타입: List (Redis List 자료구조)
  • 값: 답변 id 리스트 (고유 ID만 저장)
  • 최대 크기: 제한 없음 (하지만 배치당 최대 5000개씩 처리)

이전 글에서 분석했듯이, 전체 답변 데이터를 저장하는 대신 ID만 저장함으로써 메모리를 99.3% 절감할 수 있습니다. 실제 데이터는 처리 시점에 DB에서 조회하여 사용합니다.

배치 처리 전략

배치 처리는 최대 5000개씩 진행되며, Redis에서 최대 5000개의 ID를 한 번에 조회합니다.

조회한 ID 리스트를 DB에서 실제 데이터로 변환하고, Sheet 형식으로 변환한 후 Google API로 적재합니다.

첫 배치이고 헤더가 필요한 경우 헤더를 추출하여 함께 추가합니다. 성공적으로 적재되면 Redis List를 LTRIM으로 자르고, 시트 적재에 대한 트랜잭션 로그인 SheetTransaction을 저장합니다.

이 과정을 ID 리스트가 비어있을 때까지 반복합니다. 한 번에 최대 5000개씩 처리하는 이유는 메모리 사용량을 고려하면서도 효율적으로 처리하기 위함입니다.

Rate Limiting

Google Sheet API의 rate limit을 준수하기 위해 Rate Limiter를 사용하는데요, 정보는 아래와 같습니다.

  • 제한: 60개 Job/분 (구글 API 호출 credential 기준으로 구분)
  • 윈도우: 60초 슬라이딩 윈도우
  • 초과 시: Job을 60초 후로 지연 (moveToDelayed)하여 Queue로 다시 돌려보냄

이를 통해 동일한 사용자가 여러 SpreadSheet에 동시에 데이터를 적재하는 경우에도 rate limit을 초과하지 않도록 보장합니다.

Job 실행 조건

Job은 버퍼 크기에 따라 실행 시점이 결정됩니다

첫번째, 즉시 실행

  • 버퍼 크기 ≥ 500개일 때
  • 또는 기존 Delayed Job이 있고 버퍼가 500개 이상일 때 promote()를 호출하여 즉시 실행

두번째, 지연 실행 (delay: 10초)

  • 버퍼 크기 < 500개일 때
  • 새로운 Job을 생성할 때

500개라는 기준은 실험 결과를 바탕으로 설정했습니다. 평상시에는 10초 정도 기다려 여러 데이터를 모아 배치로 처리하고, 피크 시간대에는 500개가 넘으면 즉시 처리하여 사용자 대기 시간을 최소화합니다.

데이터 변환 프로세스

배치 처리를 위해 DB에서 조회한 답변 document를 Google Sheet에 적재할 수 있는 형식으로 변환하는 과정입니다:

  1. 필드 추출합니다: id, userName, submittedAt, answer 등의 필요한 필드를 추출합니다.
  2. 필터링: Jumbotron 타입의 답변은 제외하고, PersonalInfo 타입의 답변은 복호화합니다.
  3. Sheet 행으로 변환: 배열 형태의 문자열(string[])로 변환합니다.

최종적으로 지원서 id, 제출자, 제출시간, 각 질문에 대한 답변들로 구성된 Sheet 행 데이터가 생성됩니다.

3. 새로운 설계 톺아보기

사용자가 답변을 제출하는 순간부터 Google Sheet에 데이터가 적재될 때까지의 전체 플로우를 정의해보았습니다.

전체 플로우

flowchart TD Start([사용자 답변 제출]) --> Validate[시트 관련 validation] --> Append[SubmissionAnswerSpreadSheetService.append] Append --> CheckSpreadSheet{연결된 SpreadSheet<br/>존재 여부} CheckSpreadSheet -->|없음| End1([종료]) CheckSpreadSheet -->|있음| LoopSpreadSheet[각 SpreadSheet별 처리] LoopSpreadSheet --> AppendToBuffer[appendToSpreadSheetBuffer] AppendToBuffer --> RedisRPush[Redis List에 ID 추가<br/>RPUSH key submissionAnswerId<br/>고유 ID만 저장] RedisRPush --> CheckJob{기존 Job 존재<br/>및 활성 상태?} CheckJob -->|Job 없음/비활성| CreateJob[새 Job 생성<br/>createFlushJob] CheckJob -->|Job 활성 + 버퍼 ≥ 500<br/>+ Job Delayed| PromoteJob[Job 즉시 실행<br/>promote] CheckJob -->|Job 활성 + 그 외| Wait([Job 대기 중]) CreateJob --> CheckBufferSize{버퍼 크기<br/>≥ 500?} CheckBufferSize -->|Yes| ImmediateJob[즉시 실행<br/>delay: 0] CheckBufferSize -->|No| DelayedJob[10초 후 실행<br/>delay: 10000ms] ImmediateJob --> QueueJob[BullMQ Queue에 Job 추가] DelayedJob --> QueueJob PromoteJob --> WaitForConsumer([Consumer 대기]) QueueJob --> WaitForConsumer WaitForConsumer --> Consumer[SubmissionAnswerSheetFlushConsumer<br/>Job 처리 시작] Consumer --> RateLimit{Rate Limiter<br/>체크<br/>60개/분} RateLimit -->|제한 초과| DelayJob[Job 지연<br/>moveToDelayed] DelayJob --> WaitForConsumer RateLimit -->|통과| WriteSheet[writeToGoogleSpreadsheet] WriteSheet --> CheckHeader[시트 헤더 존재 여부<br/>확인 A1:Z1] CheckHeader --> FetchMetadata[질문 메타데이터 조회<br/>personalInfo, jumbotron] FetchMetadata --> ProcessBatches[processBatches<br/>배치 처리 루프] ProcessBatches --> FetchFromRedis[Redis에서 ID 리스트 조회<br/>LRANGE key 0 499<br/>최대 5000개씩] FetchFromRedis --> CheckEmpty{ID 리스트<br/>비어있음?} CheckEmpty -->|Yes| Complete([처리 완료]) CheckEmpty -->|No| FetchFromDB[DB에서 SubmissionAnswer<br/>조회 findMany] FetchFromDB --> ConvertToSheet[답변을 Sheet 형식으로<br/>변환 convertSingleAnswerToSheetForm] ConvertToSheet --> ProcessBatch[processSingleBatch] ProcessBatch --> CheckFirstBatch{첫 배치<br/>+ 헤더 필요?} CheckFirstBatch -->|Yes| ExtractHeader[시트 헤더 추출<br/>extractSubmissionHeader] CheckFirstBatch -->|No| AppendToSheet ExtractHeader --> AppendToSheet[Google API로 시트에 추가<br/>appendCellsInSheet] AppendToSheet --> TrimRedis[Redis List 자르기<br/>LTRIM key batchSize -1] TrimRedis --> SaveTransaction[SheetTransaction 저장<br/>COMPLETED] SaveTransaction --> ProcessBatches style Validate fill:#e24ee style Start fill:#e1f5ff style End1 fill:#ffe1e1 style Complete fill:#e1ffe1 style Consumer fill:#fff4e1 style RedisRPush fill:#f0e1ff style AppendToSheet fill:#e1fff0

전체 플로우는 크게 세 단계로 나뉩니다.

1단계: 버퍼 추가 및 Job 스케줄링

사용자가 답변을 제출하면 CQRS 이벤트 핸들러인 AppendSpreadsheetHandler가 SubmissionAnswerSubmittedEvent를 처리합니다.

핸들러는 먼저 연결된 SpreadSheet가 존재하는지 확인하고, SpreadSheet 설정(스프레드시트 ID, 시트 ID, 사용자 credential 등)을 검증합니다.

async handle({ submissionAnswer, submission, submittedAt }: SubmissionAnswerSubmittedEvent) {
  if (!this.shouldAppendSpreadSheet({ submission })) {
    return;
  }

  await this.updateSheetTransaction({
    submissionAnswer,
    submission,
    type: SheetTransactionType.INIT,
  });

  if (!(await this.validateSpreadSheetSettings({ submission }))) {
    await this.updateSheetTransaction({
      submissionAnswer,
      submission,
      type: SheetTransactionType.VALIDATION_FAILED,
    });
    return;
  }

  await this.submissionAnswerSpreadSheetService.append({
    submissionAnswer: { ...submissionAnswer, submittedAt },
    submission,
  });
}

검증을 통과하면 SubmissionAnswerSpreadSheetService.appen`가 호출되어 각 SpreadSheet별로 Redis 답변 ID만 추가합니다.

const FLUSH_DELAY_MS = 10 * 1000; // 10초
const FLUSH_THRESHOLD = 500; // 버퍼 임계값
const REMOVE_ON_COMPLETE_AGE = 7 * 24 * 60 * 60; // 7일
const REMOVE_ON_COMPLETE_COUNT = 1000; // 최대 1000개 보관

@Injectable()
export class SubmissionAnswerSpreadSheetService {
  constructor(
    @InjectRedis() private readonly redis: Redis,
    @InjectQueue(QUEUE.SUBMISSION_ANSWER_SHEET_FLUSH)
    private readonly submissionAnswerSheetFlushQueue: Queue,
  ) {}

  async append({
    submissionAnswer,
    submission,
  }: {
    submissionAnswer: Pick<SubmissionAnswer, 'id' | 'submissionId' | 'userName' | 'answer' | 'submittedAt'>;
    submission: Submission;
  }) {
    const connectedSpreadSheetList = submission?.spreadSheet;

    if (!connectedSpreadSheetList?.length || !submission.stepList?.length) {
      return;
    }

    await Promise.all(
      connectedSpreadSheetList.map(async (spreadSheet) => {
        await this.appendToSpreadSheetBuffer(
          submissionAnswer.id,
          submission.id,
          spreadSheet,
        );
      }),
    );
  }

  private async appendToSpreadSheetBuffer(
    submissionAnswerId: string,
    submissionId: string,
    spreadSheet: Pick<SpreadSheet, 'spreadSheetId' | 'sheetId' | 'createUserId'>,
  ) {
    const key = submissionAnswerSheetRedisKeyFactory
      .buffer({
        sheetCreateUserId: spreadSheet.createUserId,
        submissionId,
        spreadSheetId: spreadSheet.spreadSheetId,
        sheetId: String(spreadSheet.sheetId),
      })
      .toKey();

    const newBufferSize = await this.redis.rpush(key, submissionAnswerId);

    const existingJob = await this.submissionAnswerSheetFlushQueue.getJob(key);
    const isJobActive =
      existingJob &&
      !(await existingJob.isCompleted()) &&
      !(await existingJob.isFailed());

    if (!isJobActive) {
      if (existingJob) {
        await existingJob.remove();
      }
      await this.createFlushJob(key, newBufferSize);
      return;
    }

    if (newBufferSize >= FLUSH_THRESHOLD && (await existingJob.isDelayed())) {
      await existingJob.promote();
    }
  }

  private async createFlushJob(key: string, bufferSize: number) {
    const delay = bufferSize >= FLUSH_THRESHOLD ? 0 : FLUSH_DELAY_MS;

    await this.submissionAnswerSheetFlushQueue.add(
      JOB.SUBMISSION_ANSWER_SHEET_FLUSH,
      { key },
      {
        jobId: key,
        delay,
        removeOnComplete: {
          age: REMOVE_ON_COMPLETE_AGE,
          count: REMOVE_ON_COMPLETE_COUNT,
        },
        removeOnFail: true,
      },
    );
  }
}

append 메서드는 연결된 모든 SpreadSheet에 대해 병렬로 처리하며, 각 SpreadSheet별로 Redis 버퍼에 ID를 추가합니다.

appendToSpreadSheetBuffer에서는 기존 Job의 상태를 확인하는데요,

만약 Job이 없거나 비활성 상태라면 기존 Job이 있다면 제거하고, 버퍼 크기에 따라 새로운 Job을 생성합니다. 버퍼 크기가 500개 이상이면 즉시 실행(delay: 0), 미만이면 10초 후 실행(delay: 10000ms)합니다.

또한 Job이 활성 상태이고 버퍼가 500개 이상이며 Job이 Delayed 상태라면, promote()를 호출하여 Job을 즉시 실행합니다.

그 외의 경우에는 기존 Job이 처리 중이므로 대기합니다.

참고로 각 단계마다 SheetTransaction의 상태를 업데이트하여(INIT, VALIDATION_FAILED, ENQUEUED) 처리 과정을 추적할 수 있습니다. 검증 실패 시에는 Slack 알림을 전송하여 개발자가 즉시 파악할 수 있도록 합니다.

이렇게 한 이유는 10초 동안은 버퍼에 쌓아서 최대한으로 동시 요청을 묶어서 작업하기 위함입니다. 데이터가 적을 때는 10초 정도 기다려 배치 효율을 높이고, 데이터가 많을 때는 즉시 처리하여 실시간성을 확보합니다.

2단계: Consumer 처리 및 Rate Limiting

Job이 BullMQ Queue에 등록되면 SubmissionAnswerSheetFlushConsumer가 처리합니다. process 메서드가 진입점이며, 먼저 Rate Limiter를 체크하여 Google Sheet API의 제한(60개/분)을 넘지 않는지 확인합니다.

async process(job: Job<SubmissionAnswerSheetFlushJob>) {
  const { key } = job.data;
  const { sheetCreateUserId, submissionId, spreadSheetId, sheetId } =
    this.getInfoFromKey(key);

  await this.checkAndDelayIfRateLimited({
    sheetCreateUserId,
    job,
  });

  await this.writeToGoogleSpreadsheet({
    submissionId,
    spreadSheetId,
    sheetId,
    sheetCreateUserId,
    key,
  });
}

private async checkAndDelayIfRateLimited({
  sheetCreateUserId,
  job,
}: {
  sheetCreateUserId: string;
  job: Job<SubmissionAnswerSheetFlushJob>;
}) {
  const canProcess = await this.rateLimiter.checkLimit({
    key: submissionAnswerSheetRedisKeyFactory
      .limit({ sheetCreateUserId })
      .toKey(),
    options: {
      limit: RATE_LIMIT_COUNT, // 60개
      windowMs: RATE_LIMITER_WINDOW_MS, // 60초
    },
  });

  if (!canProcess) {
    await job.moveToDelayed(Date.now() + RATE_LIMITER_WINDOW_MS);
    throw new DelayedError();
  }
}

Rate Limit을 초과하면 Job을 60초 후로 지연시켜 다시 Queue로 돌려보냅니다. 통과하면 writeToGoogleSpreadsheet를 호출하여 배치 처리를 시작합니다.

에러가 발생하면 handleFailed 메서드가 호출되는데요, 구글 credential이 일시적으로 만료된 INVALID_CREDENTIAL이나 EXPIRED_CREDENTIAL 같은 복구 가능한 에러인 경우, 에러 처리용 데이터를 조회하여 Dead Letter Queue에 저장합니다.

protected async handleFailed(
  job: Job<SubmissionAnswerSheetFlushJob>,
  error: Error & { cause?: string },
) {
  const { key } = job.data;
  const { sheetCreateUserId, submissionId, spreadSheetId, sheetId } =
    this.getInfoFromKey(key);
  const idList = await this.redis.lrange(key, 0, -1);

  if (this.isRecoverableError({ error })) {
    await this.handleRecoverableError({
      key,
      sheetCreateUserId,
      submissionId,
      spreadSheetId,
      sheetId,
      idList,
    });
  }

  await this.saveSheetTransaction({
    idList,
    spreadSheetId,
    sheetId,
    type: SheetTransactionType.FAILED,
    errorMessage: error.message,
  });

  await this.redis.del(key);
  
  return {
    shouldSendSlack: true,
    slackTitle: SHEET_FLUSH_FAIL_MESSAGE_TITLE,
    additionalMessage: `key: ${key}`,
  };
}

private isRecoverableError({ error }: { error: Error }) {
  return (
    error.cause === GOOGLE_API_ERROR.INVALID_CREDENTIAL.code ||
    error.cause === GOOGLE_API_ERROR.EXPIRED_CREDENTIAL.code
  );
}

private async saveToDLQ({
  key,
  idList,
  sheetFormAnswerList,
  sheetCreateUserId,
  spreadSheetId,
  sheetId,
  submissionId,
}: {
  key: string;
  idList: string[];
  sheetFormAnswerList: string[][];
  sheetCreateUserId: string;
  spreadSheetId: string;
  sheetId: number;
  submissionId: string;
}) {
  await this.submissionAnswerSheetFlushDeadLetterQueue.add(
    JOB.SUBMISSION_ANSWER_SHEET_FLUSH_DEAD_LETTER_QUEUE,
    {
      sheetFormAnswerList,
      sheetCreateUserId,
      spreadSheetId,
      sheetId,
      submissionId,
      failedSubmissionAnswerIdList: idList,
    },
    {
      jobId: `${key}:dlq:${Date.now()}`,
      delay: DLQ_DELAY_MILLISECONDS, // 7일 후 재시도
    },
  );
}

3단계: 배치 처리 및 Sheet 작성

writeToGoogleSpreadsheet 메서드에서 시트 헤더 존재 여부를 확인하고, 질문 메타데이터를 조회한 후 배치 처리 루프를 시작합니다.

private async writeToGoogleSpreadsheet({
  submissionId,
  spreadSheetId,
  sheetId,
  sheetCreateUserId,
  key,
}: {
  submissionId: string;
  spreadSheetId: string;
  sheetId: number;
  sheetCreateUserId: string;
  key: string;
}) {
  const existingHeader = await this.googleSpreadSheetService.getCellsInSheetByRange({
    sheetId,
    spreadSheetId,
    userId: sheetCreateUserId,
    range: 'A1:Z1',
  });

  const needsHeader = existingHeader.length === 0;

  const questionMetadata = await this.fetchQuestionMetadata({
    submissionId,
  });

  await this.processBatches({
    key,
    submissionId,
    spreadSheetId,
    sheetId,
    sheetCreateUserId,
    needsHeader,
    questionMetadata,
  });
}

private async fetchQuestionMetadata({ submissionId }: { submissionId: string }) {
  const [personalInfoQuestionList, jumbotronQuestionList] = await Promise.all([
    this.submissionService.getPersonalInfoQuestionIdList({ submissionId }),
    this.submissionService.getJumbotronQuestionIdList({ submissionId }),
  ]);

  return {
    personalInfoQuestionIds: new Set(personalInfoQuestionList?.map((q) => q.id) || []),
    jumbotronQuestionIds: new Set(jumbotronQuestionList?.map((q) => q.id) || []),
  };
}

processBatches 메서드에서 배치 처리를 진행하는데요, 최대 5000개씩 처리하며, 한 번에 5000개까지 Redis에서 ID를 조회합니다.

private async processBatches({
  key,
  submissionId,
  spreadSheetId,
  sheetId,
  sheetCreateUserId,
  needsHeader,
  questionMetadata,
}: {
  key: string;
  submissionId: string;
  spreadSheetId: string;
  sheetId: number;
  sheetCreateUserId: string;
  needsHeader: boolean;
  questionMetadata: {
    personalInfoQuestionIds: Set<string>;
    jumbotronQuestionIds: Set<string>;
  };
}) {
  let isFirstBatch = true;
  let totalProcessed = 0;

  while (true) {
    const idList = await this.redis.lrange(key, 0, BATCH_SIZE - 1); // 최대 5000개

    if (!idList || idList.length === 0) break;

    const batchSize = idList.length;

    const submissionAnswerList = await this.fetchSubmissionAnswerList({ idList });

    const sheetFormAnswerList = await this.fetchSumbissionAnswer({
      submissionAnswerList,
      submissionId,
      questionMetadata,
    });

    await this.processSingleBatch({
      sheetFormAnswerList,
      batchSize,
      key,
      submissionId,
      spreadSheetId,
      sheetId,
      sheetCreateUserId,
      isFirstBatch,
      needsHeader,
    });

    await this.sheetTransactionRepository.upsertMany({
      sheetTransactionList: idList.map((id) => ({
        submissionAnswerId: id,
        spreadSheetId,
        sheetId,
        type: SheetTransactionType.COMPLETED,
      })),
    });

    totalProcessed += batchSize;
    isFirstBatch = false;
  }
}

각 배치는 processSingleBatch 메서드에서 처리됩니다. 첫 배치이고 헤더가 필요한 경우 헤더를 추출하여 함께 추가합니다.

private async processSingleBatch({
  sheetFormAnswerList,
  batchSize,
  key,
  spreadSheetId,
  sheetId,
  sheetCreateUserId,
  isFirstBatch,
  needsHeader,
  submissionId,
}: {
  sheetFormAnswerList: string[][];
  batchSize: number;
  key: string;
  spreadSheetId: string;
  sheetId: number;
  sheetCreateUserId: string;
  isFirstBatch: boolean;
  needsHeader: boolean;
  submissionId: string;
}) {
  if (sheetFormAnswerList.length === 0) {
    await this.redis.ltrim(key, batchSize, -1);
    return;
  }

  let values = sheetFormAnswerList;

  if (isFirstBatch && needsHeader) {
    const header = await this.extractSubmissionHeader({ submissionId });
    values = [header, ...sheetFormAnswerList];
  }

  await this.googleSpreadSheetService.appendCellsInSheet({
    sheetId,
    spreadSheetId,
    userId: sheetCreateUserId,
    values,
    valueInputOption: ValueInputOption.USER_ENTERED,
  });

  await this.redis.ltrim(key, batchSize, -1);
}

private async extractSubmissionHeader({ submissionId }: { submissionId: string }) {
  const header: string[] = ['submissionId', '제출자', '제출시간'];
  const submission = await this.submissionRepository.findOneById({ id: submissionId });

  submission.stepList?.forEach((step) => {
    step.questionList?.forEach((question) => {
      if (question.type === QuestionType.JUMBOTRON) {
        return;
      }
      header.push(question.title ?? '');
    });
  });

  return header;
}

Google API로 시트에 데이터를 추가한 후, Redis List를 LTRIM으로 자르고, SheetTransaction을 COMPLETED 상태로 저장합니다. 이 과정을 ID 리스트가 비어있을 때까지 반복합니다.

데이터 변환은 convertSingleAnswerToSheetForm 메서드에서 처리되며, Pre Process 단계에서 설명한 대로 필드 추출, 필터링(Jumbotron 제외, PersonalInfo 복호화), Sheet 행 변환, 날짜 포맷 변환을 수행합니다.

에러 처리 플로우

flowchart TD ConsumerError[Consumer 처리 중<br/>에러 발생] --> IsRecoverable{복구 가능한<br/>에러?<br/>INVALID_CREDENTIAL<br/>EXPIRED_CREDENTIAL} IsRecoverable -->|Yes| FetchData[에러 처리용 데이터 조회<br/>fetchQuestionMetadata<br/>fetchSubmissionAnswerList<br/>fetchSumbissionAnswer] FetchData --> CheckEmpty{변환된 데이터<br/>비어있음?} CheckEmpty -->|Yes| End1([종료]) CheckEmpty -->|No| SaveDLQ[Dead Letter Queue에 저장<br/>7일 후 재시도] IsRecoverable -->|No| SaveTransactionFailed SaveDLQ --> SaveTransactionFailed[SheetTransaction 저장<br/>FAILED 상태] SaveTransactionFailed --> DeleteRedis[Redis Key 삭제] DeleteRedis --> SendSlack[Slack 알림 전송] SendSlack --> End2([에러 처리 완료]) style ConsumerError fill:#ffe1e1 style SaveDLQ fill:#fff4e1 style SaveTransactionFailed fill:#ffe1e1 style End1 fill:#e1ffe1 style End2 fill:#e1ffe1

앞서 살펴본 바와 같이 Consumer 처리 중 에러가 발생하면 handleFailed 메서드가 호출되는데요, 복구 가능 여부를 먼저 판단하며, INVALID_CREDENTIAL이나 EXPIRED_CREDENTIAL 같은 복구 가능한 에러인 경우 따로 처리하게 됩니다.

protected async handleFailed(
  job: Job<SubmissionAnswerSheetFlushJob>,
  error: Error & { cause?: string },
) {
  const { key } = job.data;
  try {
    const { sheetCreateUserId, submissionId, spreadSheetId, sheetId } =
      this.getInfoFromKey(key);
    const idList = await this.redis.lrange(key, 0, -1);

    if (this.isRecoverableError({ error })) {
      await this.handleRecoverableError({
        key,
        sheetCreateUserId,
        submissionId,
        spreadSheetId,
        sheetId,
        idList,
      });
    }

    await this.saveSheetTransaction({
      idList,
      spreadSheetId,
      sheetId,
      type: SheetTransactionType.FAILED,
      errorMessage: error.message,
    });

    await this.redis.del(key);
  } catch (handleError) {
    this.logger.error(
      { err: handleError, key, originalError: error },
      '[SubmissionAnswerSheetFlushConsumer] handleFailed 처리 중 에러 발생',
    );
  }

  return {
    shouldSendSlack: true,
    slackTitle: SHEET_FLUSH_FAIL_MESSAGE_TITLE,
    additionalMessage: `key: ${key}`,
  };
}

복구 가능한 에러인 경우, handleRecoverableError에서 에러 처리용 데이터를 조회하여 변환하고, Dead Letter Queue에 저장하여 7일 후 재시도하도록 합니다.

private async handleRecoverableError({
  key,
  sheetCreateUserId,
  submissionId,
  spreadSheetId,
  sheetId,
  idList,
}: {
  key: string;
  sheetCreateUserId: string;
  submissionId: string;
  spreadSheetId: string;
  sheetId: number;
  idList: string[];
}) {
  const questionMetadata = await this.fetchQuestionMetadata({ submissionId });
  const submissionAnswerList = await this.fetchSubmissionAnswerList({ idList });
  const sheetFormAnswerList = await this.fetchSumbissionAnswer({
    submissionAnswerList,
    submissionId,
    questionMetadata,
  });

  if (sheetFormAnswerList.length === 0) return;

  await this.saveToDLQ({
    key,
    sheetFormAnswerList,
    sheetCreateUserId,
    spreadSheetId,
    sheetId,
    submissionId,
    idList,
  });
}

private async saveToDLQ({
  key,
  idList,
  sheetFormAnswerList,
  sheetCreateUserId,
  spreadSheetId,
  sheetId,
  submissionId,
}: {
  key: string;
  idList: string[];
  sheetFormAnswerList: string[][];
  sheetCreateUserId: string;
  spreadSheetId: string;
  sheetId: number;
  submissionId: string;
}) {
  await this.submissionAnswerSheetFlushDeadLetterQueue.add(
    JOB.SUBMISSION_ANSWER_SHEET_FLUSH_DEAD_LETTER_QUEUE,
    {
      sheetFormAnswerList,
      sheetCreateUserId,
      spreadSheetId,
      sheetId,
      submissionId,
      failedSubmissionAnswerIdList: idList,
    },
    {
      jobId: `${key}:dlq:${Date.now()}`,
      delay: DLQ_DELAY_MILLISECONDS, // 7일 후 재시도
    },
  );
}

아 물론, 복구 불가능한 에러인 경우에도 동일하게 SheetTransaction을 FAILED 상태로 저장합니다. 모든 에러 처리 후에는 Redis Key를 삭제하고, handleFailed 메서드는 Slack 알림 전송을 위한 정보를 반환하여 실무자인 저와 팀원들이 즉시 파악할 수 있도록 합니다.

DLQ Consumer 처리

Dead Letter Queue에 저장된 Job은 7일 후 SubmissionAnswerSheetFlushDLQConsumer가 처리하거나 명시적으로 재시도할 수 있습니다.

@Processor(QUEUE.SUBMISSION_ANSWER_SHEET_FLUSH_DEAD_LETTER_QUEUE)
export class SubmissionAnswerSheetFlushDLQConsumer extends BaseConsumer<SubmissionAnswerSheetFlushDeadLetterQueueJob> {
  async process(job: Job<SubmissionAnswerSheetFlushDeadLetterQueueJob>) {
    const {
      sheetFormAnswerList,
      sheetCreateUserId,
      spreadSheetId,
      sheetId,
      submissionId,
      failedSubmissionAnswerIdList,
    } = job.data;

    const valuesWithHeader = await this.prepareSheetValues({
      sheetFormAnswerList,
      sheetCreateUserId,
      spreadSheetId,
      sheetId,
      submissionId,
    });

    await this.googleSpreadSheetService.appendCellsInSheet({
      sheetId,
      spreadSheetId,
      userId: sheetCreateUserId,
      values: valuesWithHeader,
      valueInputOption: ValueInputOption.USER_ENTERED,
    });

    await this.updateCompletedSheetTransaction({
      failedSubmissionAnswerIdList,
      spreadSheetId,
      sheetId,
    });
  }

  private async prepareSheetValues({
    sheetFormAnswerList,
    sheetCreateUserId,
    spreadSheetId,
    sheetId,
    submissionId,
  }: {
    sheetFormAnswerList: string[][];
    sheetCreateUserId: string;
    spreadSheetId: string;
    sheetId: number;
    submissionId: string;
  }) {
    const isHeaderExists = await this.checkHeaderExists({
      sheetId,
      spreadSheetId,
      userId: sheetCreateUserId,
    });

    if (isHeaderExists) {
      return sheetFormAnswerList;
    }

    const header = await this.extractSubmissionHeader({ submissionId });
    return [header, ...sheetFormAnswerList];
  }
}

DLQ에서 하는 작업은 결국 기존 컨슈머에서 성공하지 못했던 로직을 재실행하는 것입니다.

결론적으로는 실패한 ID들에 대해서 Google Sheet API를 통해서 시트 적재를 다시 시도하는 것이지요!

이때 운영 경험을 바탕으로 저희는 실패하는 요소들이면서 복구할 수 있는 케이스는 각 유저의 credential이 만료되었을 때라고 보았습니다. 이러한 경우에는 유저들에게 알림을 보내서 credential을 재갱신하도록 유도하고, 재갱신이 완료되면 해당 실패한 작업을 다시 시도하면 되기 때문이에요.

protected async handleFailed(
  job: Job<SubmissionAnswerSheetFlushDeadLetterQueueJob>,
  error: Error & { cause?: string },
) {
  if (this.isRecoverableError({ error })) {
    await job.remove();

    await this.submissionAnswerSheetFlushDeadLetterQueue.add(
      JOB.SUBMISSION_ANSWER_SHEET_FLUSH_DEAD_LETTER_QUEUE,
      job.data,
      {
        jobId: `${job.id}:retry:${Date.now()}`,
        delay: DLQ_RETRY_DELAY_MILLISECONDS, // 7일 후 재시도
      },
    );
  }

  return {
    shouldSendSlack: true,
    slackTitle: SHEET_FLUSH_DLQ_FAIL_MESSAGE_TITLE,
    additionalMessage: `jobData: ${JSON.stringify(job.data)}`,
  };
}

DLQ Consumer에서도 에러가 발생하면 handleFailed가 호출됩니다. 이번에도 복구 가능한 에러인 경우, 기존 Job을 제거하고 7일 후 다시 재시도하도록 새 Job을 추가해서 재시도할 수 있도록 조치합니다.

DLQ 처리도 실패하면 이제는 정말 복구할 수 없는 에러이기 때문에, Slack 알림을 전송하여 개발자가 즉시 파악할 수 있도록 합니다.

이를 통해 운영 중 발생하는 에러를 빠르게 감지하고 적절히 대응할 수 있습니다.

5. 마무리하며

이번 글에서는 이벤트 기반의 새로운 아키텍처 설계를 상세히 살펴보았습니다. 스케줄러를 걷어내고 데이터가 들어오는 즉시 Job을 생성하거나 조정하는 방식으로 변경해 사용자 대기 시간을 최소화했고, Redis에는 ID만 저장해 메모리를 절감했습니다.

무엇보다도 기존에는 개발자가 수동으로 처리해야 했던 시트 동기화 오류를 API 레벨에서 일일이 복구하는 대신, DLQ에 적재된 실패 Job을 배치로 재처리하는 방식으로 전환하면서 운영성을 크게 개선했습니다. 이로써 장애 상황에서도 데이터를 안정적으로 유지할 수 있을 뿐 아니라, 반복되는 수작업을 자동화해 사용성과 운영 편의성까지 함께 끌어올릴 수 있었던게 너무 좋은 경험이였습니다.

앞으로도 개발할 때 리소스를 최적으로 사용하며 요구사항을 충족하고, 운영성을 만족할 수 있도록 많은 고민을 더 하면서 개발해보려고 합니다.