본문 바로가기

[Python]/[Flask & Django]

[Celery] Side-Project(Ant Platform) - 1. Celery + RabbitMQ 간단실습

728x90

이전 포스팅 : Cloud-Monitoring으로 VM인스턴스 로그 분석하기
buildabetterworld.tistory.com/124?category=859595

 

[GCP] Cloud Monitoring 로 VM 인스턴스 로그 수집하기 - 2. 부하테스트(Locust)

[ 부하테스트 ] 다음은 지정한 URL로 부하테스트를 해보려고 한다. 사용할 툴은 바로 Locust! Python으로 스트립트를 작성하기도 하고, 또 사용하기 쉽다고 해서 바로 써보기로 했다. 정식 document는 다

buildabetterworld.tistory.com

  이전 포스팅에 이어서, 이번에는 팀원들과 사이드 프로젝트로 진행중인 플랫폼의 Notification + Chatting server에 들어갈 Celery에 대해 포스팅 하려고 한다.

 

[ Celery? ]

    Celery는 Python에서 비동기 작업을 위해 사용한다. 기본적으로 분산 메세지 전달을 기반으로 한 비동기식 Task Queue 라고 정의할 수 있다. 그리고 AMQP(Advanced Message Queueing Protocol)를 기반으로 만들어졌기 때문에 '최소한 한번은 전달된다'는 아이디어를 내포하고 있다. 아래 공식 레퍼런스를 보면 셀러리의 전반적인 내용을 확인 할 수 있다. 

 

Introduction to Celery — Celery 5.0.5 documentation

This document describes the current stable version of Celery (5.0). For development docs, go here. Introduction to Celery Task queues are used as a mechanism to distribute work across threads or machines. A task queue’s input is a unit of work called a t

docs.celeryproject.org

그중에서 핵심이 될만한 내용을 간추려서 가져와 봤다.

A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.
( 여기에서 셀러리 시스템은 여러 Worker와 Broker들로 이루어져 있고, 고가용성과 수평적 확장을 제공하는 것을 알 수 있다.)

 

[ Celery의 구성요소 ]

    Celery communicates via messages, usually using a broker to mediate between clients and workers.
    ( 셀러리의 핵심적인 구성들이 전부 등장했다. 하나씩 정리해 보면 다음과 같다 )

  • Messages = 작업
  • Broker = 메세지를 전달하는 놈
  • Worker = 작업을 수행하는 놈
  • Client = 작업을 요청하는 놈

    이제 위의 구성품들이 Flask상에서 전체적으로 어떻게 동작하는지를 보면 아래 그림과 같다. 먼저 Client에서 Flask에게 작업을 요청하면, 이 작업들(Messages)이 Message Broker로 이동한다. 이 Broker에는 RabbitMQ나 Redis, 요즘 관심을 가지고 있는 Kafka 와 같은 메세지 큐를 사용하고 있으며 각각의 메세지 Broker에 대한 자세한 내용들은 추후에 추자적으로 포스팅하도록 하겠다. 본론으로 돌아와서, Broker에 쌓인 task들은 다시 Celery의 worker로 이동하게 된다. 그러면 worker들은 할당받은 작업을 처리해, 그 결과를 DB에 저장하고, Flask에서는 그 결과를 가지고 다시 Client에 뿌려주는 수순이다. 

,

 ... Document 내용 중...
To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker. A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.
Task queues are used as a mechanism to distribute work across threads or machines. Celery requires a message transport to send and receive messages. The RabbitMQ and Redis broker transports are feature complete, but there’s also support for a myriad of other experimental solutions, including using SQLite for local development.

 

[ Celery 사용법 ]

Celery를 사용하는 방법을 요약해보면 다음과 같다. 1) 먼저 Message Broker를 선택해서 설치한다. 1) Celery를 설치하고, task를 생성한다. 3) worker를 수행시키고, task를 호출한다. 4) task에 대한 결과값을 지속적으로 모니터링하면서 동작과정을 감시한다.

  • Choosing and installing a message transport (broker).
  • Installing Celery and creating your first task.
  • Starting the worker and calling tasks.
  • Keeping track of tasks as they transition through different states, and inspecting return values

 

[ Celery  사용예시 ]

그러면 이제 실제로 Celery를 적용시켜보도록 하자.

1. 먼저 가상환경을 만들고, 들어가서 셀러리를 설치한다. 

python3 -m venv CeleryTest 
pip install Celery 


2. 그다음 Broker로 RabbitMQ를 설치한다.

로컬에 설치해도 되지만 heroku라는 Paas 사이트에서 제공되는 RabbitMQ를 사용해 보려고 한다. 
devcenter.heroku.com/articles/heroku-cli

 

The Heroku CLI | Heroku Dev Center

Last updated October 28, 2020 The Heroku Command Line Interface (CLI) makes it easy to create and manage your Heroku apps directly from the terminal. It’s an essential part of using Heroku. Download and install The Heroku CLI requires Git, the popular ve

devcenter.heroku.com

먼저 Heroku 에서 제공하는 CLI를 이용하기 위해 터미널에 Heroku CLI를 설치하자 맥 유저를 위한 Brew커맨드는 다음과 같다.

brew tap heroku/brew && brew install heroku

설치가 완료되면 로그인 하자. 클라우드환경에서 인스턴스에 접근하기 위해 CLI를 사용하는 것과 동일한 개념이다.

heroku login -i

이렇게 접근해서 RabbitMQ를 설치할 수 도 있고, 다음과 같이 CloudAMQP를 이용하면 간편하게 배포할 수 있다. 

이렇게 생성하고 나면 다음과 같은 결과화면이 보인다.

이제 CloudAMQP에 접속하면 다음과 같이 우리가 이용할 AMQP URL을 발견할 수 있다.

해당 URL을 복사해 두고 진행하도록 하자.

3. Celery 실행파일 만들기

   이제 실행할 Celery 실행파일을 만든다. 다음과 같이 app이름과 broker를 지정해주고(로컬의 rabbit,redis의 경우에는 localhost를 사용하면 된다.)

# amqp url:  amqps://bolerruj:aLYjDUCN************VZ@jaguar.rmq.************erruj 

from celery import celery
from time import sleep

# 앱 이름과 broker 설정(heroku상의 rabbitmq url)
app = celery('tasks', broker='amqps://bolerruj:aLY*************-EeMMLjd15VZ@jaguar.rmq.cl*****rruj')

@app.task
def reverse(text):
	sleep(3)
    return text[::-1]
    


4. 실행 및 테스트

실제로 이제 실행을 시켜보도록 하자. 먼저 일반적인 명령어로 위의 파일을 실행시켜보면 다음과 같이 잘 동작하는 것을 볼 수 있다.

이번엔 이어서 다음 커맨드로 celery로 실행을 시켜보도록 하자.

celery -A tasks worker --loglevel=info

그러면 다음과 같이 동작과정들을 지켜볼 수 있게 된다. 

    그럼 이 상태에서 큐에 작업을 넘겨보도록 하자. 이번에는 그냥 reverse함수를 호출하지 말고, reverse.delay()로 전달한다. 그러면 아래 화면의 하단에서 확인할 수 있듯이, 비동기에 대한 결과가 바로 전송이 되었고, celery쪽을 확인해보면 대략 3초의 delay(코드에서 적용한)가 발생한 다음 succeeded가 뜨면서 결과가 나오는 것을 알 수 있다.

    그래서 빠른 시간에 4개의 task를 부여하면, 비동기 작업에 대한 응답은 실행 즉시 리턴되고, 대략 3초의 delay뒤에 큐에 결과가 나타나는 것을 알 수 있다!!

 

5. 데이터베이스에 결과 저장

    이제 마지막으로 데이터베이스에 결과를 저장해보도록 하자! 아까 app = Celery부분에서 backend부분을 추가해 준다. 간단하게 sqlite를 사용했지만 Redis를 사용해도 무방하다.

app = Celery('tasks', broker='amqps://bole*********************qp.com/bolerruj', backend='db+sqlite///db.sqlite3')

그리고 다시 실행시킨 다음, 각 reverse함수에 대한 리턴값을 찍어보면 작업상태도 알 수 있다.

최종적으로 sqlite에 저장된 결과들을 확인해 보면, 작업한 결과들이 무사히 저장되어 있는 것을 알 수 있다!

 

다음 포스팅에서는 Celery를 Flask에 적용시켜 보도록 하겠다!

728x90