< 실습 - 빅쿼리를 이용한 개별성 검사 >
- 들어가기 전에 -
이번에는 빅쿼리를 이용해서 개별성 검사를 진행해보도록 하겠다. 데이터 셋이 개별적인지 판단하는 일을 하는 이유는, 이전에 소개한 베이즈 분류에서 각 빈에서 지연된 항공편의 비율을 계산해야 하는데 이 이전에 지연과 거리라는 개념을 양자화 하는 방법을 결정해야하기 때문이다. 클라우드 데이터프록에 실행중인 노트북에서 빅쿼리를 호출할 수 있다. dataproc과 관련된 내용은 미흡한 부분이 있어, ssh가 아닌, GCP에서 실습을 이어간다. dataproc은 책에 소개된 내용과는 다른 방법으로 이어 설명을 하도록 하겠다. datalab에 레퍼지토리를 먼저 클론하자.
!git clone https://github.com/GoogleCloudPlatform/data-science-on-gcp/
이후에 해당 창을 닫고 폴더를 열어보면 깃 레퍼지토리가 저장이 되어있는 것을 알 수 있다. 이후에 실습은 quantization 노트북으로 진행하도록 한다. 열어보면 다음과 같다.
- 빅쿼리를 이용한 개별성 검사 -
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import google.cloud.bigquery as bigquery
bq = bigquery.Client()
sql = """
SELECT DISTANCE, DEP_DELAY
FROM `flights.tzcorr`
WHERE RAND() < 0.001 AND dep_delay > -20 AND dep_delay < 30 AND distance < 2000
"""
df = bq.query(sql).to_dataframe()
sns.set_style("whitegrid")
g = sns.jointplot(df['DISTANCE'], df['DEP_DELAY'], kind="hex", size=8, joint_kws={'gridsize':20})
※ 관련 그래프에 대한 분석 및 comment는 추후에 자세하게 남기겠으나, 거리가 1000마일까지는 균등한 분포를 보여주고, 그 이후로는 급격하게 감소함을 알 수 있다.
- 구글 클라우드 데이터 랩의 스파크 SQL -
먼저 pyspark 설치를 위해 pip으로 pyspark를 설치해준다.
!pip install pyspark
책에 나오는 spark session과 관련된 내용은 stackoverflow에 검색해본 결과 더이상 쓸 필요가 없어졌다.
따라서 다음 코드는 사용하지 않아도 되므로 실행시키지 않고 삭제하여도 무방하다. ans) spark.sql(...) 와 같이 실행시킴.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Bayes classification using Spark") \
.getOrCreate()
다음은 우리가 필요한 세가지 칼럼(arr-delay, dep_delay, distance)을 보정하는 과정이다. 해당하는 코드는 다음과 같다.
from pyspark.sql.types import StringType, FloatType, StructType, StructField
header = 'FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,DISTANCE,DEP_AIRPORT_LAT,DEP_AIRPORT_LON,DEP_AIRPORT_TZOFFSET,ARR_AIRPORT_LAT,ARR_AIRPORT_LON,ARR_AIRPORT_TZOFFSET,EVENT,NOTIFY_TIME'
def get_structfield(colname):
if colname in ['ARR_DELAY', 'DEP_DELAY', 'DISTANCE']:
return StructField(colname, FloatType(), True)
else:
return StructField(colname, StringType(), True)
schema = StructType([get_structfield(colname) for colname in header.split(',')])
print(schema)
추출된 스키마를 출력해보면 내용은 다음과 같다. => StructType(List(StructField(FL_DATE,StringType,true),StructField(UNIQUE_CARRIER,StringType,true),StructField(AIRLINE_ID,StringType,true),StructField(CARRIER,StringType,true),StructField(FL_NUM,StringType,true),StructField(ORIGIN_AIRPORT_ID,StringType,true),StructField(ORIGIN_AIRPORT_SEQ_ID,StringType,true),StructField(ORIGIN_CITY_MARKET_ID,StringType,true),StructField(ORIGIN,StringType,true),StructField(DEST_AIRPORT_ID,StringType,true),StructField(DEST_AIRPORT_SEQ_ID,StringType,true),StructField(DEST_CITY_MARKET_ID,StringType,true),StructField(DEST,StringType,true),StructField(CRS_DEP_TIME,StringType,true),StructField(DEP_TIME,StringType,true),StructField(DEP_DELAY,FloatType,true),StructField(TAXI_OUT,StringType,true),StructField(WHEELS_OFF,StringType,true),StructField(WHEELS_ON,StringType,true),StructField(TAXI_IN,StringType,true),StructField(CRS_ARR_TIME,StringType,true),StructField(ARR_TIME,StringType,true),StructField(ARR_DELAY,FloatType,true),StructField(CANCELLED,StringType,true),StructField(CANCELLATION_CODE,StringType,true),StructField(DIVERTED,StringType,true),StructField(DISTANCE,FloatType,true),StructField(DEP_AIRPORT_LAT,StringType,true),StructField(DEP_AIRPORT_LON,StringType,true),StructField(DEP_AIRPORT_TZOFFSET,StringType,true),StructField(ARR_AIRPORT_LAT,StringType,true),StructField(ARR_AIRPORT_LON,StringType,true),StructField(ARR_AIRPORT_TZOFFSET,StringType,true),StructField(EVENT,StringType,true),StructField(NOTIFY_TIME,StringType,true)))
전체 데이터셋을 전부 읽어오면 용량이 너무 많으므로, 1/30만 읽어오기로 하였다. input file은 다음과 같이 수정할 수 있다.
inputs = 'gs://{}/flights/tzcorr/all_flights-00000-*'.format(BUCKET) # 1/30th
#inputs = 'gs://{}/flights/tzcorr/all_flights-*'.format(BUCKET) # FULL
flights = spark.read\
.schema(schema)\
.csv(inputs)
# this view can now be queried ...
flights.createOrReplaceTempView('flights')
"Data Science on the Google Cloud Platform by Valliappa Lakshmanan (O'Reilly). Copyright 2018 Google Inc."
'[Data Engineering] > [Gloud-GCP]' 카테고리의 다른 글
[GCP] 6-1. Dataproc cluster, bayes classification (0) | 2020.03.04 |
---|---|
[GCP] 5-3. Data refining & model eval (0) | 2020.02.25 |
[GCP] 5-2. Cloud Data LAB (4) | 2020.02.24 |
[GCP] 5-1. Bigquery, Data Loading (0) | 2020.02.19 |
[GCP] 4-2. Stream Processing (2) | 2020.02.19 |