본문 바로가기

[Python]/[Flask & Django]

[Celery] Side-Project(Ant Platform) - 3. Flask에서 Celery로 비동기작업 수행하기(프로젝트 구조+Celery동작)

728x90

[ 프로젝트 동작 구조 ]

1. Trigger Task : 먼저 Task가 home.html에서 handleClick함수로 인해 트리거된다.
    (이 handleClick은 그 아래 _base.html의 script에 의해 main.js로 간다. )

home.html
_base.html


2. AJAX part : main.js에서 handleClick함수에 type이 전달되고, server로 AJAX POST를 보내게 된다. 

# main.js 중에서...
function handleClick(type) {
  fetch('/tasks', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ type: type }),
  })
  .then(response => response.json())
  .then(data => getStatus(data.task_id));
}


3. 그러면 server에서는 type을 넘겨받게 된다. 

# views.py 중에서...
@main_blueprint.route("/tasks", methods=["POST"])
def run_task():
    content = request.json
    task_type = content["type"]
    return jsonify(task_type), 202


4. 이제 Celery 파트이다! 
-> 먼저 requirements.txt의 내용(redis,celery)을 추가해주었다. 그 다음, docker-compose.yaml을 업데이트 한다.

services:

  web:
    build: .
    image: web
    container_name: web
    ports:
      - 5004:5000
    command: python manage.py run -h 0.0.0.0
    volumes:
      - .:/usr/src/app
    environment:
      - FLASK_DEBUG=1
      - APP_SETTINGS=project.server.config.DevelopmentConfig
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  worker:
    build: .
    command: celery worker --app=project.server.tasks.celery --loglevel=info
    volumes:
      - .:/usr/src/app
    environment:
      - FLASK_DEBUG=1
      - APP_SETTINGS=project.server.config.DevelopmentConfig
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:6-alpine


5. 이어서 Celery를 이용해 task들을 처리할 create_task함수를 선언해 준다. 이 작업을 Celery instance를 만든다는 표현을 사용했다. 여기 예제에서는, 전에 실습때처럼 특별한 로직이라기보다 넘어온 타입에 10을 곱해서 short,mid,long타입의 task들을 분리했다.

import os
import time

from celery import Celery


celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")


@celery.task(name="create_task")
def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True


6. 다음은 새롭게 생성한 Celery인스턴스를 사용해 새롭게 tasks 라우트를 재정의해준다.

@main_blueprint.route("/tasks", methods=["POST"])
def run_task():
    content = request.json
    task_type = content["type"]
    task = create_task.delay(int(task_type))
    return jsonify({"task_id": task.id}), 202


7. 그다음 다시 도커를 다시 빌드하고, Curl을 통해 Task동작을 확인해본다.

docker-compose up -d --build
$ curl http://localhost:5004/tasks -H "Content-Type: application/json" --data '{"type": 0}'

아름답다...
cur을 쐈을때 리턴된 task_id


8. 이어서 리턴된 작업을 HTML status table에 삽입하는 과정을 마저 코딩해준다.

function getStatus(taskID) {
  fetch(`/tasks/${taskID}`, {
    method: 'GET',
    headers: {
      'Content-Type': 'application/json'
    },
  })
  .then(response => response.json())
  .then(res => {
    const html = `
      <tr>
        <td>${taskID}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`;
    const newRow = document.getElementById('tasks').insertRow(0);
    newRow.innerHTML = html;

    const taskStatus = res.task_status;
    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .catch(err => console.log(err));
}

그리고 view.py에는 task의 나머지 status, result를 보기 위해서 get_status함수를 업데이트 해준다.

@main_blueprint.route("/tasks/<task_id>", methods=["GET"])
def get_status(task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return jsonify(result), 200


9. 마지막으로 다시 빌드하고, 실행시키면 task들이 수행하고, short task를 클릭해보면 다음과 같이 나타단다. 계속해서 작업이 완료되었는지를 polling하다가 결과적으로 완료되었을때 SUCCESS가 뜨는것을 확인할 수 있다!


다음 포스팅에는 이어서 log, flower, test등을 작성해 보도록 하겠다!

 

728x90