동시성 프록시 서버 단계별 구현 및 테스트하기
이번 글에서는 반복형 프록시 서버 구현의 다음 단계로, 스레드 풀(Thread Pool)을 이용해 여러 클라이언트 요청을 동시에 처리하는 멀티스레드 기반 동시성 프록시 서버를 구현해 볼 건데요.
이전 글에서는 단계마다 대략적인 방향을 제시하고 마지막에 전체 코드를 제시하는 형태로 구성했었는데요. 난이도가 올라감에 따라 초심자도 쉽게 따라올 수 있도록 한 단계씩, 주석이 가득한 코드로 설명할 예정이니 안심하고 따라오셨으면 좋겠습니다.
동시성 프록시 서버 단계별로 구현하기
0단계: 목표 정리하기
클라이언트 요청을 받고 이를 스레드 풀에 있는 워커 스레드가 처리하는 멀티스레드 기반 동시성 프록시 서버 완성하기
- 서버 시작 시 고정된
N
개의 워커 스레드 생성 (스레드 풀) - 메인 스레드는
accept()
로 연결만 받고,connfd
를 작업 큐에 저장 - 워커 스레드는 큐에서
connfd
를 꺼내func()
실행
스레드 풀을 사용해야 하는 이유
요청마다 스레드를 만드는 식으로 멀티스레드 기반 동시성 서버를 손쉽게 구현할 수도 있는데요. 다만 이 경우에 CPU Context-switch 비용과 메모리 사용이 급증합니다. 반면 스레드 풀은 미리 만들어 둔 스레드를 재사용하므로, 지연 시간 감소와 예측 가능한 자원 사용이라는 장점이 있습니다.
또한, 이는 이후의 캐시 프록시 서버 구현(3단계)에 매우 적합한 구조입니다. 왜냐하면 다수의 클라이언트 요청이 동시에 들어올 경우, 캐시 접근 시 동기화 관리와 일관된 요청 처리 흐름이 중요해지는데, 스레드 풀 구조는 이러한 동시성 제어와 자원 재활용을 안정적으로 지원하기 때문입니다.
1단계: 스레드 풀 구조 이해하기
스레드를 필요할 때마다 새로 생성하는 것이 아니라, 서버 시작 시에 고정된 수의 스레드(N개)를 미리 만들어 두고, 작업 큐를 통해 요청을 분배하는 구조를 스레드 풀 구조라고 합니다.
1-1. 처리 흐름도
클라이언트 요청
↓
[ 메인 스레드: accept() ]
↓
[ 작업 큐 (connfd enqueue) ]
↓ ↓ ↓
[ 워커 1 ] [ 워커 2 ] … [ 워커 N ]
↓ ↓ ↓
func() func() func()
응답 전송 응답 전송 응답 전송
1-2. 동기화 포인트
- mutex : 작업 큐 접근 시 상호 배제(자물쇠)
- 조건변수 slots/items
- 큐가 가득 차면 메인 스레드는
slots
에서 대기 - 큐가 비어 있으면 워커는
items
에서 대기
- 큐가 가득 차면 메인 스레드는
2단계 작업 큐(Ring Buffer) 구현하기
해당 단계에서는 아래의 내용을 목표로 구현합니다.
- 요청을 저장하는
connfd
큐 만들기 (FIFO) enqueue()
/dequeue()
함수 구현- 큐 접근을 보호하기 위한
pthread_mutex_t
,pthread_cond_t
연동
2-1. 구조체 정의 (sbuf_t, shared buffer 구조체)
배열의 끝에 도달하면 처음으로 되돌아가는 순환 버퍼 구조를 이용해, 클라이언트 연결을 저장하고 이를 스레드 간에 안전하게 공유하는 작업 큐 구조체입니다.
typedef struct {
int *buf; // 연결 파일디스크립터(connfd) 저장 배열
int front; // 꺼낼 위치 (dequeue index)
int rear; // 넣을 위치 (enqueue index)
int n; // 버퍼 전체 크기
pthread_mutex_t mutex; // 큐 접근 동기화용
pthread_cond_t slots; // 빈 슬롯 발생 시 signal
pthread_cond_t items; // 아이템 도착 시 signal
} sbuf_t;
2-2. 초기화 함수 (sbuf_init)
작업 큐 구조체를 초기화하여 버퍼와 동기화 도구(mutex, 조건 변수)를 사용할 준비를 하는 함수입니다.
void sbuf_init(sbuf_t *sp, int n) {
sp->buf = calloc(n, sizeof(int)); // connfd 저장용 배열 할당
sp->n = n; // 버퍼 크기 저장
sp->front = sp->rear = 0; // 초기 인덱스는 0 (비어 있음)
pthread_mutex_init(&sp->mutex, NULL); // mutex 초기화
pthread_cond_init(&sp->slots, NULL); // 빈 슬롯 대기 조건 변수 초기화
pthread_cond_init(&sp->items, NULL); // 아이템 대기 조건 변수 초기화
}
2-3. 삽입 함수 (sbuf_insert, 메인 스레드에서 사용)
작업 큐에 클라이언트 연결(connfd
)을 추가하며, 큐가 가득 차 있을 경우 빈 슬롯이 생길 때까지 대기하는 함수입니다.
void sbuf_insert(sbuf_t *sp, int item) {
pthread_mutex_lock(&sp->mutex); // 큐 접근 mutext 잠금
while (((sp->rear + 1) % sp->n) == sp->front) { // 큐가 가득 찬 경우
pthread_cond_wait(&sp->slots, &sp->mutex); // 빈 슬롯이 생길 때까지 대기
}
sp->buf[sp->rear] = item; // connfd를 rear 위치에 저장
sp->rear = (sp->rear + 1) % sp->n; // rear 인덱스 증가 (원형 회전)
pthread_cond_signal(&sp->items); // 대기 중인 워커 스레드에게 작업이 생겼음을 알림
pthread_mutex_unlock(&sp->mutex); // mutex 잠금 해제
}
2-4. 제거 함수 (sbuf_remove, 워커 스레드에서 사용)
작업 큐에서 클라이언트 연결(connfd
)을 꺼내어 반환하며, 큐가 비어 있으면 아이템이 들어올 때까지 대기합니다.
int sbuf_remove(sbuf_t *sp) {
pthread_mutex_lock(&sp->mutex); // 큐 접근 잠금
while (sp->front == sp->rear) { // 큐가 비어있는 경우
pthread_cond_wait(&sp->items, &sp->mutex); // 아이템이 들어올 때까지 대기
}
int item = sp->buf[sp->front]; // front 위치의 connfd 가져오기
sp->front = (sp->front + 1) % sp->n; // front 인덱스 증가 (원형 회전)
pthread_cond_signal(&sp->slots); // 대기 중인 메인 스레드에게 공간이 생겼음을 알림
pthread_mutex_unlock(&sp->mutex); // 잠금 해제
return item; // connfd 반환
}
3단계 워커 스레드 함수 구현 및 메인 루프 변경하기
3-1. 워커 스레드 함수 (*thread)
클라이언트 연결을 반복적으로 처리하는 워커 스레드의 작업 루틴 함수로, 메인 함수에서 스레드 생성 시 이 함수를 루틴으로 등록하여 각 스레드가 요청을 처리하도록 합니다.
void *thread(void *vargp) {
pthread_detach(pthread_self());
while (1) {
int connfd = sbuf_remove(&sbuf); // 작업 꺼냄
func(connfd); // 요청 처리
close(connfd); // 소켓 닫기
}
}
3-2. 메인 함수
메인 함수는 서버 실행 시 정해진 수(N)의 워커 스레드를 생성하고, 클라이언트의 연결 요청을 수락할 때마다 해당 연결의 파일 디스크립터(connfd
)를 작업 큐에 넣습니다. 이후 대기 중이던 워커 스레드가 큐에서 connfd
를 꺼내 클라이언트 요청을 처리합니다.
#define NTHREADS 4
#define SBUFSIZE 16
sbuf_t sbuf;
int main(int argc, char **argv) {
int listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
sbuf_init(&sbuf, SBUFSIZE); // 작업 큐 초기화
pthread_t tid;
for (int i = 0; i < NTHREADS; i++)
pthread_create(&tid, NULL, thread, NULL); // 워커 생성
listenfd = Open_listenfd(argv[1]);
while (1) {
clientlen = sizeof(clientaddr);
connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
sbuf_insert(&sbuf, connfd); // 큐에 요청 넣기
}
return 0;
}
최종 구현 코드
#include <stdio.h>
#include "csapp.h"
/* Recommended max cache and object sizes */
#define MAX_CACHE_SIZE 1049000
#define MAX_OBJECT_SIZE 102400
#define NTHREADS 4
#define SBUFSIZE 16
typedef struct {
int *buf; // connfd 저장 배열
int front; // dequeue 인덱스
int rear; // enqueue 인덱스
int n; // 현재 들어있는 아이템 수
pthread_mutex_t mutex; // 큐 접근 mutex
pthread_cond_t slots; // 빈 슬롯이 생겼을 때 signal
pthread_cond_t items; // 아이템이 추가되었을 때 signal
} sbuf_t; // 작업 큐 구조체
void *thread(void *vargp);
void read_requesthdrs(rio_t *rp);
int parse_uri(char *uri, char*host, char *port, char *path);
void func(int connfd);
// 스레드 풀 함수
void sbuf_init(sbuf_t *sp, int n); // 큐 초기화
void sbuf_insert(sbuf_t *sp, int item); // connfd 저장 (enqueue)
int sbuf_remove(sbuf_t *sp); // connfd 꺼내기 (dequeue)
/* You won't lose style points for including this long line in your code */
static const char *user_agent_hdr =
"User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:10.0.3) Gecko/20120305 "
"Firefox/10.0.3\r\n";
sbuf_t sbuf;
int main(int argc, char **argv) {
int listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
/* Check command line args */
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(1);
}
sbuf_init(&sbuf, SBUFSIZE); // 작업 큐 초기화
// 워커 스레드 생성
pthread_t tid;
for (int i = 0; i < NTHREADS; i++) {
pthread_create(&tid, NULL, thread, NULL);
}
// 메인 스레드: 클라이언트 연결 수락 및 큐에 삽입
listenfd = Open_listenfd(argv[1]);
while (1) {
clientlen = sizeof(clientaddr);
connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
sbuf_insert(&sbuf, connfd); // connfd를 큐에 삽입
}
return 0;
}
void func(int connfd) {
rio_t client_rio, server_rio;
char buf[MAXLINE], req[MAX_OBJECT_SIZE];
char method[MAXLINE], uri[MAXLINE], version[MAXLINE];
char host[MAXLINE], port[10], path[MAXLINE];
Rio_readinitb(&client_rio, connfd);
// 1. 요청 라인 먼저 읽기
if (!Rio_readlineb(&client_rio, buf, MAXLINE)) return;
sscanf(buf, "%s %s %s", method, uri, version);
if (parse_uri(uri, host, port, path) == -1) {
fprintf(stderr, "올바른 URI가 아닙니다: %s\n", uri);
return;
}
// 요청 라인 → path와 HTTP/1.0으로 변경
sprintf(req, "GET %s HTTP/1.0\r\n", path);
// 2. 헤더 읽기 및 필터링
while (Rio_readlineb(&client_rio, buf, MAXLINE) > 0 && strcmp(buf, "\r\n") != 0) {
if (strncasecmp(buf, "Host", 4) == 0 ||
strncasecmp(buf, "User-Agent", 10) == 0 ||
strncasecmp(buf, "Connection", 10) == 0 ||
strncasecmp(buf, "Proxy-Connection", 16) == 0) {
continue;
}
strcat(req, buf); // 유효한 헤더는 저장
}
// 표준 헤더 추가
sprintf(buf, "Host: %s\r\n", host); strcat(req, buf);
sprintf(buf, "%s", user_agent_hdr); strcat(req, buf);
sprintf(buf, "Connection: close\r\n"); strcat(req, buf);
sprintf(buf, "Proxy-Connection: close\r\n\r\n"); strcat(req, buf);
printf("최종 요청:\n%s\n", req);
// 원 서버 Tiny와 연결 + 요청 전송
int serverfd = Open_clientfd(host, port);
if (serverfd < 0) {
fprintf(stderr, "원 서버 연결 실패\n");
return;
}
Rio_readinitb(&server_rio, serverfd);
// 요청 전체 전송
Rio_writen(serverfd, req, strlen(req));
/* --- 응답 헤더 전달(라인 별) --- */
int n;
while ((n = Rio_readlineb(&server_rio, buf, MAXLINE)) > 0) {
Rio_writen(connfd, buf, n);
if (strcmp(buf, "\r\n") == 0) break; // 헤더 끝 감지
}
/* --- 응답 바디 전달(바이트 스트림 전체) --- */
while ((n = Rio_readnb(&server_rio, buf, MAXBUF)) > 0) {
Rio_writen(connfd, buf, n);
}
Close(serverfd);
}
int parse_uri(char *uri, char*host, char *port, char *path) {
// "http://www.example.com:8000/index.html"
char *hostbegin, *hostend, *pathbegin, *portbegin;
if (strncasecmp(uri, "http://", 7) != 0) { // 대소문자 구분 없이 접두어 확인
return -1;
}
// 파일 경로 찾기
hostbegin = uri + 7; // http:// 이후부터 시작
pathbegin = strchr(hostbegin, '/');
if (pathbegin) { // '/'가 있다면
strcpy(path, pathbegin); // 경로에 이후 내용(파일 경로)을 저장
*pathbegin = '\0'; // '/' 이전 이후로 문자열 분리
} else {
strcpy(path, "/"); // '/'가 없다면 path에 '/'를 저장
}
// 포트 번호 찾기
portbegin = strchr(hostbegin, ':');
if (portbegin) { // ':'가 있다면
*portbegin = '\0'; // ':'를 기준으로 host시작부를 분리
strcpy(host, hostbegin);
strcpy(port, portbegin + 1);
} else {
strcpy(host, hostbegin); // ':'가 없다면 전부 호스트 문자열
strcpy(port, "80"); // 기본 포트 설정
}
return 0;
}
void sbuf_init(sbuf_t *sp, int n) {
sp->buf = Calloc(n, sizeof(int)); // connfd 저장용 배열 할당
sp->n = n; // 버퍼 크기 저장
sp->front = sp->rear = 0; // 초기 인덱스 0
pthread_mutex_init(&sp->mutex, NULL); // mutex 초기화
pthread_cond_init(&sp->slots, NULL); // 빈 슬롯 대기 조건 변수 초기화
pthread_cond_init(&sp->items, NULL); // 아이템 대기 조건 변수 초기화
}
void sbuf_insert(sbuf_t *sp, int item) {
pthread_mutex_lock(&sp->mutex); // 큐 접근 mutext 잠금
while (((sp->rear + 1) % sp->n) == sp->front) { // 큐가 가득 찬 경우
pthread_cond_wait(&sp->slots, &sp->mutex); // 빈 슬롯이 생길 때까지 대기
}
sp->buf[sp->rear] = item; // connfd를 rear 위치에 저장
sp->rear = (sp->rear + 1) % sp->n; // rear 인덱스 증가 (원형 회전)
pthread_cond_signal(&sp->items); // 대기 중인 워커 스레드에게 작업이 생겼음을 알림
pthread_mutex_unlock(&sp->mutex); // mutex 잠금 해제
}
int sbuf_remove(sbuf_t *sp) {
pthread_mutex_lock(&sp->mutex); // 큐 접근 잠금
while (sp->front == sp->rear) { // 큐가 비어있는 경우
pthread_cond_wait(&sp->items, &sp->mutex); // 아이템이 들어올 때까지 대기
}
int item = sp->buf[sp->front]; // front 위치의 connfd 가져오기
sp->front = (sp->front + 1) % sp->n; // front 인덱스 증가 (원형 회전)
pthread_cond_signal(&sp->slots); // 대기 중인 메인 스레드에게 공간이 생겼음을 알림
pthread_mutex_unlock(&sp->mutex); // 잠금 해제
return item; // connfd 반환
}
void *thread(void *vargp) {
pthread_detach(pthread_self()); // 스레드 자원 자동 회수
while (1) {
int connfd = sbuf_remove(&sbuf); // 작업 큐에서 connfd 꺼내기
func(connfd); // 요청 처리 함수 호출
close(connfd); // 클라이언트와의 연결 종료
}
}
테스트 진행하기
동시성 프록시 서버의 구현이 끝났다면 #Concurrency
부터 concurrencyScore
까지의 주석을 해제한 뒤, 이전과 동일하게 driver.sh
를 실행하면 됩니다.
그런데 만약 Timeout이 발생하면서 테스트가 정상적으로 진행되지 않는다면 nop-server.py 파일의 CRLF(윈도우 줄바꿈) 문제일 수 있으니 [해당 글]을 참고해 주시길 바랍니다. 저도 Timeout 문제가 발생했었는데, 해당 방법을 통해 문제 해결 후 정상적으로 테스트 점수를 받을 수 있었습니다.
스레드 풀 구조를 도입하면, 반복형 프록시의 병목을 해결하고 보다 안정적인 서비스를 만들 수 있는데요. 다음 글에서는 캐시와 LRU 정책을 도입해 더 빠르고 똑똑한 프록시 서버로 발전시켜볼 예정입니다. 읽어주셔서 감사합니다!
© 2025 정경호 · 학습 공유용
'크래프톤 정글 > Code 정글(C언어)' 카테고리의 다른 글
[WebProxy] 캐시 기능이 추가된 동시성 프록시 서버 단계별 구현 및 테스트하기 (0) | 2025.05.07 |
---|---|
[VSCode] 내가 쓰는 VSCode 유용한 단축키 모음 (0) | 2025.05.06 |
[WebProxy] 반복형 프록시 서버 단계별 구현 및 테스트하기 (0) | 2025.05.05 |
[WebProxy] Proxy & Tiny 서버 자동 실행 스크립트: run_proxy.sh (2) | 2025.05.05 |
[WebProxy] Bash 스크립트 "cannot execute: required file not found" 에러 해결 방법 (0) | 2025.05.05 |