임시저장 기능 고도화 해보기 (3)

들어가며

앞선 글에서는 임시 저장소로 사용할 SQLite에 데이터를 저장하기 위해 Drizzle을 활용하여 엔티티를 정의하고 데이터를 CRUD할 수 있는 메서드를 작성하였습니다.

지금은 특정 메서드를 통해 데이터를 임시 저장소에 저장할 수 있게 되었는데요. 그런데 만약 변경사항을 실시간으로 저장하려면 어떻게 해야 할까요? 단순히 HTTP 통신을 사용하면 요청이 반복적으로 발생하면서 부하가 커지고, 속도 면에서도 지연이 생길 수 있습니다. 이런 문제를 해결하기 위해 소켓 통신을 활용해 구현해보려 합니다.

아키텍쳐

draft_img

전체적인 아키텍쳐는 이와 같다.

이번에도 구상하고 구현할 아키텍처는 위와 같습니다.

이번에는 사용자와 소켓을 연결하고 세션을 효율적으로 관리하기 위한 전략을 구상한 뒤, 이를 기반으로 세팅을 진행할 계획입니다.

@nestjs/websockets

Documentation | NestJS - A progressive Node.js framework
Documentation | NestJS - A progressive Node.js framework
Nest is a framework for building efficient, scalable Node.js server-side applications. It uses progressive JavaScript, is built with TypeScript and combines elements of OOP (Object Oriented Programming), FP (Functional Programming), and FRP (Functional Reactive Programming).

NestJS는 추상화를 통해 HTTP, WebSocket, Microservices 환경 전반에서 Nest의 라이프사이클과 컴포넌트를 동일하게 사용할 수 있게 합니다.

역시나 WebSocket 기능도 지원하는데요, Nest에서는 간단하게 @WebSocketGateway() 데코레이터를 활용해 게이트웨이를 생성하고, 기존 웹소켓 라이브러리와 동일하게 동작하도록 구현할 수 있습니다.

Gateway

@WebSocketGateway({
	cors: {
		...
	},
	namespace: 'draft-submission',
})
@UseWebSocketRoleGuard(GemUserRole.MANAGER)
export class DraftSubmissionGateway
	implements OnGatewayConnection, OnGatewayInit, OnModuleDestroy
{
	@WebSocketServer()
	namespace: Namespace;

	...

}

draft-submission라는 Namespace를 할당해주었는데요, 소켓에서 Namespace는 하나의 물리적 소켓 연결을 여러 개의 독립적인 논리적 통신 채널로 분할하는 역할을 하는데요, 이를 통해 같은 연결 안에서 여러 개의 별도 통신 공간을 만들 수 있어 애플리케이션의 로직을 각 채널별로 분리하고 관리할 수 있게 해줍니다. (multiplexing)

orm

예를들어, 네임스페이스를 사용하면 클라이언트가 하나의 소켓 연결로 여러 영역(e.g 일반 사용자용 '/', 관리자용 '/admin' 등)에 동시 접속하고, 각각 독립적인 이벤트나 메시지를 주고받을 수 있습니다.

서버도 네임스페이스별로 별도의 이벤트 핸들러를 관리할 수 있어 복잡한 실시간 통신 구조를 깔끔하게 분리할 수 있습니다.

handleConnection

async handleConnection(@ConnectedSocket() client: Socket) {
		try {
			...
			// 사용자 인증

			const { sid } = client.handshake.query as { sid?: string };

            // 세션 관리
			const syncResult = await this.sessionManager.handleDataSync({
				userId: user.id,
				submissionId,
				sid: sid ?? 'default',
			});

			if (!syncResult.success) {
				throw new WsException({
					success: false,
					message: `Sync failed: ${syncResult.message}`,
					code: 'SYNC_FAILED',
				});
			}

            // sqlite 데이터베이스 초기화
			const initResult =
				await this.draftSubmissionService.initializeSubmission({
					submissionId,
					userId: user.id,
				});

			if (initResult?.success) {
				await client.join(
					`${DRAFT_SUBMISSION_ROOM_PREFIX}:${submissionId}`,
				);

				client.emit('connection', {
					message: 'connected',
				});
			} else {
				throw new WsException({
					success: false,
					message: initResult?.error,
					code: initResult?.code,
				});
			}
		} catch (error: unknown) {
			this.apm.captureError({
				message: 'Connection error',
				stack: error instanceof Error ? error.stack : undefined,
				name: error instanceof Error ? error.name : 'ConnectionError',
			});
		}
	}

handleConnection 메서드는 OnGatewayConnection 라이프사이클 훅에서 필수적으로 구현해야 하는 메서드로, 사용자가 게이트웨이에 연결할 때 수행할 동작들을 해당 메서드에 정의합니다.

클라이언트가 소켓 연결을 맺으면 연결된 소켓을 인수로 받는 이 메서드가 호출되어 연결 초기화 작업이나 인증, 혹은 로그 기록 등 필요한 처리를 수행할 수 있어요.

저희의 목표는 클라이언트가 소켓 연결을 맺게 될 때 임시저장하고 있던 원본 데이터를 가져와서 초기화하는 작업입니다.

orm

저번에 킵해두신 데이터 가져왔습니다.

우선 해당 handleConnection 메서드에서 중요한 부분이 두가지가 있는데요, 아래 두가지입니다.

  1. 다중 Pod 환경에서 사용자 세션 관리하는 부분
  2. 영구 저장소(mongoDB)에서 이전에 작성하던 임시저장 데이터를 가져와서 임시 저장소(SQLite)에 초기화하는 부분

다중 Pod 환경에서 사용자 세션 관리

다중 Pod 환경에서 각 Pod가 별도의 SQLite 데이터베이스에 임시 데이터를 저장하면 데이터가 분산되는 문제가 발생하는데요, 비록 Sticky Session 설정으로 사용자가 같은 Pod에 연결되도록 하더라도 사용자가 항상 동일한 Pod에 머무는 것은 아닙니다.

orm

같은 서버를 여러대 띄웠을 때 세션이 유지되어야한다.

그래서 서버에서는 사용자의 연결 상태를 세션으로 관리해야 하는데요, 이를 위해 중앙 집중식 저장소 역할을 하는 Redis를 통해 세션을 저장하고 관리해보았습니다.

사용자가 어떤 Pod에 연결되더라도 동일한 세션 정보를 기반으로 임시 데이터를 일관되게 조회하고 저장할 수 있도록 합니다!

세션, 설문조사 데이터기반 데이터 동기화 시나리오

orm

유저는 갑자기 데이터를 잃을 수도 있다.

각 Pod에 SQLite가 설치되어 있고 해당 데이터베이스에 임시저장 데이터를 저장하고 있는 상황이라고 가정할 때, 특정 유저가 작업 중에 다른 Pod로 연결된다면 데이터가 날아간 것 처럼 느껴질 것입니다.

이러한 상황에서도 이전에 임시 저장한 데이터를 이어서 불러오고 작업을 계속할 수 있어야 합니다.

graph TB subgraph ConnectionFlow["연결 처리 Flow"] Step1[1단계: Redis 세션 확인] Step2[2단계: 데이터 위치 파악] Step3[3단계: Pod 라우팅 결정] Step4[4단계: 데이터 동기화] Step1 --> Step2 Step2 --> Step3 Step3 --> Step4 end subgraph SyncScenarios["데이터 동기화 시나리오"] Scenario1[시나리오 1<br/>같은 Pod 연결<br/>데이터 그대로 사용] Scenario2[시나리오 2<br/>다른 Pod 연결<br/>데이터 이전 필요] Scenario3[시나리오 3<br/>Pod 장애<br/>백업에서 복구] end classDef userClass fill:#e1f5fe classDef ingressClass fill:#f3e5f5 classDef podClass fill:#fff3e0 classDef redisClass fill:#ffebee classDef flowClass fill:#e8f5e8 classDef scenarioClass fill:#fce4ec class UserA,UserB userClass class Ingress,Service ingressClass class Pod1,Pod2 podClass class Redis,RedisSession,SessionStore,DataLocation redisClass class ConnectionFlow,Step1,Step2,Step3,Step4 flowClass class SyncScenarios,Scenario1,Scenario2,Scenario3 scenarioClass

무조건 위와 같은 상황으로 사용자가 Pod가 연결되는 것은 아니고, 여러가지 케이스로 연결됩니다.

Pod 연결 시나리오는 크게 다음 세 가지로 나눌 수 있습니다.

  • 같은 Pod에 연결되어 SQLite 데이터베이스를 그대로 사용

  • 다른 Pod에 연결되어 이전 Pod에 저장된 임시 데이터를 새 Pod로 이전하여 사용

  • Pod 장애 발생으로 인해 백업 데이터를 복구하여 사용

이러한 시나리오를 처리하기 위해서 Redis을 활용하여 세션과 설문조사 임시저장 데이터 위치를 추적하고, Pod 간 데이터 복제 및 동기화 전략을 수립하고 구현해보았습니다.

세션 및 설문조사 임시저장 데이터

세션 정보와 설문조사 임시저장 데이터는 분산 환경에서 중앙 집중식 저장소 역할을 하는 Redis에 저장되는데요, 우선 주요 필드부터 설정하고 가겠습니다.

// 세션 데이터 (Hash)
// key: session_data:${userId}:${submissionId}:${sid}
{
  "userId": "...",
  "submissionId": "...",
  "podId": "...",
  "sid": "...",
  "createdAt": "..."
}

사용자 세션 데이터는 사용자 고유 ID, 작성 중인 지원서의 고유 ID, 사용자가 연결된 소켓 Pod의 ID, 세션 ID, 생성 시점 등의 정보를 포함합니다.

// 지원서 데이터 (Hash)
// key: submission_data:${submissionId}
{
  "submissionId": "...",
  "podName": "...",
  "lastSync": "...",
  "backupStatus": "...",
  "version": "...",
  "failedAt": "..."
}

설문조사 임시저장 데이터에는 지원서 고유 ID, 임시 저장 및 작성 중인 Pod 이름, 마지막 동기화 시점, 백업 상태, 버전, 실패 기록과 시점 등의 정보가 저장됩니다.

handleDataSync

	async handleDataSync({
		userId,
		submissionId,
		sid,
	}: {
		userId: string;
		submissionId: string;
		sid: string;
	}) {
		const syncDecision = await this.determineSyncScenario({
			userId,
			submissionId,
			sid,
		});

		switch (syncDecision.scenario) {
			case SyncScenario.SAME_POD:
				return await this.handleSamePodConnection({
					userId,
					submissionId,
					sid,
				});

			case SyncScenario.DIFFERENT_POD:
				return await this.handleDifferentPodConnection({
					userId,
					submissionId,
					sid,
				});

			case SyncScenario.POD_FAILURE:
				return await this.handlePodFailureRecovery({
					userId,
					submissionId,
					sid,
				});

			default:
				return { success: false, message: 'Unknown sync scenario' };
		}
	}

이제 세션 데이터와 설문조사 임시저장 데이터를 기반으로 각 Pod 간 동기화 상태를 판별하고, 그에 따라 필요한 작업을 진행합니다.

우선 세션 id를 기반으로 현재 세션(currentSidSession), 유저 고유 id를 기반으로 최신 세션(latestSession), 임시저장하고 있는 설문조사의 고유 id를 기반으로 데이터 위치(dataLocation) 정보를 가져옵니다.

SAME_POD

  • 사용자의 현재 세션(currentSidSession) 또는 최신 세션(latestSession)이 존재하지 않으면서 데이터 위치(dataLocation)가 존재하지 않는 경우 (첫 연결)
  • 데이터 위치(dataLocation)에 저장된 Pod ID가 현재 실행 중인 Pod ID와 같은 경우
  • 데이터 위치(dataLocation) 정보에서 Pod 이름이 현재 Pod ID와 동일한 경우

이 경우, 사용자는 같은 Pod에 연결되어 있다고 판단하며, 기존 SQLite 데이터베이스를 그대로 사용하여 임시 데이터를 이어서 작업합니다.

DIFFERENT_POD

  • 데이터 위치(dataLocation) 정보가 존재하지만 Pod 이름이 현재 Pod ID와 다른 경우
  • 세션 정보가 존재하지만 세션에 저장된 Pod ID가 현재 Pod ID와 다를 경우

사용자는 다른 Pod에 연결된 것으로 간주되며, 이전 Pod에 저장된 임시 데이터를 새 Pod로 이전하거나 동기화하는 처리가 필요합니다.

POD_FAILURE

  • 데이터 위치(dataLocation)의 백업 상태(backupStatus)가 pod_failed로 표시된 경우

Pod 장애로 인해 기존 데이터가 정상적이지 않은 상태로 판단되며, 백업 저장소에서 데이터를 복구하는 등의 장애 대응 작업이 필요합니다.

handleSamePodConnection

async handleSamePodConnection({
	userId,
	submissionId,
	sid,
}: {
	userId: string;
	submissionId: string;
	sid: string;
}) {
	await this.updateSession({ userId, submissionId, sid });
	await this.draftSubmissionBackupService.createSubmissionBackupCron({
		submissionId,
	});

	return { success: true, message: 'Connected to same pod' };
}

SAME_POD 케이스에 실행하는 작업으로, 세션을 최신 세션으로 업데이트합니다.

handleDifferentPodConnection

async handleDifferentPodConnection({
	userId,
	submissionId,
	sid,
}: {
	userId: string;
	submissionId: string;
	sid: string;
}) {
	await this.draftSubmissionBackupService.updateDataLocation({
		submissionId,
	});
	await this.updateSession({ userId, submissionId, sid });

	return { success: true, message: 'Pod migration completed' };
}

DIFFERENT_POD 케이스에 실행하는 작업으로, 세션을 최신 세션으로 업데이트 하고, 임시저장하고 있는 설문조사의 고유 id를 기반 데이터 위치를 현재 pod id로 변경합니다.

handlePodFailureRecovery

async handlePodFailureRecovery({
	submissionId,
}: {
	submissionId: string;
}): Promise<{ success: boolean; message: string }> {
	const currentLocation = await this.getDataLocation({ submissionId });

	if (currentLocation?.backupStatus === 'pod_failed') {
		await this.markRecovering({ submissionId });
	}

	await this.updateDataLocation({ submissionId });
	await this.markRecovered({ submissionId });

	return { success: true, message: 'Recovered from pod failure' };
}

POD_FAILURE 케이스에 실행하는 작업으로, backupStatus가 pod_failed로 기록되어 있는 임시저장하고 있는 설문조사의 고유 id를 기반 데이터 위치 정보를 recovered로 변경하고, 위치를 현재 pod id로 변경합니다.

정리

graph TB UserA[사용자 A<br/>브라우저 1] UserB[사용자 A<br/>다른 브라우저] UserA --> Ingress[Ingress Controller<br/>1차 Sticky Session<br/>] UserB --> Ingress Ingress --> Service[K8s Service<br/>sessionAffinity] Service --> Pod1[Pod 1<br/>SQLite DB<br/>임시 데이터] Service --> Pod2[Pod 2<br/>SQLite DB<br/>임시 데이터] subgraph RedisSession["Redis"] SessionStore[세션 정보 추적<br/>user_session:userA:submissionId<br/>dataLocation: pod-1<br/>lastActivity: timestamp] DataLocation[데이터 위치 추적<br/>submission_data:submissionId<br/>podName: pod-1<br/>lastSync: timestamp] end Pod1 -.->|세션 등록 및 업데이트| SessionStore Pod2 -.->|세션 등록 및 업데이트| SessionStore SessionStore -.->|세션 확인 및 조회| Pod1 SessionStore -.->|세션 확인 및 조회| Pod2 Pod1 -.->|데이터 위치 등록 및 업데이트| DataLocation Pod2 -.->|데이터 위치 등록 및 업데이트| DataLocation DataLocation -.->|데이터 위치 확인 및 조회| Pod1 DataLocation -.->|데이터 위치 확인 및 조회| Pod2 classDef userClass fill:#e1f5fe classDef ingressClass fill:#f3e5f5 classDef podClass fill:#fff3e0 classDef redisClass fill:#ffebee classDef flowClass fill:#e8f5e8 classDef scenarioClass fill:#fce4ec class UserA,UserB userClass class Ingress,Service ingressClass class Pod1,Pod2 podClass class Redis,RedisSession,SessionStore,DataLocation redisClass class ConnectionFlow,Step1,Step2,Step3,Step4 flowClass class SyncScenarios,Scenario1,Scenario2,Scenario3 scenarioClass

세션 정보와 데이터 위치를 기반으로 사용자가 어떤 pod에서 임시 데이터를 보관하고 있는지를 기록할 수 있습니다.

해당 플로우의 목적은 어떤 pod로 라우팅될지 제어하는 것이 아니라, 애플리케이션에서 직접 제어 앱 단에서 사용자의 세션 정보를 추적하고 지속적으로 갱신함으로써, 특정 사용자가 어떤 pod와 연결되어야 하는지를 관리하는 것이 목적입니다.

즉 이 설문조사의 임시저장 데이터는 어떠한 파드에서, 어떠한 유저에 의해 저장되고 있다를 판별할 수 있는 환경을 구축해두는 것입니다!

영구 저장소의 설문조사 데이터를 임시 저장소로 초기화

const initResult =
	await this.draftSubmissionService.initializeSubmission({
		submissionId,
		userId: user.id,
	});

이제 세션 정보와 설문조사 임시저장 데이터 위치 정보를 초기화 하였으니, 실제 임시저장 데이터를 초기화 하면 됩니다.

private async loadExistingSubmission({
	submissionId,
	userId,
}: {
	submissionId: string;
	userId?: string;
}) {
	try {
		const mongoSubmission = await this.submissionRepository.findOneById(
			{
				id: submissionId,
			},
		);

		...

		const submissionData = this.db
			.select()
			.from(submission)
			.where(eq(submission.id, submissionId))
			.limit(1)
			.get();

		if (submissionData) {
			return {
				success: true,
				data: submissionData,
			};
		}
		...

		this.db
			.insert(submission)
			.values({
				id: submissionId,
				createUserId: mongoSubmission.createUserId ?? '',
				updateUserId: mongoSubmission.updateUserId ?? userId,
				title: mongoSubmission.title,
				description: mongoSubmission.description,
			})
			.run();

		this.saveThemeToSQLite({
			submissionId,
			mode: mongoSubmission.theme ?? ThemeMode.LIGHT,
			color: mongoSubmission.color ?? Color.PINK,
			banner: mongoSubmission.banner ?? {
				type: BannerType.NONE,
			},
		});

		if (
			mongoSubmission.stepList &&
			mongoSubmission.stepList.length > 0
		) {
			this.saveStepsToSQLite({
				stepList: mongoSubmission.stepList,
				submissionId,
			})
		}

		return {
			success: true,
			data: {
				submissionId,
				title: mongoSubmission.title ?? '',
			},
		};
	} catch (error) {
		return {
			success: false,
			error: `Failed to load existing submission: ${error instanceof Error ? error.message : 'Unknown error'}`,
			code: DRAFT_SUBMISSION_ERROR.UNKNOWN_ERROR.code,
		};
	}
}

실제 영속성 저장소인 MongoDB에 임시로 작성해두었던 설문조사 데이터를 가져와, SQLite에 적합한 여러 테이블로 변환하여 저장합니다.

MongoDB에는 설문 문항, 답변, 설정, 단계, 테마 등 다양한 정보가 하나의 객체에 중첩된 구조로 저장되어 있는데요, 이 데이터를 SQLite로 이관할 때 각 항목을 별도의 테이블로 분리해 정규화하고 있습니다.

물론 SQLite에 이미 설문조사 데이터가 존재하는 경우에는 기존 데이터를 그대로 활용합니다!

Connection 연결 성공

client.emit('connection', {
	message: 'connected',
});

마지막으로 클라이언트에게 연결이 성공했음을 알리고 소켓 통신을 시작할 준비가 되었음을 알립니다.

메세지 기반 통신

@SubscribeMessage('edit')
handleEditSubmission(
	@ConnectedSocket() client: Socket,
	@MessageBody() payload: EditSubmissionPayload,
	@WebSocketUser() user: UserData,
) {
	const result =
		this.draftSubmissionService.handleEditSubmission(payload);

	if (result.success === true) {
		const { action, block } = payload;

		const eventData = {
			...result,
			userId: user.id,
			action,
			block,
		};

		client.emit('edit:success', eventData);

		client
			.to(`${DRAFT_SUBMISSION_ROOM_PREFIX}:${payload.submissionId}`)
			.emit('edit:update', eventData);
	} else {
		throw new WsException({
			success: false,
			message: result.error,
			code: 'code' in result ? result.code : '00',
		});
	}
}

이제 클라이언트와 소켓 통신을 통해 실시간으로 임시 저장 기능을 사용할 수 있습니다.

@SubscribeMessage 데코레이터를 활용해 특정 이벤트(edit)를 구독하고, 해당 이벤트가 발생하면 실행되는 메서드를 정의하고 있는데요, 이 메서드는 클라이언트로부터 편집 요청을 받아 handleEditSubmission 메서드를 호출해 실제 편집 작업을 수행하고, 결과를 다시 클라이언트로 전송하는 역할을 합니다.

물론, 임시 저장 수정 작업에 이벤트를 각각 달 수도 있지만, 이벤트가 많아지면 관리가 어려워져 edit라는 하나의 이벤트로 통합해 처리하고 있습니다. 대신 action 필드를 통해 수행된 작업 종류를 구분하고, block 필드를 통해 수정된 블록을 명확히 표시합니다.

이때 block은 전략 패턴으로 각 블록별 처리 로직을 분리해 두어, 새로운 블록이 추가되어도 기존 코드를 크게 변경하지 않고 확장할 수 있습니다.

예를 들어 action이 create이고 block이 question이라면, 문항 블록을 추가하는 작업임을 의미하며, 전략 패턴에 따라 question 전략의 create 메서드가 호출되어 처리되고 있습니다.

disconnect 처리

async handleDisconnect(client: Socket) {
	const submissionId = client.handshake?.query?.submissionId as
		| string
		| undefined;

	try {
		if (submissionId) {
			await this.draftSubmissionBackupService.removeSubmissionBackupCron(
				{
					submissionId,
				},
			);

			const result =
				await this.draftSubmissionService.migrateDraftSubmissionToMongo(
					{
						submissionId,
					},
				);

			if (result?.success) {
				this.draftSubmissionService.flushDraftSubmission({
					submissionId,
				});
			}
		}

		client.emit('disconnection', 'disconnected');
	} catch (error) {
		this.apm.captureError({
			message: 'Error during disconnect',
			stack: error instanceof Error ? error.stack : undefined,
			name: error instanceof Error ? error.name : 'DisconnectError',
		});
	}
}

마지막으로 소켓 연결이 끊어질때 수행할 작업들에 대한 handleDisconnect 메서드를 작성하였습니다.

사용자가 소켓 연결을 끊는다면 임시저장하던 데이터들을 백업하고 영속성 데이터 저장소로 옮길 필요가 있습니다.

이를 위해 우선 자동으로 백업을 진행시켜주는 백업 크론을 없애고, 사용자가 작성 중이던 설문조사 데이터를 영구 저장소인 MongoDB로 이전 후, 임시 저장소인 SQLite에서 해당 데이터를 삭제합니다.

마치며

이렇게 해서 소켓 통신을 활용해 임시 저장 기능을 고도화하는 과정에서 마주했던 고민들과 이를 해결하기 위해 시도했던 다양한 방안들을 정리해보았습니다. 소켓을 연결하고, 끊고, 통신하고, 세션을 관리하고, 데이터를 동기화하는 과정에서 애플리케이션 관점에서 뿐만 아니라 다중환경에서 어떤 케이스들이 발생할지에 대해 고려하고 고민하면서 더 많이 배웠습니다.

이번 글에서 소켓 연결이나 해제에 있어서 백업에 대한 메서드들이 작성되어 있는 것을 확인할 수 있었는데요, 백업은 이러한 인메모리 데이터베이스를 사용하는데 있어서 가장 중요한 역할을 한다고 생각합니다.

다음 글에서는 이 백업에 대한 부분을 좀 더 자세히 다뤄보도록 하겠습니다.