본문 바로가기

[Data Engineering]/[Gloud-GCP]

[GCP] 6-2. Quantization using Spark SQL

728x90

 < 실습 - 빅쿼리를 이용한 개별성 검사 >

 

- 들어가기 전에 -

 

  이번에는 빅쿼리를 이용해서 개별성 검사를 진행해보도록 하겠다. 데이터 셋이 개별적인지 판단하는 일을 하는 이유는, 이전에 소개한 베이즈 분류에서 각 빈에서 지연된 항공편의 비율을 계산해야 하는데 이 이전에 지연과 거리라는 개념을 양자화 하는 방법을 결정해야하기 때문이다. 클라우드 데이터프록에 실행중인 노트북에서 빅쿼리를 호출할 수 있다. dataproc과 관련된 내용은 미흡한 부분이 있어, ssh가 아닌, GCP에서 실습을 이어간다. dataproc은 책에 소개된 내용과는 다른 방법으로 이어 설명을 하도록 하겠다. datalab에 레퍼지토리를 먼저 클론하자. 

!git clone  https://github.com/GoogleCloudPlatform/data-science-on-gcp/

  이후에 해당 창을 닫고 폴더를 열어보면 깃 레퍼지토리가 저장이 되어있는 것을 알 수 있다. 이후에 실습은 quantization 노트북으로 진행하도록 한다. 열어보면 다음과 같다.

 

 - 빅쿼리를 이용한 개별성 검사 -

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import google.cloud.bigquery as bigquery

bq = bigquery.Client()

sql = """
SELECT DISTANCE, DEP_DELAY
FROM `flights.tzcorr`
WHERE RAND() < 0.001 AND dep_delay > -20 AND dep_delay < 30 AND distance < 2000
"""
df = bq.query(sql).to_dataframe()

sns.set_style("whitegrid")
g = sns.jointplot(df['DISTANCE'], df['DEP_DELAY'], kind="hex", size=8, joint_kws={'gridsize':20})

< 결과화면 >

※ 관련 그래프에 대한 분석 및 comment는 추후에 자세하게 남기겠으나, 거리가 1000마일까지는 균등한 분포를 보여주고, 그 이후로는 급격하게 감소함을 알 수 있다.

 

- 구글 클라우드 데이터 랩의 스파크 SQL -

  먼저 pyspark 설치를 위해 pip으로 pyspark를 설치해준다.

!pip install pyspark

  책에 나오는 spark session과 관련된 내용은 stackoverflow에 검색해본 결과 더이상 쓸 필요가 없어졌다.

따라서 다음 코드는 사용하지 않아도 되므로 실행시키지 않고 삭제하여도 무방하다. ans) spark.sql(...) 와 같이 실행시킴.

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Bayes classification using Spark") \
    .getOrCreate()

https://stackoverflow.com/questions/46905903/sparksession-initialization-error-unable-to-use-spark-read

 

SparkSession initialization error - Unable to use spark.read

I tried to create a standalone PySpark program that reads a csv and stores it in a hive table. I have trouble configuring Spark session, conference and contexts objects. Here is my code: from pysp...

stackoverflow.com

 

  다음은 우리가 필요한 세가지 칼럼(arr-delay, dep_delay, distance)을 보정하는 과정이다. 해당하는 코드는 다음과 같다.

from pyspark.sql.types import StringType, FloatType, StructType, StructField

header = 'FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,DISTANCE,DEP_AIRPORT_LAT,DEP_AIRPORT_LON,DEP_AIRPORT_TZOFFSET,ARR_AIRPORT_LAT,ARR_AIRPORT_LON,ARR_AIRPORT_TZOFFSET,EVENT,NOTIFY_TIME'

def get_structfield(colname):
   if colname in ['ARR_DELAY', 'DEP_DELAY', 'DISTANCE']:
      return StructField(colname, FloatType(), True)
   else:
      return StructField(colname, StringType(), True)

schema = StructType([get_structfield(colname) for colname in header.split(',')])
print(schema)

추출된 스키마를 출력해보면 내용은 다음과 같다. => StructType(List(StructField(FL_DATE,StringType,true),StructField(UNIQUE_CARRIER,StringType,true),StructField(AIRLINE_ID,StringType,true),StructField(CARRIER,StringType,true),StructField(FL_NUM,StringType,true),StructField(ORIGIN_AIRPORT_ID,StringType,true),StructField(ORIGIN_AIRPORT_SEQ_ID,StringType,true),StructField(ORIGIN_CITY_MARKET_ID,StringType,true),StructField(ORIGIN,StringType,true),StructField(DEST_AIRPORT_ID,StringType,true),StructField(DEST_AIRPORT_SEQ_ID,StringType,true),StructField(DEST_CITY_MARKET_ID,StringType,true),StructField(DEST,StringType,true),StructField(CRS_DEP_TIME,StringType,true),StructField(DEP_TIME,StringType,true),StructField(DEP_DELAY,FloatType,true),StructField(TAXI_OUT,StringType,true),StructField(WHEELS_OFF,StringType,true),StructField(WHEELS_ON,StringType,true),StructField(TAXI_IN,StringType,true),StructField(CRS_ARR_TIME,StringType,true),StructField(ARR_TIME,StringType,true),StructField(ARR_DELAY,FloatType,true),StructField(CANCELLED,StringType,true),StructField(CANCELLATION_CODE,StringType,true),StructField(DIVERTED,StringType,true),StructField(DISTANCE,FloatType,true),StructField(DEP_AIRPORT_LAT,StringType,true),StructField(DEP_AIRPORT_LON,StringType,true),StructField(DEP_AIRPORT_TZOFFSET,StringType,true),StructField(ARR_AIRPORT_LAT,StringType,true),StructField(ARR_AIRPORT_LON,StringType,true),StructField(ARR_AIRPORT_TZOFFSET,StringType,true),StructField(EVENT,StringType,true),StructField(NOTIFY_TIME,StringType,true)))

 

전체 데이터셋을 전부 읽어오면 용량이 너무 많으므로, 1/30만 읽어오기로 하였다. input file은 다음과 같이 수정할 수 있다.

inputs = 'gs://{}/flights/tzcorr/all_flights-00000-*'.format(BUCKET) # 1/30th
#inputs = 'gs://{}/flights/tzcorr/all_flights-*'.format(BUCKET)  # FULL
flights = spark.read\
            .schema(schema)\
            .csv(inputs)

# this view can now be queried ...
flights.createOrReplaceTempView('flights')

 

 

 

 

"Data Science on the Google Cloud Platform by Valliappa Lakshmanan (O'Reilly). Copyright 2018 Google Inc."

 

728x90