임시저장 기능 고도화 해보기 (4)
들어가며
앞선 글에서는 사용자와 소켓을 연결하고 세션을 효율적으로 관리하기 위한 전략을 구상한 후, 기능을 구현해보았습니다.
이제 클라이언트가 소켓에 연결하여 임시 저장소에 데이터를 저장하며 임시저장을 할 수 있게 되었는데요, 만약 서버가 어떤 이유로 고장 나면 이 휘발성 임시 저장 데이터는 어떻게 해야 할까요?
휘발성 데이터의 일관성을 보장하기 위해 백업 전략을 구상하고 구현하려고 합니다.
아키텍쳐

전체적인 아키텍쳐는 이와 같다.
이번에도 구상하고 구현할 아키텍처는 위와 같습니다.
이번 아키텍처에서는 백업을 언제, 어떻게 진행할지를 구상하고 구현해볼 계획입니다.
백업은 언제 이뤄져야 할까?
백업을 언제, 어떤 상황에서 수행해야 할지에 대해 먼저 생각해보았습니다. 가장 기본적으로 백업은 주기적으로 이루어져야 합니다. 정해진 스케줄에 따라 데이터를 안전하게 보존하는 것이 임시저장 데이터의 안정성을 증가시키는 것이기 때문입니다.
하지만 주기적인 백업만으로는 충분하지 않습니다. 예기치 못한 상황에서도 데이터를 지키기 위한 추가적인 백업 타이밍이 필요했었습니다. 대표적으로는 다음과 같은 경우가 있는데요,
- 소켓 연결이 끊어질 때: 연결이 비정상적으로 종료되면 데이터 손실 가능성이 생김
- Pod가 종료될 때: 특히 Kubernetes 환경과 같이 Pod가 갑작스럽게 죽는 상황이 발생할 수 있음
주기적인 백업
주기적인 백업은 BullMQ의 기능을 활용해 구현해보았는데요, BullMQ에서는 repeatable이라고 불리는 메타 잡(meta job) 개념을 제공하는데, 이를 사용하면 반복적으로 실행되는 작업을 손쉽게 등록할 수 있습니다.

반복적으로 job이 등록된다.
특정 큐에 잡(job)을 등록하게 되면, 그 시점을 기준으로 하여 주기적으로 동일한 잡이 다시 등록되는 구조입니다. 한 번 등록해두면 계속해서 정해진 간격으로 새로운 잡이 이어서 생성되는 방식이라고 보시면 됩니다.
async createSubmissionBackupCron({
submissionId,
}: {
submissionId: string;
}) {
const cronJobName = `backup-${submissionId}`;
const existingJobs = await this.backupQueue.getJobs(['delayed'], 0, -1);
const hasExistingCron = existingJobs.some(
(job) => job?.name === cronJobName,
);
if (hasExistingCron) {
return;
}
await this.backupQueue.add(
JOB.DRAFT_SUBMISSION_BACKUP,
{
submissionId,
type: 'scheduled',
requestedBy: this.podId,
},
{
repeat: {
pattern: CronExpression.EVERY_MINUTE,
key: cronJobName,
},
jobId: cronJobName,
},
);
}
코드도 간단합니다. 먼저 backupQueue에 해당 cron job이 이미 등록되어 있는지를 확인하고, 만약 존재하지 않는다면 그때 반복 실행되는 job을 새로 추가하는 방식으로 동작합니다.
@Injectable()
@Processor(QUEUE.DRAFT_SUBMISSION_BACKUP, {
concurrency: 3,
})
export class BackupProcessor extends WorkerHost {
constructor(
@InjectRedis() private readonly redis: Redis,
private readonly draftSubmissionService: DraftSubmissionService,
) {
super();
}
async process(job: Job<BackupJobData>) {
const { submissionId } = job.data;
try {
await this.performBackup(submissionId);
...
return { success: true, message };
} catch (error) {
const message = `Backup job failed: ${error instanceof Error ? error.message : 'Unknown error'}`;
throw new Error(message);
}
}
private async performBackup(submissionId: string) {
await this.draftSubmissionService.migrateDraftSubmissionToMongo({
submissionId,
});
}
}
해당 부분은 반복 실행되는 job에 대한 코드인데요, 반복적으로 SQLite에 임시 저장된 데이터를 영속성 스토리지인 MongoDB로 백업합니다. 최종적으로 이 과정을 통해 5분마다 주기적인 백업이 수행되고 있습니다.
async migrateDraftSubmissionToMongo({
submissionId,
}: {
submissionId: string;
}) {
const draftSubmission = this.findDraftSubmission({ submissionId });
...
const draftSubmissionData =
this.submissionTransformer.transform(draftSubmission);
await this.draftSubmissionRepository.upsert({
submissionId,
draftSubmission: draftSubmissionData,
});
return {
success: true,
data: draftSubmissionData,
};
}
코드도 단순합니다. SQLite에서 임시 저장된 데이터를 가져온 뒤, MongoDB 스키마에 맞게 변환 작업을 거쳐 MongoDB에 저장합니다.
앞서 언급했듯이, MongoDB는 설문 문항, 답변, 설정, 단계, 테마 등 다양한 정보를 하나의 객체에 중첩된 구조로 저장하고 있지만, SQLite에서는 이 데이터를 각 항목별로 별도 테이블로 분리해 정규화하고 있습니다.
이러한 구조적 차이 때문에 변환 과정이 필요합니다!
소켓 연결이 끊어질 때 백업

끊어질때를 맞춰 백업을 진행해야한다.
소켓이 끊어진 경우는 두가지가 있습니다.
- 사용자가 명시적으로 소켓 연결을 끊는 경우
- 클라이언트가 서버가 보낸 하트비트를 수신하지 못한 경우
소켓이 끊어진다면 사용자가 임시저장하던 데이터를 백업하고 보존해야합니다.
클라이언트가 서버가 보낸 하트비트를 수신하지 못한 경우
사용자가 명시적으로 소켓 연결을 끊는 경우도 있지만, 특히 클라이언트 측에서는 소켓이 끊어진 사실을 요청을 보내기 전까지는 알기 어렵습니다. 그래서 서버에서 먼저 하트비트 신호를 보내고, 클라이언트가 이에 대한 응답(ping-pong)을 하지 않으면 연결이 끊어진 것으로 판단합니다.
@WebSocketGateway({
cors: {
origin: process.env.NODE_ENV === 'production' ? /\.goorm\.io$/ : '*',
credentials: true,
methods: ['GET', 'POST'],
},
namespace: 'draft-submission',
})
@UseWebSocketRoleGuard(GemUserRole.MANAGER)
export class DraftSubmissionGateway
implements OnGatewayConnection, OnGatewayInit, OnModuleDestroy
{
@WebSocketServer()
namespace: Namespace;
afterInit(namespace: Namespace) {
this.namespace = namespace;
this.namespace.on('connection', (socket) => {
let lastPongTime = Date.now();
const TIMEOUT_MS = 60 * 1000;
const pingInterval = setInterval(() => {
if (!socket.connected) {
clearInterval(pingInterval);
return;
}
const now = Date.now();
const timeSinceLastPong = now - lastPongTime;
if (timeSinceLastPong > TIMEOUT_MS) {
socket.disconnect(true);
clearInterval(pingInterval);
return;
}
socket.emit('ping', {
timestamp: now,
socketId: socket.id,
});
}, 5000);
socket.on('disconnect', async () => {
await this.handleDisconnect(socket);
clearInterval(pingInterval);
});
socket.on('pong', () => {
lastPongTime = Date.now();
});
});
}
...
}
연결이 끊어졌다고 판단되면, 아래 이어서 나올 케이스에 대한 처리 메서드(handleDisconnect)를 호출합니다.
사용자가 명시적으로 소켓 연결을 끊는 경우
클라이언트 측에서 소켓 연결을 끊는 상황도 자연스럽게 발생합니다. 예를 들어, 사용자가 소켓을 사용하는 페이지를 떠날 때 연결이 종료됩니다.
@WebSocketGateway({
cors: {
origin: process.env.NODE_ENV === 'production' ? /\.goorm\.io$/ : '*',
credentials: true,
methods: ['GET', 'POST'],
},
namespace: 'draft-submission',
})
@UseWebSocketRoleGuard(GemUserRole.MANAGER)
export class DraftSubmissionGateway
implements OnGatewayConnection, OnGatewayInit, OnModuleDestroy
{
@WebSocketServer()
namespace: Namespace;
async handleDisconnect(client: Socket) {
const submissionId = client.handshake?.query?.submissionId as
| string
| undefined;
try {
if (submissionId) {
await this.draftSubmissionBackupService.removeSubmissionBackupCron(
{
submissionId,
},
);
const result =
await this.draftSubmissionService.migrateDraftSubmissionToMongo(
{
submissionId,
},
);
if (result?.success) {
this.draftSubmissionService.flushDraftSubmission({
submissionId,
});
}
}
client.emit('disconnection', 'disconnected');
} catch (error) {
this.apm.captureError({
message: 'Error during disconnect',
stack: error instanceof Error ? error.stack : undefined,
name: error instanceof Error ? error.name : 'DisconnectError',
});
}
}
...
}
소켓이 끊겼다고 판단이 되었을때는 임시저장소에 있던 데이터를 영구 저장소로 백업해줘야합니다.
아래 세가지 작업을 진행합니다.
- 5분 간격으로 백업을 진행하는 백업 Job 삭제
- SQLite에 저장된 데이터를 mongoDB로 이관
- SQLite에 저장된 데이터 삭제
해당 작업이 모두 완료된 이후에야 서버가 클라이언트에게 연결 해제가 정상적으로 처리되었다는 메시지를 emit합니다.
Pod에 문제가 생겼을 때 백업
Kubernetes에서 pod를 운영하다 보면 메모리 초과나 CPU 초과와 같은 예기치 못한 상황으로 인해 pod가 종료될 수 있습니다. pod가 종료되면 서버뿐 아니라 그 안에 있던 소켓 커넥션도 모두 끊기게 되고, 임시 저장소로 사용했던 SQLite도 초기화됩니다.
이런 상황에 대비하기 위해, pod가 종료 이벤트를 감지한 뒤 실제로 종료(terminated)되기 전까지 백업 작업을 진행해야 합니다.
terminationGracePeriodSeconds를 30초로 설정하여, 종료까지 남은 시간 동안 백업을 수행합니다.
@WebSocketGateway({
cors: {
origin: process.env.NODE_ENV === 'production' ? /\.goorm\.io$/ : '*',
credentials: true,
methods: ['GET', 'POST'],
},
namespace: 'draft-submission',
})
@UseWebSocketRoleGuard(GemUserRole.MANAGER)
export class DraftSubmissionGateway
implements OnGatewayConnection, OnGatewayInit, OnModuleDestroy
{
@WebSocketServer()
namespace: Namespace;
async onModuleDestroy() {
const draftSubmissionIdList =
this.draftSubmissionService.getAllDraftSubmission({
page: 1,
pageSize: 1000,
});
this.draftSubmissionBackupService.requestDataBackupForAllSubmissions({
draftSubmissionIdList,
});
await this.draftSubmissionBackupService.storeDatabaseToS3();
}
...
}
pod가 종료 신호를 받을 때 NestJS 서버도 같은 종료 신호를 받아서 모듈을 종료하며 onModuleDestroy 훅을 실행할 수 있습니다.
Kubernetes 환경에서는 pod가 종료될 때 SIGTERM 신호가 컨테이너에 전달되는데요, 이 때 NestJS의 onModuleDestroy 메서드를 통해 필요한 정리 작업이 가능합니다.

백업에는 아래 두가지를 진행합니다.
- pod 내부에 임시 저장된 모든 설문조사 id를 가져와 백업을 진행합니다.
- SQLite 파일을 S3에 백업합니다.
이렇게 두가지 작업을 통해 pod에 있는 모든 데이터가 백업됩니다.
마치며
이처럼, pod 내부에 인메모리로 임시 저장된 데이터로 인해 발생할 수 있는 데이터 정합성 문제와 휘발성 이슈를 관리해 보았습니다.
먼저 전체 설계도를 작성하고, 백업이 필요한 경우들을 체계적으로 정리하여 구현했기 때문에 다양한 엣지 케이스에 대비할 수 있었던 것 같아요.
물론 여전히 예상하지 못한 상황이 있을 수 있지만, 운영 과정에서 발생하는 문제들을 빠르게 파악하고 적용해 나갈 계획입니다.
