본문 바로가기

[Data Engineering]/[Gloud-GCP]

[GCP] 2-2. Periodic scheduling, flask web application

728x90

 < 실습 - 주기적인 데이터 스케줄링, 이어서 ... >

  아래에 보이는 코드는 ingest_flights.py의 전체 코드이다. 2장의 실습에 핵심적인 내용을 포함하고 있으며, 관련 주석으로 부족한 코드 설명을 보완함을 밝힌다. (일일이 코드를 분석,설명해주는 것은 이 포스팅의 목적이 아니다.)  이어서 나오는 사진은 Cloud-Shell에서 ingest_flights.py를 실행시킨 모습이다.

 

< ingest_flights.py code 전문 >

#!/usr/bin/env python

# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import logging
import os.path
import zipfile
import datetime
import tempfile
from google.cloud import storage
from google.cloud.storage import Blob

try:
    from urllib.request import urlopen as impl
    # For Python 3.0 and later
    def urlopen(url, data):
        return impl(url, data.encode('utf-8'))
    def remove_quote(text):
        return text.translate(str.maketrans('','', '"'))
except ImportError as error:
    # Fall back to Python 2's urllib2
    from urllib2 import urlopen
    def remove_quote(text):
        return text.translate(None, '"')

def download(YEAR, MONTH, destdir): #bash script가 하드코딩된 연도(2015)에서 데이터를 가져오는데에 비해, python코드는 연도와 월을 입력값으로 받는다.
   '''
     Downloads on-time performance data and returns local filename
     YEAR e.g.'2015'
     MONTH e.g. '01 for January
   '''
   logging.info('Requesting data for {}-{}-*'.format(YEAR, MONTH))

   PARAMS=" <파라미터 생략>".format(YEAR, MONTH)

   url='https://www.transtats.bts.gov/DownLoad_Table.asp?Table_ID=236&Has_Group=3&Is_Zipped=0' # 이 URL이 나오게 된 이유는 (보완1)에 밝힌다. 
   filename = os.path.join(destdir, "{}{}.zip".format(YEAR, MONTH))
   with open(filename, "wb") as fp:
     response = urlopen(url, PARAMS)
     fp.write(response.read())
   logging.debug("{} saved".format(filename))
   return filename

def zip_to_csv(filename, destdir): # 압축해제 후 csv파일을 추출한다.
   zip_ref = zipfile.ZipFile(filename, 'r')
   cwd = os.getcwd() # 현재 자신의 디렉토리를 반환해준다
   os.chdir(destdir) # 현재 디렉터리의 위치를 변경한다.
   zip_ref.extractall() # 현재 디렉터리에 내용물을 전부 압축해제한다.
   os.chdir(cwd) # 원래 디렉터리로 돌아간다.
   csvfile = os.path.join(destdir, zip_ref.namelist()[0]) # 경로를 병합하여 새 경로를 만든다. 압축 파일의 모든 멤버 리스트중 첫번째 값과 병합
   zip_ref.close()
   logging.info("Extracted {}".format(csvfile))
   return csvfile


class DataUnavailable(Exception):
   def __init__(self, message):
      self.message = message

class UnexpectedFormat(Exception):
   def __init__(self, message):
      self.message = message

def verify_ingest(csvfile): # 책에서는 ingest data를 확인하는 코드도 추가해 놓았다. 추가한 두가지 내용은 다음과 같다.
	# 1) 다운받은 코드가 한줄 이상인지를 확인한다.
    # 2) csv파일이 정확한 헤더를 가지고 있는지를 확인한다.
    
   #확인하려하는 헤더는 다음과 같다.
   expected_header = 'FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,DISTANCE'

   with open(csvfile, 'r') as csvfp: # close하는 실수를 줄여주는 중요한 coding style
      firstline = csvfp.readline().strip() # .strip()함수는 문자열이 있을 때 양쪽 끝에 있는 공백과 \n기호를 삭제시켜준다.
      if (firstline != expected_header): # 첫라인의 헤더가 예상한 헤더가 아닐 경우에 (내용 2)에 해당 )
         os.remove(csvfile)
         msg = 'Got header={}, but expected={}'.format(
                             firstline, expected_header)
         logging.error(msg)
         raise UnexpectedFormat(msg)

      if next(csvfp, None) == None: # 다음요소인 내부 데이터가 한줄 이상인지 확인 (내용 1)에 해당 )
         os.remove(csvfile)
         msg = ('Received a file from BTS that has only the header and no content')
         raise DataUnavailable(msg)


def remove_quotes_comma(csvfile, year, month): # 따옴표와 줄 끝의 쉼표를 제거하는 코드
 '''
     returns output_csv_file or raises DataUnavailable exception
 '''
 try:
   outfile = os.path.join(os.path.dirname(csvfile),
                          '{}{}.csv'.format(year, month))
   with open(csvfile, 'r') as infp: # close 필요없이 블럭 벗어나면 자동으로 close
     with open(outfile, 'w') as outfp: # 마찬가지. 
        for line in infp: 
           outline = line.rstrip().rstrip(',') # 따옴표와 줄 끝의 쉼표를 제거할 수 있다. 
           outline = remove_quote(outline) # quote는 none으로 translate, (line38참조)
           outfp.write(outline) # 처리 끝난 내용 작성
           outfp.write('\n') # 마감 줄처리 이어서 작성
   logging.debug('Ingested {} ...'.format(outfile))
   return outfile
 finally:
   logging.debug("... removing {}".format(csvfile))
   os.remove(csvfile)

def upload(csvfile, bucketname, blobname): # 주어진 월의 csv파일을 클라우드 스토리지에 업로드한다.
   client = storage.Client() 
   bucket = client.get_bucket(bucketname)
   blob = Blob(blobname, bucket) # blob은 간접객체이다. 간접객체에 대한 설명은 (보완2)에 밝힌다.
   blob.upload_from_filename(csvfile)
   gcslocation = 'gs://{}/{}'.format(bucketname, blobname) # 이전 부분과 동일. 이하생략.
   logging.info('Uploaded {} ...'.format(gcslocation))
   return gcslocation

def ingest(year, month, bucket): 
   '''
   ingest flights data from BTS website to Google Cloud Storage
   return cloud-storage-blob-name on success.
   raises DataUnavailable if this data is not on BTS website
   '''
   tempdir = tempfile.mkdtemp(prefix='ingest_flights') # mkdtemp()는 임시 디렉토리를 생성한다.
   try:
      zipfile = download(year, month, tempdir) # download(상단 정의) 호출
      bts_csv = zip_to_csv(zipfile, tempdir) # zip_to_csv(상단 정의) 호출
      csvfile = remove_quotes_comma(bts_csv, year, month) # remove_quotes_comma(상단 정의) 호출
      verify_ingest(csvfile) # 동일
      gcsloc = 'flights/raw/{}'.format(os.path.basename(csvfile))
      return upload(csvfile, bucket, gcsloc)
   finally:
      logging.debug('Cleaning up by removing {}'.format(tempdir))
      shutil.rmtree(tempdir) # 생성했던 임시 디렉토리 제거

def next_month(bucketname): 
   '''
     Finds which months are on GCS, and returns next year,month to download
   '''
   client = storage.Client()
   bucket = client.get_bucket(bucketname) # bucket 객체 생성
   blobs  = list(bucket.list_blobs(prefix='flights/raw/')) # blob
   files = [blob.name for blob in blobs if 'csv' in blob.name] # csv files only
   lastfile = os.path.basename(files[-1]) # 예시-> 201503.csv 
   logging.debug('The latest file on GCS is {}'.format(lastfile))
   year = lastfile[:4] # 2015
   month = lastfile[4:6] # 03
   return compute_next_month(year, month)

def compute_next_month(year, month): # 다음달을 계산하기위한 함수
   dt = datetime.datetime(int(year), int(month), 15) # 15th of month
   dt = dt + datetime.timedelta(30) # will always go to next month 이 방법으로 28일~31일 월 전부 커버가능하다.
   logging.debug('The next month is {}'.format(dt))
   return '{}'.format(dt.year), '{:02d}'.format(dt.month)
  
if __name__ == '__main__':
   import argparse # 인자값을 받아 기능을 수행하게 해주는 모듈
   parser = argparse.ArgumentParser(description='ingest flights data from BTS website to Google Cloud Storage')
   parser.add_argument('--bucket', help='GCS bucket to upload data to', required=True)
   parser.add_argument('--year', help='Example: 2015.  If not provided, defaults to getting next month')
   parser.add_argument('--month', help='Specify 01 for January. If not provided, defaults to getting next month')

   try:
      logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO) # 로깅 포맷 변경
      args = parser.parse_args()
      if args.year is None or args.month is None:
         year, month = next_month(args.bucket)
      else:
         year = args.year
         month = args.month
      logging.debug('Ingesting year={} month={}'.format(year, month))
      gcsfile = ingest(year, month, args.bucket)
      logging.info('Success ... ingested to {}'.format(gcsfile))
   except DataUnavailable as e:
      logging.info('Try again later: {}'.format(e.message))

< ingest_flights.py 를 실행시킨 화면 >

 

  ingest_flights.py는 클라우드 스토리지 버킷을 한달에 한 번 업데이트하는 기능을 가진 Data 입수 프로그램이다. 이번에는 이를 서버리스 방식으로 동작할 수 있는 cron 호출형 flask-web-application을 적용시켜본다.

 

< 실습 - Flask Web Application (optional) >

flask-web-application을 실행시켜보려고 책에 나온 대로 02_ingest에 들어가 init_appengine.sh를 실행시키려고 했는데..

  appengine.sh 파일이 없다???  그 외에 token generate와 setup_cron등의 실행과 관련한 파일들이 눈에 보여서 다 까보았지만 실행순서가 명확하지 않았고 책에 관련 내용이 없어 처음에 꽤나 당황했었다. 잠시 방황하다가 저자를 믿고 readme를 읽어보기로 하였다.

 

< 역시나 친절하게 다 설명해 놓았다 >

  읽으라고 올려놓은 파일을 열어보지도 않은 내 잘못이다.... 안심하고 순서대로 실행시켜보도록 하겠다. 정확한 관련코드는 저자의 깃허브와 저서를 참고하길 권장한다. (너무 다 소개하기엔 날로먹는것 같아...) 

 

< token 생성>
< 생성된 토큰으로 main.py에서 수정 >

 

< ... >

  역시나 한방에 될 리가 없다. cloud function api가 enable 되어있지 않는 문제인듯 하니, GCP로 돌아가서 해당 api를 찾아 추가해주도록 한다.

< Cloud Function API 검색 >
< 해당 API를 프로젝트에 추가 >

 

< 이후에도 계속 트러블슈팅 과정이므로 생략하고자 하면 생략해도 된다. >

 

.......문제는 해결된 줄 알았지만 계속해서 error가 발생했다.... main.py에 ingest_flights_randomstring의 함수명이 필요하다는 내용이였다. cat 커맨드로  deploy_cf.sh와 main.py에서 ingest_flights()라는 function은 있었지만 그 뒤로 저런식으로 random string이 붙는 경우는 없었기에 당황스러웠다.

< 또다시 발생한 error >

 

  해결방법은 Google Cloud의 deploy reference에서 찾을수 있었다.  gcloud 명령어를 사용할 때 NAME에 해당하는 부분에서 --entry-point가 옵션으로 들어가 있지 않는 경우에는 NAME이라는 함수명이 반드시 코드 내부에 정의되어 있어야 한다는 내용이였다. 

 

  reference에 소개된 대로 --entry-point 뒤에 원래 사용하려던 function인 ingest_flights를 추가해주고 해당 코드를 실행시키니 에러없이 무사히 지나갔다. 이후의 이미지들은 나머지 실습과정을 수행한 것이며 책에 소개되어있지 않고, 새롭게 수정된 README 파일에 있는 내용이다. 실습에 도움이 되도록 아래에 첨부하겠다.

 

[Optional] Scheduling monthly downloads

  • Go to the 02_ingest/monthlyupdate folder in the repo.
  • Generate a new Personal Access Token by running ./generate_token.sh -- Note the token printed.
  • Modify main.py to have this token.
  • Deploy Cloud Function in your project by running ./deploy_cf.sh -- Note the URL that is printed.
  • Try out ingest by running ./call_cf.sh supplying the necessary parameters.
  • Schedule ingest by running ./setup_cron.sh supplying the necessary parameters.
  • Visit the GCP Console for Cloud Functions and Cloud Scheduler and delete the function and the scheduled task—you won’t need them any further.

 

< 스케줄러에 확인되는 monthlyupdate 작업 >

 

 

 

< 다운로드 URL과 BLOB객체에 관련한 설명은 추후에 보충한다. >

 

 

"Data Science on the Google Cloud Platform by Valliappa Lakshmanan (O'Reilly). Copyright 2018 Google Inc."
728x90

'[Data Engineering] > [Gloud-GCP]' 카테고리의 다른 글

[GCP] 3-3. DashBoard  (0) 2020.02.18
[GCP] 3-2. Decision Model  (0) 2020.02.18
[GCP] 3-1. How to make Dataset  (0) 2020.02.18
[GCP] 2-1. Fixed Data-set scheduling  (0) 2020.02.18
[GCP] 1. Introduction  (4) 2020.02.18