이전 포스팅 : Cloud-Monitoring으로 VM인스턴스 로그 분석하기
buildabetterworld.tistory.com/124?category=859595
이전 포스팅에 이어서, 이번에는 팀원들과 사이드 프로젝트로 진행중인 플랫폼의 Notification + Chatting server에 들어갈 Celery에 대해 포스팅 하려고 한다.
[ Celery? ]
Celery는 Python에서 비동기 작업을 위해 사용한다. 기본적으로 분산 메세지 전달을 기반으로 한 비동기식 Task Queue 라고 정의할 수 있다. 그리고 AMQP(Advanced Message Queueing Protocol)를 기반으로 만들어졌기 때문에 '최소한 한번은 전달된다'는 아이디어를 내포하고 있다. 아래 공식 레퍼런스를 보면 셀러리의 전반적인 내용을 확인 할 수 있다.
그중에서 핵심이 될만한 내용을 간추려서 가져와 봤다.
A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.
( 여기에서 셀러리 시스템은 여러 Worker와 Broker들로 이루어져 있고, 고가용성과 수평적 확장을 제공하는 것을 알 수 있다.)
[ Celery의 구성요소 ]
Celery communicates via messages, usually using a broker to mediate between clients and workers.
( 셀러리의 핵심적인 구성들이 전부 등장했다. 하나씩 정리해 보면 다음과 같다 )
- Messages = 작업
- Broker = 메세지를 전달하는 놈
- Worker = 작업을 수행하는 놈
- Client = 작업을 요청하는 놈
이제 위의 구성품들이 Flask상에서 전체적으로 어떻게 동작하는지를 보면 아래 그림과 같다. 먼저 Client에서 Flask에게 작업을 요청하면, 이 작업들(Messages)이 Message Broker로 이동한다. 이 Broker에는 RabbitMQ나 Redis, 요즘 관심을 가지고 있는 Kafka 와 같은 메세지 큐를 사용하고 있으며 각각의 메세지 Broker에 대한 자세한 내용들은 추후에 추자적으로 포스팅하도록 하겠다. 본론으로 돌아와서, Broker에 쌓인 task들은 다시 Celery의 worker로 이동하게 된다. 그러면 worker들은 할당받은 작업을 처리해, 그 결과를 DB에 저장하고, Flask에서는 그 결과를 가지고 다시 Client에 뿌려주는 수순이다.
... Document 내용 중...
To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker. A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform. Task queues are used as a mechanism to distribute work across threads or machines. Celery requires a message transport to send and receive messages. The RabbitMQ and Redis broker transports are feature complete, but there’s also support for a myriad of other experimental solutions, including using SQLite for local development.
[ Celery 사용법 ]
Celery를 사용하는 방법을 요약해보면 다음과 같다. 1) 먼저 Message Broker를 선택해서 설치한다. 1) Celery를 설치하고, task를 생성한다. 3) worker를 수행시키고, task를 호출한다. 4) task에 대한 결과값을 지속적으로 모니터링하면서 동작과정을 감시한다.
- Choosing and installing a message transport (broker).
- Installing Celery and creating your first task.
- Starting the worker and calling tasks.
- Keeping track of tasks as they transition through different states, and inspecting return values
[ Celery 사용예시 ]
그러면 이제 실제로 Celery를 적용시켜보도록 하자.
1. 먼저 가상환경을 만들고, 들어가서 셀러리를 설치한다.
python3 -m venv CeleryTest
pip install Celery
2. 그다음 Broker로 RabbitMQ를 설치한다.
로컬에 설치해도 되지만 heroku라는 Paas 사이트에서 제공되는 RabbitMQ를 사용해 보려고 한다.
devcenter.heroku.com/articles/heroku-cli
먼저 Heroku 에서 제공하는 CLI를 이용하기 위해 터미널에 Heroku CLI를 설치하자 맥 유저를 위한 Brew커맨드는 다음과 같다.
brew tap heroku/brew && brew install heroku
설치가 완료되면 로그인 하자. 클라우드환경에서 인스턴스에 접근하기 위해 CLI를 사용하는 것과 동일한 개념이다.
heroku login -i
이렇게 접근해서 RabbitMQ를 설치할 수 도 있고, 다음과 같이 CloudAMQP를 이용하면 간편하게 배포할 수 있다.
이렇게 생성하고 나면 다음과 같은 결과화면이 보인다.
이제 CloudAMQP에 접속하면 다음과 같이 우리가 이용할 AMQP URL을 발견할 수 있다.
해당 URL을 복사해 두고 진행하도록 하자.
3. Celery 실행파일 만들기
이제 실행할 Celery 실행파일을 만든다. 다음과 같이 app이름과 broker를 지정해주고(로컬의 rabbit,redis의 경우에는 localhost를 사용하면 된다.)
# amqp url: amqps://bolerruj:aLYjDUCN************VZ@jaguar.rmq.************erruj
from celery import celery
from time import sleep
# 앱 이름과 broker 설정(heroku상의 rabbitmq url)
app = celery('tasks', broker='amqps://bolerruj:aLY*************-EeMMLjd15VZ@jaguar.rmq.cl*****rruj')
@app.task
def reverse(text):
sleep(3)
return text[::-1]
4. 실행 및 테스트
실제로 이제 실행을 시켜보도록 하자. 먼저 일반적인 명령어로 위의 파일을 실행시켜보면 다음과 같이 잘 동작하는 것을 볼 수 있다.
이번엔 이어서 다음 커맨드로 celery로 실행을 시켜보도록 하자.
celery -A tasks worker --loglevel=info
그러면 다음과 같이 동작과정들을 지켜볼 수 있게 된다.
그럼 이 상태에서 큐에 작업을 넘겨보도록 하자. 이번에는 그냥 reverse함수를 호출하지 말고, reverse.delay()로 전달한다. 그러면 아래 화면의 하단에서 확인할 수 있듯이, 비동기에 대한 결과가 바로 전송이 되었고, celery쪽을 확인해보면 대략 3초의 delay(코드에서 적용한)가 발생한 다음 succeeded가 뜨면서 결과가 나오는 것을 알 수 있다.
그래서 빠른 시간에 4개의 task를 부여하면, 비동기 작업에 대한 응답은 실행 즉시 리턴되고, 대략 3초의 delay뒤에 큐에 결과가 나타나는 것을 알 수 있다!!
5. 데이터베이스에 결과 저장
이제 마지막으로 데이터베이스에 결과를 저장해보도록 하자! 아까 app = Celery부분에서 backend부분을 추가해 준다. 간단하게 sqlite를 사용했지만 Redis를 사용해도 무방하다.
app = Celery('tasks', broker='amqps://bole*********************qp.com/bolerruj', backend='db+sqlite///db.sqlite3')
그리고 다시 실행시킨 다음, 각 reverse함수에 대한 리턴값을 찍어보면 작업상태도 알 수 있다.
최종적으로 sqlite에 저장된 결과들을 확인해 보면, 작업한 결과들이 무사히 저장되어 있는 것을 알 수 있다!
다음 포스팅에서는 Celery를 Flask에 적용시켜 보도록 하겠다!