ITエンジニアとして経験・学習したこと

ITエンジニアとして経験したり学習したことを忘れないよう、書いていきたいと思います。少しでも皆様のお役に立てれば幸いです。

GCS(Google Cloud Storage)とBigQueryで連動するプログラムを作成した(4)

ここでは、GCS(Google Cloud Storage)とBigQueryで連動する、作成したJava 1.8のプログラムの紹介を行う。

 

前提条件

以下の記事での環境構築が完了していること

www.purin-it.work

 

また、以下の記事での「Eclipseのダウンロードと解凍」が完了していること

www.purin-it.work

 

作成したJava 1.8のプログラム

ここでは、作成したJava 1.8のプログラムを紹介する。

 

1) 以下の構成でのMavenプロジェクト「select_from_sales」を作成する

f:id:purin_it:java_maven_structure

 

2) pom.xmlの内容は以下の通り

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.bigquery.select</groupId>
  <artifactId>select-from-sales</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <java.version>1.8</java.version>
  </properties>

  <!-- プラグインの設定 -->
  <build>
      <plugins>
          <!-- Javaファイルのコンパイラの設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.6.1</version>
              <configuration>
                  <source>${java.version}</source>
                  <target>${java.version}</target>
              </configuration>
          </plugin>
          <!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
          <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- メインプログラムとして実行するクラスの指定 -->
                            <mainClass>test.TestBigQuery</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
          </plugin>
      </plugins>
  </build>

  <!-- ライブラリ依存関係の設定 -->
  <dependencies>
      <dependency>
          <!-- BigQuery用ライブラリを追加する設定 -->
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-bigquery</artifactId>
          <version>1.49.0</version>
      </dependency>
      <dependency>
          <!-- JSON用ライブラリを追加する設定 -->
          <groupId>org.json</groupId>
          <artifactId>json</artifactId>
          <version>20180813</version>
      </dependency>
  </dependencies>

</project>

 

3) メインとなるTestBigQuery.javaは以下の通り

    ここで、次項で述べる4)5)を順に呼び出している

package test;

public class TestBigQuery {

        public static void main(String[] args) {
                InsertIntoSales.main(args);
                SelectFromSales.main(args);
        }

}

 

4) GCS上のCSVファイル(insert_bigquery_sales.csv)のデータを、BigQueryのsalesテーブルに追加する、InsertIntoSales.javaは以下の通り

package test;

import java.util.UUID;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.LoadJobConfiguration.Builder;
import com.google.cloud.bigquery.Table;

public class InsertIntoSales {

        public static void main(String[] args) {

                // ロードするGCS上のファイルとBigQuery上のデータセットとテーブルの設定
                String sourceUri = "gs://test_purin_bucket/insert_bigquery_sales.csv";
                String dataSet = "bigquery_purin_it";
                String tableName = "sales";

                BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
                Table table = bigquery.getTable(dataSet, tableName);

                // BigQuery上のテーブルにロードするジョブの設定
                // ここでは、テーブルデータを全て削除してからロードしている
                Builder loadConfig = LoadJobConfiguration.newBuilder(table.getTableId(), sourceUri);
                loadConfig.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE);
                com.google.cloud.bigquery.CsvOptions.Builder csvOptions = CsvOptions.newBuilder();
                csvOptions.setSkipLeadingRows(1);
                loadConfig.setFormatOptions(csvOptions.build());

                JobId jobId = JobId.of(UUID.randomUUID().toString());
                Job loadJob = bigquery.create(JobInfo.newBuilder(loadConfig.build()).setJobId(jobId).build());

                try {
                        // ジョブを実行し、終了を待つ
                        System.out.printf("Starting job %s\n", jobId.getJob());
                        loadJob = loadJob.waitFor();

                        // BigQuery salesテーブルからデータを抽出するジョブのエラーチェック
                        if (loadJob == null) {
                                throw new RuntimeException("Job no longer exists");
                        } else if (loadJob.getStatus().getError() != null) {
                                throw new RuntimeException(loadJob.getStatus().getError().toString());
                        }

                        // ジョブの完了メッセージを出力
                       System.out.println("Job Finished.");

                } catch (Exception e) {
                       System.out.println(e);
                }
       }

}

 

5) BigQueryのsalesテーブルのデータを、JSONファイル(sales.json)に出力する、SelectFromSales.javaは以下の通り

package test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.UUID;

import org.json.JSONException;
import org.json.JSONObject;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class SelectFromSales {

         public static void main(String[] args) {

                 // BigQuery salesテーブルからデータを抽出
                BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
                QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(
                                "SELECT "
                                                + " sale_date "
                                                + ", product_name "
                                                + ", place_name "
                                                + ", sales_amount "
                                                + "FROM `(プロジェクトID).bigquery_purin_it.sales` ")
                                .setUseLegacySql(false)
                                .build();

                // BigQuery salesテーブルからデータを抽出するジョブを生成
                JobId jobId = JobId.of(UUID.randomUUID().toString());
                Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

                try {
                        // BigQuery salesテーブルからデータを抽出するジョブの終了を待機
                        queryJob = queryJob.waitFor();

                        // BigQuery salesテーブルからデータを抽出するジョブのエラーチェック
                        if (queryJob == null) {
                                throw new RuntimeException("Job no longer exists");
                        } else if (queryJob.getStatus().getError() != null) {
                                throw new RuntimeException(queryJob.getStatus().getError().toString());
                        }

                        // 抽出した結果をJSONファイルに出力
                        TableResult result = queryJob.getQueryResults();
                        new SelectFromSales().putJsonFile(result);
                        System.out.println("JSONファイル出力完了");

                } catch (Exception e) {
                        System.out.println(e);
                }
	}

        private void putJsonFile(TableResult result)
                        throws JSONException, FileNotFoundException, IOException {

                // 抽出した結果をJSON形式に変換
                JSONObject jsonObj = new JSONObject();
                int idx = 0;
                for (FieldValueList row : result.iterateAll()) {
                        JSONObject jsonObjTmp = new JSONObject();
                        jsonObjTmp.put("sale_date", row.get("sale_date").getStringValue());
                        jsonObjTmp.put("product_name", row.get("product_name").getStringValue());
                        jsonObjTmp.put("place_name", row.get("place_name").getStringValue());
                        jsonObjTmp.put("sales_amount", row.get("sales_amount").getLongValue());

                        idx += 1;
                        jsonObj.put(String.valueOf(idx), jsonObjTmp);
                 }

                // JSONファイルを出力
                String jsonFilePath = "c:\\work\\gcp\\";
                String jsonFileName = "sales.json";
                PrintWriter pw = new PrintWriter(jsonFilePath + jsonFileName, "utf-8");
                pw.write(jsonObj.toString(4));
                pw.close();
       }

}

 

なお、mavenプロジェクトの作成・実行等については、以下の記事を参照のこと。

www.purin-it.work

 

作成したJava1.8の実行結果

作成したJava1.8について、maven installにより作成された「select-from-sales-0.0.1-SNAPSHOT-jar-with-dependencies.jar」をコマンドプロンプト上で実行すると、以下のようになる。

f:id:purin_it:java_result

 

なお、GCS上に配置したファイル、BigQuery上のテーブル、出力されたJSONファイルについては、以下の記事の通り。

www.purin-it.work