개요
지금 학습 흐름이 아래와 같다.
- IoC & DIP & DI
- FrameWork의 DI Container로 여러 의존성 관리
- 싱글톤 패턴 & DI Container 직접 만들어서 사용하기
이 흐름에 따라서, 이번에는 DI Container를 직접 구현해서 사용할 때, 비동기 테스크 큐들이 어떤 이점을 누릴 수 있는지 살펴보자.
- 먼저 비동기 테스크 큐와 파이썬 생태계의 대표적 라이브러리인 Celery를 알아보고,
- 규모가 큰 분산 환경 프로젝트에서 Celery와 같은 비동기 테스크 큐를
- DI Container에 등록하는 방법과 이점을 살펴볼 것이다.
1. 비동기 테스크 큐
1.1 비동기 테스크 큐란?
쉽게 말해서 지금 당장 처리하지 않아도 되는 일을, 나중에 따로 처리하게 만드는 시스템이다.

백엔드 서버는 보통 아래와 같은 구조다.
- 클라이언트가 HTTP 요청 보냄
- 서버가 요청 처리
- 응답 반환
⚫️ 그런데 어떤 작업들은...
- 오래 걸림 (예: 이미지 리사이즈, 영상 인코딩, 대용량 CSV 처리)
- 외부 API를 여러 번 호출 (결제 정산, 써드파티 연동)
- 꼭 실시간일 필요 없음 (알림 발송, 이메일 발송)
⚫️ 이들을 기존 구조로 처리하면 :
- 사용자는 쓸데없이 오래 기다림
- 웹 서버 스레드를 금방 소진해서 전체 서비스가 느려짐
=> 따라서 위와 같은 작업들은 큐에 넣어두고 나중에 따로 처리하게 한다!
1.2 동작 구조(아주 기본적인)
- 웹 서버(Producer)
- 사용자의 요청을 받음
- 해당 작업은 할일 목록(큐)에 넣어두고, 바로 응답을 돌려줌
- 큐(메시지 브로커)
- 할일 목록
- Redis, RabbitMQ, SQS 등
- 작업 하나하나가 메시지처럼 저장됨
- 워커(Consumer)
- 메시지 브로커 큐에서 할일을 하나씩 처리함
1.3 장점
- 오래 걸리는 작업은 워커 서버에 위임 하고, 사용자에게는 빠른 응답을 제공해서 서비스 품질을 높인다.
- ex. 메일 전송, 알림, 로그 등의 작업
- 요청 처리 시간이 큐에 넣는 시간 정도로 짧아진다.
- 트래픽이 많아질 때는 워커 수만 스케일 아웃하면 되므로, 적은 비용으로 많은 효과를 누릴 수 있다.
2. 파이썬 생태계의 작업 큐, Celery
Celery는 Python 언어로 작성된, 분산시스템에서 복잡한 작업들을 효과적으로 정의하고 처리할 수 있도록 도와주는 비동기 작업 큐 라이브러리다.
2.1 간단한 코드 예시
- celery task 정의
from celery import Celery
app = Celery(
"my_app",
broker="redis://localhost:6379/0", # 브로커 (예: Redis)
backend="redis://localhost:6379/1", # 결과 저장용 (옵션)
)
# celery 환경의 테스크임을 표현하는 데코레이터
@app.task
def resolve_domain(domain: str):
# dnspython으로 dns 조회 로직...
# redis 저장...
return {"domain": domain, "ip": "1.2.3.4"}
- HTTP API에서 활용
from fastapi import FastAPI
from app import resolve_domain
app = FastAPI()
@app.post("/lookup")
def lookup(domain: str):
# 비동기로 테스크 큐에 넣기
task = resolve_domain.delay(domain)
return {"task_id": task.id}
결과적으로 `/lookup` 호출은 금방 응답하고, 실제 DNS 조회는 Celery 워커가 백그라운드에서 처리한다.
2.2 Celery 개념들
⚫️ Task
사용자가 비동기로 처리하려는 작업 그자체(메소드)다. 함수 단위로 정의되는 작업들을 Celery의 라이프사이클로 싣기 위해서 데코레이터 표현으로 정의할 수 있다.
from celery import Celery
app = Celery(__name__)
# celery 데코레이터 표현
@app.task
def simple_task():
return hello
⚫️ Task Signature
`.apply()`, `.delay()` 등의 표현으로 메시지 브로커에 작업을 보내는 코드를 작성하게 되는데, 이러한 코드는 내부적으로 다음과 같은 처리 과정을 거친다.
- `add.delay(1, 2)` 호출
- 내부에서 `add.s(1, 2)`같은 Signature 객체 만듦(또는 재사용)
- 그 시그니처를 기반으로 메시지를 직렬화(보통 JSON)해서
- 브로커에 발행
- 워커가 그 메시지를 가져와서 역직렬화 + 해당 함수 실행
=> Celery Signature 프로토콜에 맞게 작업 요청 메시지를 작성해서 브로커로 보내면, 이 언어가 무엇이든 Celery 워커가 받아서 작업을 처리할 수 있다.
⚫️ Kombu
Celery는 이벤트 메시지 포맷, 워커 로직에만 집중한다.
실제 브로커와의 통신은 kombu의 trasport 시스템이 맡아서, 다양한 브로커를 URL만 바꿔 끼우면 되는 수준으로 추상화 해준다.
- kombu: 여러 브로커(RabbitMQ, Redis, SQS...)를 같은 방식으로 다룰 수 있게 해주는 플러그블 메시징 레이어
GitHub - celery/kombu: Messaging library for Python.
Messaging library for Python. Contribute to celery/kombu development by creating an account on GitHub.
github.com
⚫️ 작업 컨트롤
- Rate Limiting: 워커가 특정 테스크를 수행하는 것에 대한 속도를 조절할 수 있다.
- Schedular[ETA 방식, Countdown 방식]: 원하는 시간에 작업이 시작될 수 있도록 한다.
- Retry: 작업이 실패하면 적절한 Backoff 메커니즘을 가지고 재시도를 수행한다.
- Expiration: 오래 쌓여있던 작업들은 처리하지 않고 만료시킨다.
- Task Routing: 작업들을 격리하거나 우선순위를 위한 큐를 정의하고, 작업들을 라우팅한다.
- Canvas: Backend를 활용해 작업 결과들을 파이프라이닝해서 처리할 수 있는 메커니즘을 제공해 준다.
3. DI Container와 비동기 테스크 큐
지금 필자는 회사에서 클라우드 자원 관리 서비스에 기여하고 있는데, 비즈니스 특성 상 클라우드 자원 관리 작업이 복잡하고 오래 걸리는 경우가 많다. 그래서 클라우드 리소스들을 프로비저닝하는 작업은 Celery로 관리되는데... 이 예시로 해당 주제를 풀어보자.
3.1 웹 애플리케이션과 워커의 프로세스

FastAPI App과 Celery Worker가 처리하는 것은 완전 다른 별개의 프로세스다. Worker 프로세스는 FastAPI 프로세스의 메모리에 접근할 수 없으므로, 각자 초기화해야 한다. 하지만 프로젝트 전체를 관통하는 CoreDIContainer 클래스와 같은 설정을 사용해야 하므로 코드 레벨에서 통일성을 보장한다.
3.2 웹 App과 워커가 같은 설정을 사용해야 하는 이유
FastAPI는 사용자 요청을 받아서 "무엇을 할지" 기록하고, Celery Worker는 실제로 클라우드에서 "그 작업을 수행"한다.
예를 들어, 사용자가 "alpha 환경에 S3 버킷을 만들어주세요"라고 요청하면, FastAPI는 이 요청을 DB에 기록한다. 그 후 Celery Worker가 이 기록을 보고 AWS에 실제 버킷을 생성한다.

그런데 이때, 애플리케이션과 Celery Worker의 설정이 다르면 DB에는 Alpha 환경에 버킷 생성이라고 되어있지만, 실제로는 Prod 환경에 버킷이 만들어진다. 시스템이 알고있는 정보와 클라우드 상태가 달라지는 것이다. 비용 청구도 잘못되고, 보안 문제도 발생할 수 있다.
따라서 FastAPI와 Celery Worker가 다른 프로세스에서 실행되더라도, 반드시 같은 Settings를 기반으로 DIContainer를 초기화해야 한다. 같은 AWS 계정에 접근하고, 같은 DB를 사용하며, 같은 환경 정책을 적용해야 전체 시스템이 일관되게 동작한다.
1) Broker Mode일 때의 DI Container 주입: 초기화 클래스 정의
- 프로젝트가 실행될 때, 가장 먼저 공용 커스텀 DI Container를 초기화하고, 이 객체가 Celery App 생성에 사용되도록 전달한다.
@asynccontextmanager
async def lifespan(app: FastAPI):
core = KrpCore(app.state.settings) # 1. DI Container 초기화
celery = create_celery_app(app.state.settings, core=core) # 2. Celery에 core 전달
core.load_services(celery) # 3. 서비스들 초기화
app.state.core = core # 4. FastAPI state에 저장
yield
- 이 구조에 따라서 Celery로 처리되는 프로비저닝 작업들은 AWS Client Map이나 DB 세션 메이커 등 필요한 의존성 항목들이 기대한 대로 준비된다.
class LoadKrpCoreStep(bootsteps.StartStopStep):
"""Celery Worker가 시작될 때 공용 DI Container를 초기화 한다.
- 공용 컨테이너 초기화,
- celery app state에 KrpCore(공용 컨테이너) 셋업
- Fast API 구동과 동일한 접근성을 보장하기 위함.
- fastapi.state.aws
"""
requires = {"celery.worker.components:Pool"}
def create(self, parent, **kwargs):
logger.info("Prepare KrpCore")
app.state.core = KrpCore(settings)
2) Eager Mode에서의 DI Container 주입: 초기화
Eager 모드에서는 Worker 프로세스가 없으므로, Worker에서처럼 공용 컨테이너 초기화 로직이 실행되지 않는다. 따라서 FastAPI가 전달한 core를 통해 의존성을 처리하게 된다.
- 프로젝트가 실행될 때, 가장 먼저 공용 커스텀 DI Container를 초기화하고, 이 객체가 Celery App 생성에 사용되도록 전달한다.
@asynccontextmanager
async def lifespan(app: FastAPI):
core = KrpCore(app.state.settings) # 1. DI Container 초기화
celery = create_celery_app(app.state.settings, core=core) # 2. Celery에 core 전달
core.load_services(celery) # 3. 서비스들 초기화
app.state.core = core # 4. FastAPI state에 저장
yield
- Celery App을 초기화할 때, 전달받은 공용 DI Container 객체를 사용한다.
- Worker 프로세스들이 FastAPI App과 동일한 접근성을 보장하기 위함이다.
def create_app(settings: KrpSettings, core: KrpCore | None = None):
app = Celery()
app.state = State()
app.state.core = core # FastAPI가 준 core를 celery.state에 저장!
이렇게 비동기 큐와 인프라 리소스 프로비저닝 & 비동기 큐 활용 및 프로젝트 관리법을 알아보았다..! 하나하나 따라가느라 시간도 많이 오래걸리고 어려웠지만 앞으로 작업에 많은 도움이 될 학습이었다.
'Dev > Backend' 카테고리의 다른 글
| [BackEnd] 싱글톤 패턴 & DI Container 직접 만들어서 사용하기 (0) | 2025.11.09 |
|---|---|
| [BackEnd] Framework 이해하기 - DI Container로 여러 의존성 관리 (0) | 2025.11.03 |
| [BackEnd] IoC & DIP & DI (0) | 2025.10.20 |
| [BackEnd] 커뮤니티 게시물 목록 조회 API 쿼리를 QueryDsl로 구현해보기 (0) | 2024.10.23 |
| [BackEnd] API의 멱등성을 고려하여 개발하기 (0) | 2024.04.26 |