ITエンジニアとして経験・学習したこと

ITエンジニアとして経験したり学習したことを忘れないよう、書いていきたいと思います。少しでも皆様のお役に立てれば幸いです。

GCS(Google Cloud Storage)とBigQueryで連動するプログラムを作成した(3)

ここでは、GCS(Google Cloud Storage)とBigQueryで連動するPythonプログラムを作成する前に行った環境構築と、作成したPython 2.7のプログラムの紹介を行う。

 

前提条件

以下の記事での環境構築が完了していること

www.purin-it.work

 

環境構築

ここでは、前提条件で記載した以外の環境構築手順について述べる。

 

1) コマンドプロンプト上で、「pip install (--upgrade) google-cloud-storage」を実行し、GCSを利用するためのライブラリをインストールする

f:id:purin_it:python_prepare_1

なお、--upgradeオプションを指定すると、インストール済ライブラリを最新化できる

 

2) コマンドプロンプト上で、「pip install (--upgrade) google-cloud-bigquery」を実行し、BigQueryを利用するためのライブラリをインストールする

f:id:purin_it:python_prepare_2

 

作成したPython 2.7のプログラム

ここでは、作成したPython 2.7のプログラムを紹介する。

 

1) GCS上のCSVファイル(insert_bigquery_sales.csv)のデータを、BigQueryのsalesテーブルに追加するプログラム   (プログラム名:insert_into_sales.py)

# -*- coding: utf-8 -*-
import sys
from google.cloud import bigquery
def insert_into_sales(): # 文字コードをUTF-8に設定 reload(sys) sys.setdefaultencoding('utf-8')
# ロードするBigQuery上のデータセットとテーブルの設定 dataset_id = 'bigquery_purin_it' table_id = 'sales' client = bigquery.Client() dataset_ref = client.dataset(dataset_id)
# BigQuery上のテーブルにロードするジョブの設定
# ここでは、テーブルデータを全て削除してからロードしている
job_config = bigquery.LoadJobConfig() job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE job_config.schema = [ bigquery.SchemaField('sale_date', 'DATE') , bigquery.SchemaField('product_name', 'STRING') , bigquery.SchemaField('place_name', 'STRING') , bigquery.SchemaField('sales_amount', 'INTEGER') ] job_config.skip_leading_rows = 1 job_config.source_format = bigquery.SourceFormat.CSV load_job = client.load_table_from_uri( 'gs://test_purin_bucket/insert_bigquery_sales.csv' , dataset_ref.table(table_id) , job_config = job_config) # ジョブを実行し、終了を待つ print('Starting job {}'.format(load_job.job_id)) load_job.result() print('Job Finished.')
if __name__ == '__main__': insert_into_sales()

 

2) BigQueryのsalesテーブルのデータを、JSONファイル(sales.json)に出力するプログラム  (プログラム名:select_from_sales.py)

# -*- coding: utf-8 -*-
import sys
import json
import collections as cl
from google.cloud import bigquery

def query_from_sales():
    # 文字コードをUTF-8に設定
    reload(sys)
    sys.setdefaultencoding('utf-8')

    # BigQuery salesテーブルからデータを抽出
    client = bigquery.Client()
    query_job = client.query("""
       SELECT 
           sale_date
         , product_name
         , place_name
         , sales_amount
       FROM `(GCPのプロジェクト名).bigquery_purin_it.sales`
     """)
    results = query_job.result()
    
    # 抽出した結果をJSONファイルに出力
    ys = cl.OrderedDict()
    i = 0
    for row in results:
        data = cl.OrderedDict()
        data["sale_date"] = row.sale_date.strftime('%Y-%m-%d')
        data["product_name"] = row.product_name
        data["place_name"] = row.place_name
        data["sales_amount"] = row.sales_amount
        ys[i + 1] = data
        i = i + 1
        
    fw = open('C:\work\gcp\sales.json', 'w')
    json.dump(ys, fw, ensure_ascii=False, indent=4)
    fw.close()
    print('JSONファイル出力完了'.decode('utf-8'))

if __name__ == '__main__':
    query_from_sales()

 

作成したPython 2.7のプログラムの実行結果

以下の記事を参照のこと。

www.purin-it.work