본문 바로가기

[Data Engineering]/[Gloud-GCP]

[GCP] 2-2. Periodic scheduling, flask web application


 < 실습 - 주기적인 데이터 스케줄링, 이어서 ... >

  아래에 보이는 코드는 ingest_flights.py의 전체 코드이다. 2장의 실습에 핵심적인 내용을 포함하고 있으며, 관련 주석으로 부족한 코드 설명을 보완함을 밝힌다. (일일이 코드를 분석,설명해주는 것은 이 포스팅의 목적이 아니다.)  이어서 나오는 사진은 Cloud-Shell에서 ingest_flights.py를 실행시킨 모습이다.


< ingest_flights.py code 전문 >

#!/usr/bin/env python

# Copyright 2016 Google Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import logging
import os.path
import zipfile
import datetime
import tempfile
from google.cloud import storage
from google.cloud.storage import Blob

    from urllib.request import urlopen as impl
    # For Python 3.0 and later
    def urlopen(url, data):
        return impl(url, data.encode('utf-8'))
    def remove_quote(text):
        return text.translate(str.maketrans('','', '"'))
except ImportError as error:
    # Fall back to Python 2's urllib2
    from urllib2 import urlopen
    def remove_quote(text):
        return text.translate(None, '"')

def download(YEAR, MONTH, destdir): #bash script가 하드코딩된 연도(2015)에서 데이터를 가져오는데에 비해, python코드는 연도와 월을 입력값으로 받는다.
     Downloads on-time performance data and returns local filename
     YEAR e.g.'2015'
     MONTH e.g. '01 for January
   logging.info('Requesting data for {}-{}-*'.format(YEAR, MONTH))

   PARAMS=" <파라미터 생략>".format(YEAR, MONTH)

   url='https://www.transtats.bts.gov/DownLoad_Table.asp?Table_ID=236&Has_Group=3&Is_Zipped=0' # 이 URL이 나오게 된 이유는 (보완1)에 밝힌다. 
   filename = os.path.join(destdir, "{}{}.zip".format(YEAR, MONTH))
   with open(filename, "wb") as fp:
     response = urlopen(url, PARAMS)
   logging.debug("{} saved".format(filename))
   return filename

def zip_to_csv(filename, destdir): # 압축해제 후 csv파일을 추출한다.
   zip_ref = zipfile.ZipFile(filename, 'r')
   cwd = os.getcwd() # 현재 자신의 디렉토리를 반환해준다
   os.chdir(destdir) # 현재 디렉터리의 위치를 변경한다.
   zip_ref.extractall() # 현재 디렉터리에 내용물을 전부 압축해제한다.
   os.chdir(cwd) # 원래 디렉터리로 돌아간다.
   csvfile = os.path.join(destdir, zip_ref.namelist()[0]) # 경로를 병합하여 새 경로를 만든다. 압축 파일의 모든 멤버 리스트중 첫번째 값과 병합
   logging.info("Extracted {}".format(csvfile))
   return csvfile

class DataUnavailable(Exception):
   def __init__(self, message):
      self.message = message

class UnexpectedFormat(Exception):
   def __init__(self, message):
      self.message = message

def verify_ingest(csvfile): # 책에서는 ingest data를 확인하는 코드도 추가해 놓았다. 추가한 두가지 내용은 다음과 같다.
	# 1) 다운받은 코드가 한줄 이상인지를 확인한다.
    # 2) csv파일이 정확한 헤더를 가지고 있는지를 확인한다.
   #확인하려하는 헤더는 다음과 같다.

   with open(csvfile, 'r') as csvfp: # close하는 실수를 줄여주는 중요한 coding style
      firstline = csvfp.readline().strip() # .strip()함수는 문자열이 있을 때 양쪽 끝에 있는 공백과 \n기호를 삭제시켜준다.
      if (firstline != expected_header): # 첫라인의 헤더가 예상한 헤더가 아닐 경우에 (내용 2)에 해당 )
         msg = 'Got header={}, but expected={}'.format(
                             firstline, expected_header)
         raise UnexpectedFormat(msg)

      if next(csvfp, None) == None: # 다음요소인 내부 데이터가 한줄 이상인지 확인 (내용 1)에 해당 )
         msg = ('Received a file from BTS that has only the header and no content')
         raise DataUnavailable(msg)

def remove_quotes_comma(csvfile, year, month): # 따옴표와 줄 끝의 쉼표를 제거하는 코드
     returns output_csv_file or raises DataUnavailable exception
   outfile = os.path.join(os.path.dirname(csvfile),
                          '{}{}.csv'.format(year, month))
   with open(csvfile, 'r') as infp: # close 필요없이 블럭 벗어나면 자동으로 close
     with open(outfile, 'w') as outfp: # 마찬가지. 
        for line in infp: 
           outline = line.rstrip().rstrip(',') # 따옴표와 줄 끝의 쉼표를 제거할 수 있다. 
           outline = remove_quote(outline) # quote는 none으로 translate, (line38참조)
           outfp.write(outline) # 처리 끝난 내용 작성
           outfp.write('\n') # 마감 줄처리 이어서 작성
   logging.debug('Ingested {} ...'.format(outfile))
   return outfile
   logging.debug("... removing {}".format(csvfile))

def upload(csvfile, bucketname, blobname): # 주어진 월의 csv파일을 클라우드 스토리지에 업로드한다.
   client = storage.Client() 
   bucket = client.get_bucket(bucketname)
   blob = Blob(blobname, bucket) # blob은 간접객체이다. 간접객체에 대한 설명은 (보완2)에 밝힌다.
   gcslocation = 'gs://{}/{}'.format(bucketname, blobname) # 이전 부분과 동일. 이하생략.
   logging.info('Uploaded {} ...'.format(gcslocation))
   return gcslocation

def ingest(year, month, bucket): 
   ingest flights data from BTS website to Google Cloud Storage
   return cloud-storage-blob-name on success.
   raises DataUnavailable if this data is not on BTS website
   tempdir = tempfile.mkdtemp(prefix='ingest_flights') # mkdtemp()는 임시 디렉토리를 생성한다.
      zipfile = download(year, month, tempdir) # download(상단 정의) 호출
      bts_csv = zip_to_csv(zipfile, tempdir) # zip_to_csv(상단 정의) 호출
      csvfile = remove_quotes_comma(bts_csv, year, month) # remove_quotes_comma(상단 정의) 호출
      verify_ingest(csvfile) # 동일
      gcsloc = 'flights/raw/{}'.format(os.path.basename(csvfile))
      return upload(csvfile, bucket, gcsloc)
      logging.debug('Cleaning up by removing {}'.format(tempdir))
      shutil.rmtree(tempdir) # 생성했던 임시 디렉토리 제거

def next_month(bucketname): 
     Finds which months are on GCS, and returns next year,month to download
   client = storage.Client()
   bucket = client.get_bucket(bucketname) # bucket 객체 생성
   blobs  = list(bucket.list_blobs(prefix='flights/raw/')) # blob
   files = [blob.name for blob in blobs if 'csv' in blob.name] # csv files only
   lastfile = os.path.basename(files[-1]) # 예시-> 201503.csv 
   logging.debug('The latest file on GCS is {}'.format(lastfile))
   year = lastfile[:4] # 2015
   month = lastfile[4:6] # 03
   return compute_next_month(year, month)

def compute_next_month(year, month): # 다음달을 계산하기위한 함수
   dt = datetime.datetime(int(year), int(month), 15) # 15th of month
   dt = dt + datetime.timedelta(30) # will always go to next month 이 방법으로 28일~31일 월 전부 커버가능하다.
   logging.debug('The next month is {}'.format(dt))
   return '{}'.format(dt.year), '{:02d}'.format(dt.month)
if __name__ == '__main__':
   import argparse # 인자값을 받아 기능을 수행하게 해주는 모듈
   parser = argparse.ArgumentParser(description='ingest flights data from BTS website to Google Cloud Storage')
   parser.add_argument('--bucket', help='GCS bucket to upload data to', required=True)
   parser.add_argument('--year', help='Example: 2015.  If not provided, defaults to getting next month')
   parser.add_argument('--month', help='Specify 01 for January. If not provided, defaults to getting next month')

      logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO) # 로깅 포맷 변경
      args = parser.parse_args()
      if args.year is None or args.month is None:
         year, month = next_month(args.bucket)
         year = args.year
         month = args.month
      logging.debug('Ingesting year={} month={}'.format(year, month))
      gcsfile = ingest(year, month, args.bucket)
      logging.info('Success ... ingested to {}'.format(gcsfile))
   except DataUnavailable as e:
      logging.info('Try again later: {}'.format(e.message))

< ingest_flights.py 를 실행시킨 화면 >


  ingest_flights.py는 클라우드 스토리지 버킷을 한달에 한 번 업데이트하는 기능을 가진 Data 입수 프로그램이다. 이번에는 이를 서버리스 방식으로 동작할 수 있는 cron 호출형 flask-web-application을 적용시켜본다.


< 실습 - Flask Web Application (optional) >

flask-web-application을 실행시켜보려고 책에 나온 대로 02_ingest에 들어가 init_appengine.sh를 실행시키려고 했는데..

  appengine.sh 파일이 없다???  그 외에 token generate와 setup_cron등의 실행과 관련한 파일들이 눈에 보여서 다 까보았지만 실행순서가 명확하지 않았고 책에 관련 내용이 없어 처음에 꽤나 당황했었다. 잠시 방황하다가 저자를 믿고 readme를 읽어보기로 하였다.


< 역시나 친절하게 다 설명해 놓았다 >

  읽으라고 올려놓은 파일을 열어보지도 않은 내 잘못이다.... 안심하고 순서대로 실행시켜보도록 하겠다. 정확한 관련코드는 저자의 깃허브와 저서를 참고하길 권장한다. (너무 다 소개하기엔 날로먹는것 같아...) 


< token 생성>
< 생성된 토큰으로 main.py에서 수정 >


< ... >

  역시나 한방에 될 리가 없다. cloud function api가 enable 되어있지 않는 문제인듯 하니, GCP로 돌아가서 해당 api를 찾아 추가해주도록 한다.

< Cloud Function API 검색 >
< 해당 API를 프로젝트에 추가 >


< 이후에도 계속 트러블슈팅 과정이므로 생략하고자 하면 생략해도 된다. >


.......문제는 해결된 줄 알았지만 계속해서 error가 발생했다.... main.py에 ingest_flights_randomstring의 함수명이 필요하다는 내용이였다. cat 커맨드로  deploy_cf.sh와 main.py에서 ingest_flights()라는 function은 있었지만 그 뒤로 저런식으로 random string이 붙는 경우는 없었기에 당황스러웠다.

< 또다시 발생한 error >


  해결방법은 Google Cloud의 deploy reference에서 찾을수 있었다.  gcloud 명령어를 사용할 때 NAME에 해당하는 부분에서 --entry-point가 옵션으로 들어가 있지 않는 경우에는 NAME이라는 함수명이 반드시 코드 내부에 정의되어 있어야 한다는 내용이였다. 


  reference에 소개된 대로 --entry-point 뒤에 원래 사용하려던 function인 ingest_flights를 추가해주고 해당 코드를 실행시키니 에러없이 무사히 지나갔다. 이후의 이미지들은 나머지 실습과정을 수행한 것이며 책에 소개되어있지 않고, 새롭게 수정된 README 파일에 있는 내용이다. 실습에 도움이 되도록 아래에 첨부하겠다.


[Optional] Scheduling monthly downloads

  • Go to the 02_ingest/monthlyupdate folder in the repo.
  • Generate a new Personal Access Token by running ./generate_token.sh -- Note the token printed.
  • Modify main.py to have this token.
  • Deploy Cloud Function in your project by running ./deploy_cf.sh -- Note the URL that is printed.
  • Try out ingest by running ./call_cf.sh supplying the necessary parameters.
  • Schedule ingest by running ./setup_cron.sh supplying the necessary parameters.
  • Visit the GCP Console for Cloud Functions and Cloud Scheduler and delete the function and the scheduled task—you won’t need them any further.


< 스케줄러에 확인되는 monthlyupdate 작업 >




< 다운로드 URL과 BLOB객체에 관련한 설명은 추후에 보충한다. >



"Data Science on the Google Cloud Platform by Valliappa Lakshmanan (O'Reilly). Copyright 2018 Google Inc."

'[Data Engineering] > [Gloud-GCP]' 카테고리의 다른 글

[GCP] 3-3. DashBoard  (0) 2020.02.18
[GCP] 3-2. Decision Model  (0) 2020.02.18
[GCP] 3-1. How to make Dataset  (0) 2020.02.18
[GCP] 2-1. Fixed Data-set scheduling  (0) 2020.02.18
[GCP] 1. Introduction  (4) 2020.02.18