ITエンジニアとして経験・学習したこと

ITエンジニアとして経験したり学習したことを忘れないよう、書いていきたいと思います。少しでも皆様のお役に立てれば幸いです。

GCP Cloud Pub/Subを使ってみた(4)

今回も引き続き、Cloud Pub/Subについて述べる。

今回は、2台の仮想マシン上でメッセージの送受信が行えるようなプログラムの内容について述べる。

 

前提条件

GCP(Google Cloud Platform)のアカウントが有り、下記記事の手順に従って、GCP上でDebian GNU/Linux搭載済の2台の仮想マシンを作成済であること
www.purin-it.work

 

また、以下の記事での環境構築を実施済であること

www.purin-it.work

 

さらに、以下の記事の「JDK1.8のインストール」「Mavenのインストール」が実施済であること

www.purin-it.work

また、MavenによるJavaプログラムの作成・実行手順も、上記記事を参照のこと。

 

その他、下記記事のトピックとサブスクリプションの作成が完了していること

www.purin-it.work

 

やってみたこと

  1. 2台の仮想マシンを起動
  2. サーバー側でのJavaプログラムの作成
  3. クライアント側でのJavaプログラムの作成
  4. 実行確認

 

2台の仮想マシンを起動

GCP上でDebian GNU/Linux搭載済の2台の仮想マシンを起動する。起動後の仮想マシンは以下の通り。

f:id:purin_it:exec_machine

なお、上記仮想マシンのうち、今後は「test-linux-vm-2」をサーバー側とし、「test-linux-vm」をクライアント側とする。

 

サーバー側でのJavaプログラムの作成

サーバー側の仮想マシン「test-linux-vm-2」にログイン後、実施した内容は以下の通り。

 

1) mavenコマンドを利用して、use-pubsub-serverというMavenプロジェクトを作成

f:id:purin_it:make_maven_server

 

2) pom.xmlを修正し、TestPubSubServer.javaというプログラムを追加

ソース修正後のフォルダ構成は以下の通り

f:id:purin_it:make_source_server

 

修正後のpom.xmlは以下の通り

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.pubsub</groupId>
  <artifactId>use-pubsub-server</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>use-pubsub-server</name>
  <url>http://maven.apache.org</url>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <java.version>1.8</java.version>
  </properties>

  <!-- プラグインの設定 -->
  <build>
      <plugins>
          <!-- Javaファイルのコンパイラの設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.6.1</version>
              <configuration>
                  <source>${java.version}</source>
                  <target>${java.version}</target>
              </configuration>
          </plugin>
          <!-- mavenプロジェクトのテスト時のエラー解消のための設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>3.0.0-M3</version>
              <configuration>
                  <useSystemClassLoader>false</useSystemClassLoader>
              </configuration>
          </plugin>
          <!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
          <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- メインプログラムとして実行するクラスの指定 -->
                            <mainClass>test.TestPubSubServer</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
          </plugin>
      </plugins>
  </build>

  <!-- ライブラリ依存関係の設定 -->
  <dependencies>
      <dependency>
          <!-- Junitの設定 -->
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
      </dependency>
      <dependency>
          <!-- PubSub用ライブラリを追加する設定 -->
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-pubsub</artifactId>
          <version>1.49.0</version>
      </dependency>
  </dependencies>

</project>

 

作成したTestPubSubServer.javaは以下の通り

package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class TestPubSubServer {

        private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
        private static final String TOPIC_ID = "my-topic";
        private static final String SUBSCRIPTION_ID = "my-sub";
        private static final int MSG_CNT = 5;
        private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

        public static void main(String[] args) {

                try {
                        // 送信されたメッセージを受信する
                        doPull();

                } catch (Exception e) {
                        System.out.println(e);
                }

        }

        private static class MessageReceiverTest implements MessageReceiver {

                @Override
                public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                        // メッセージを受信しACK(肯定応答)を返す
                        messages.offer(message);
                        consumer.ack();
                }

        }

        private static void doPull() throws InterruptedException {

                System.out.println("**** started doPull. ****");

                // 送信されたメッセージを取得するSubscriberを生成
                ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID);
                Subscriber subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverTest()).build();

                // メッセージの受信を開始
                subscriber.startAsync().awaitRunning();
                int cnt = 0;
                while (true) {
                        // 受信したメッセージを出力
                        PubsubMessage message = messages.take();
                        System.out.println("Received Data: " + message.getData().toStringUtf8());
                        cnt++;

                        // 全てのメッセージを受信できたら終了する
                        if (cnt == MSG_CNT) {
                                break;
                        }
                }

                // Subscriberを終了
                if (subscriber != null) {
                        subscriber.stopAsync();
                }

                System.out.println("**** ended doPull. ****");
       }

}

なお、前回の「GCP Cloud Pub/Subを使ってみた(3)」で記載したTestPubSub.javaのうち、送信したメッセージを受信する部分のみ記載する形となる。

 

3) 「mvn compile」によりMavenプロジェクトをコンパイルした上で、「mvn package」によりJarファイルの作成を実行。実行後のフォルダ構成は以下の通り。

f:id:purin_it:make_jar_server

 

クライアント側でのJavaプログラムの作成

サーバー側の仮想マシンにログイン後、実施した内容は以下の通り。

 

1) mavenコマンドを利用して、use-pubsub-clientというMavenプロジェクトを作成

f:id:purin_it:make_maven_client

 

2) pom.xmlを修正し、TestPubSubClient.javaというプログラムを追加

ソース修正後のフォルダ構成は以下の通り

f:id:purin_it:make_source_client

 

修正後のpom.xmlは以下の通り

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.pubsub</groupId>
  <artifactId>use-pubsub-client</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>use-pubsub-client</name>
  <url>http://maven.apache.org</url>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <java.version>1.8</java.version>
  </properties>

  <!-- プラグインの設定 -->
  <build>
      <plugins>
          <!-- Javaファイルのコンパイラの設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.6.1</version>
              <configuration>
                  <source>${java.version}</source>
                  <target>${java.version}</target>
              </configuration>
          </plugin>
          <!-- mavenプロジェクトのテスト時のエラー解消のための設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>3.0.0-M3</version>
              <configuration>
                  <useSystemClassLoader>false</useSystemClassLoader>
              </configuration>
          </plugin>
          <!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
          <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- メインプログラムとして実行するクラスの指定 -->
                            <mainClass>test.TestPubSubClient</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
          </plugin>
      </plugins>
  </build>

  <!-- ライブラリ依存関係の設定 -->
  <dependencies>
      <dependency>
          <!-- Junitの設定 -->
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
      </dependency>
      <dependency>
          <!-- PubSub用ライブラリを追加する設定 -->
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-pubsub</artifactId>
          <version>1.49.0</version>
      </dependency>
  </dependencies>

</project>

 

作成したTestPubSubClient.javaは以下の通り

package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

public class TestPubSubClient {

        private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
        private static final String TOPIC_ID = "my-topic";
        private static final int MSG_CNT = 5;
        private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

        public static void main(String[] args) {

               try {
                        // メッセージを送信する
                        doPublish();
			
                } catch (Exception e) {
                        System.out.println(e);
                }

        }

        private static void doPublish() throws Exception {

                System.out.println("**** started doPublish. ****");

                // トピック名を指定しメッセージ送信のためのPublisherを生成
               Publisher publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, TOPIC_ID)).build();

                // メッセージをPublisherに設定し送信(MSG_CNT数分繰り返す)
               for (int i = 0; i < MSG_CNT; i++) {
                        String message = "Hello PubSub " + i + " !!";
                        ByteString data = ByteString.copyFromUtf8(message);
                        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
                        publisher.publish(pubsubMessage);

                        // 送信したメッセージを出力
                        System.out.println("Sended Data : " + message);
                }

                // Publisherを終了
                if (publisher != null) {
                         publisher.shutdown();
                }

                System.out.println("**** ended doPublish. ****");
                System.out.println();

       }

}

なお、前回の「GCP Cloud Pub/Subを使ってみた(3)」で記載したTestPubSub.javaのうち、メッセージを送信する部分のみ記載する形となる。

 

3) 「mvn compile」によりMavenプロジェクトをコンパイルした上で、「mvn package」によりJarファイルの作成を実行。実行後のフォルダ構成は以下の通り。

f:id:purin_it:make_jar_client

 

実行確認

2台のマシン間での、Pub/Subによるメッセージ送受信を確認した結果は以下の通り。

 

1) サーバー側(test-linux-vm-2)のプログラムを起動

f:id:purin_it:result_two_1

 

2) クライアント側(test-linux-vm)のプログラムを起動

f:id:purin_it:result_two_2

 

3) サーバー側(test-linux-vm-2)で、クライアント側から送信されたメッセージが受信できることを確認

f:id:purin_it:result_two_3