PR

マイクロサービスアーキテクチャの基礎知識まとめ

応用

こんにちは。Tomoyuki(@tomoyuki65)です。

Go言語(Golang)はマイクロサービス開発でよく使われたりするため、マイクロサービスアーキテクチャの基礎知識については知っておいた方がいいでしょう。

この記事では、そんなマイクロサービスアーキテクチャの基礎知識についてまとめます。

 

マイクロサービスアーキテクチャの基礎知識まとめ

マイクロサービスアーキテクチャとは、一つの大きなアプリケーションを独立した小さな「サービス」の集合体として構築する設計手法のことです。

簡単にいうと、「疎結合で自律的なサービスの集まり」がマイクロサービスになります。

 

モノリシックアーキテクチャとの比較

項目 モノリシックアーキテクチャ マイクロサービスアーキテクチャ
構造 すべての機能が単一のアプリケーション(1つのコードベース、実行単位)として統合されている。コンポーネント間は密結合になりやすい。 特定のビジネス機能を持った独立した小さなサービスの集合体。各サービスはネットワーク(APIなど)を通じて連携する疎結合な構造。
デプロイ アプリケーション全体を一度にまとめてデプロイする。一部の小さな変更であっても、全体のビルドと再デプロイが必要。 各サービスを独立してデプロイ可能。他のサービスに影響を与えずに、頻繁かつ高速なリリースサイクルを実現しやすい。
データベース 通常、システム全体で単一の共有データベースを使用する。データの一貫性は保ちやすいが、変更の影響範囲が大きい。 原則として各サービスが独自のデータベースを所有する。データの独立性は高いが、サービス間のデータ整合性(結果整合性など)の管理が複雑になる。
技術 基本的に単一の技術スタック(プログラミング言語、フレームワークなど)で統一される。新しい技術への移行ハードルが高い。 サービスごとに最適な技術スタックを選択可能(ポリグロット)。要件に合わせて多様な言語やDBを使い分けられるが、管理コストは増える。
障害耐性 一部の機能(メモリリークやバグなど)の障害がシステム全体に波及しやすく、全停止のリスクが高い 適切に設計(障害の隔離など)されていれば、1つのサービスがダウンしてもシステム全体の停止を防げる。ただし、障害箇所の特定は複雑になりがち。
開発チーム 大規模なチーム、または機能別チームが同じコードベースを共有する傾向がある。チーム間の調整・コミュニケーションコストが高くなりやすい。 サービスごとに責任を持つ小規模で自律的なチーム(2-pizza team等)を編成しやすい。チーム内で完結して開発を進めやすい。

 

メリット

1. 俊敏性の向上

  • サービスが小さいので、開発・テスト・デプロイが迅速に行える
  • CI/CDとの相性が非常に良い

 

2. スケーラビリティ

  • アプリケーション全体ではなく、負荷が高いサービス(例:商品検索サービスなど)だけをスケールアウト(サーバー台数を増やす)でき、リソースを効率的に使える

 

3. 技術的自由度

  • サービスごとに最適なプログラミング言語やフレームワーク、データベースを選択できる

 

4. 障害耐性

  • あるサービスに障害が発生してもその影響を局所化でき、システム全体がダウンするのを防げる

 

5. チームの自立性

  • 各チームが自分たちのサービスに責任を持つことで、開発のオーナーシップと生産性が向上する

 

デメリット

1. 分散システムとしての複雑さ

  • サービス間の通信(ネットワーク遅延、障害)、データの整合性をどう保つか、といったモノリシックでは考えなくてよかった問題が発生する

 

2. 運用・管理の複雑さ

  • デプロイするサービスの数は増え、監視・ロギング・デバッグが複雑になる

 

3. データ整合性の担保が難しい

  • データベースがサービスごとに分散しているため、複数のサービスにまたがるトランザクション管理が難しくなる

※これを解決する方法として、SAGAパターンというのがある

 

4. テストの難しさ

  • 複数のサービスが連携する機能のE2Eテストは、モノリシックより複雑になる

 

マイクロサービス化するタイミングについて

プロジェクトの初期段階や、小規模なアプリケーション、そして小規模チームなどにおいてはデメリットが大きいため、基本的には以下のようなタイミングで検討すべきです。

 

・アプリケーションが巨大で複雑になりすぎた時

モノリシックなコードベースの変更が困難になり、デプロイに時間がかかりすぎるようになったなど。

 

・開発チームの規模が大きい時

複数のチームが同じコードベースを触ることでコンフリクトが多発し、開発速度が低下するようになったなど。

 

・機能ごとにスケーラビリティの要求が大きく異なる時

例えば「認証機能の負荷は低いが、動画配信機能は非常に高い負荷がかかる。」というようなケースなど。

 

・ビジネスの要求として新機能を迅速にリリースする必要がある時

機能の増加によってコードが巨大化し、新機能を迅速にリリースすることが困難になったなど。

 

マイクロサービスを作る際の設計手法について

上記でも書きましたが、マイクロサービスには複数のサービスにまたがるデータ整合性を保つのが難しいというデメリットがあるため、設計手法には注意が必要です。

その解決方法としてよく使われているのが、SAGAパターンと呼ばれる設計手法になります。

具体的には、マイクロサービスなど複数のシステムにまたがる一連の処理(分散トランザクション)を、各サービスがローカルトランザクションを順番に実行し、失敗した場合は補償トランザクションを実行して整合性を保つ設計パターンです。

例えばECサイトで「注文」という一連の処理を例とすると、「注文サービス→決済サービス→在庫サービス」のように複数のサービスにまたがりますが、もし在庫引き当てで失敗したら、決済を取り消し、注文をキャンセルする必要があるため、それらに対応するのがSAGAパターンの役割になります。

 

SAGAパターンにおける2種類の実装方法

1. コレオグラフィ

 

※マイクロサービスでエラーが発生した場合、補償トランザクションを実行する。

 

・概要

  • 中央の司令塔(オーケストレーター)が存在せず、各サービスがイベントを検知した際に自律的にアクションを実行します。
  • 各々が自分の役割とタイミングを知っていて、連鎖的に処理が進みます。

 

・仕組みの例

  1. 注文サービスが注文を受け、「注文作成済み」イベントを発行する。
  2. 決済サービスが「注文作成済み」イベントを検知し、それに対する決済処理を実行する。成功したら「決済成功」イベントを発行する。
  3. 在庫サービスが「決済成功」イベントを検知し、在庫の引き当てを行う。

 

・メリット

  • サービス間の結合度が低い(疎結合)
  • 単一障害点がない

 

・デメリット

  • 全体のワークフローがどのようになっているか把握しづらい(ロジックが各サービスに分散するため)
  • サービスが増えると、どのサービスがどのイベントを処理しているかの関係が複雑になり、追跡が困難になる

 

2. オーケストレーション

 

・概要

  • オーケストレーターという中央のサービスが、全体のワークフローを管理・実行します。
  • 「指揮者(オーケストレーター)」が各楽器(サービス)に指示を出すイメージです。

 

・仕組みの例

  1. クライアントがオーケストレーターに注文処理をリクエスト
  2. オーケストレーターが注文サービスに「注文作成」を指示する。
  3. 成功したら、次に決済サービスに「決済実行」を指示する。
  4. もし在庫サービスへの指示が失敗した場合、オーケストレーターが決済サービスに「返金(補償トランザクション)」を、注文サービスに「注文キャンセル(補償トランザクション)」を指示する。

 

・メリット

  • ワークフローのロジックが一箇所に集約されているため、処理の流れが分かりやすい。
  • エラーハンドリングや補償トランザクション管理がしやすい。

 

・デメリット

  • オーケストレーターが単一障害点になる可能性がある
  • オーケストレーターにビジネスロジックが集中しすぎると、そこがボトルネックになったり、「賢いハブと貧弱なエンドポイント」というアンチパターンに陥る危険がある

※ハイブリッド型(原則はコレオグラフィ + 必要に応じてオーケストレーション)が、現代のマイクロサービス設計で多く採用されている現実的なスタイル

 

スポンサーリンク

Go言語(Golang)とEMQXでコレオグラフィを試す

次にSAGAパターンのコレオグラフィをローカル環境で試してみますが、非同期処理のトリガー部分にはなんらかのメッセージングサービスが必要になるため、今回はローカル環境でも試しやすいEMQXというOSSを利用してみます。

※EMQXは、MQTT(Message Queuing Telemetry Transport)プロトコルに対応した高性能・高スケーラビリティのオープンソースMQTTブローカー(メッセージサーバー)です。

※ローカルで簡単に使える前提で、Google Cloud Pub/Subと同じようなメッセージングサービス(Push型、外部API実行のWebhook機能有り、エラー時のリトライ処理も可能なもの)のOSSを探しましたが、近しいものがEMQXぐらいしかなかったのでこれを使いました。他にはApache Pulsar(+ HTTP sink connector)などがあるっぽいです。それ以外はPull型だったり、Webhook機能の導入ができないか、導入するのが難しいのしかありませんでした。

 

まずは以下のコマンドを実行し、各種ファイルを作成します。

$ mkdir saga-choreography && cd saga-choreography
$ touch compose.yml

※以降ではDockerを利用して環境構築を行います。

 

次に作成したファイルを以下のように記述します。

・「compose.yml」

services:
  # メッセージングサービス
  emqx:
    image: emqx/emqx:6.0.0
    container_name: emqx
    ports:
      - "1883:1883"   # MQTT
      - "8883:8883"   # MQTT over SSL
      - "8083:8083"   # MQTT over WebSocket
      - "8084:8084"   # MQTT over Secure WebSocket
      - "18083:18083" # ダッシュボード用HTTPポート
    environment:
      - EMQX_NAME=emqx
    volumes:
      - emqx_data:/opt/emqx/data
      - emqx_log:/opt/emqx/log
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:18083"]
      interval: 5s
      timeout: 5s
      retries: 5
volumes:
  emqx_data:
  emqx_log:

※EMQXのバージョンは2025年10月時点で最新の「6.0.0」を使います。

 

次に以下のコマンドを実行し、コンテナを起動します。

$ docker compose up -d

 

次にブラウザで「http://localhost:18083」を開きます。

 

次にID「admin」、初期パスワード「public」でログインします。

 

次にパスワード変更画面がでますが、必要がないので下にある「Skip」をクリックします。

 

これでEMQXの画面にログインが完了です。

 

各種トピックに対するWebhook設定

マイクロサービスB用の設定

次に後述で別途作成予定のマイクロサービスBのAPIを起動させる用のトピックを「topic/service-b」、それが失敗した際に再実行させる用のトピックを「topic/service-b/retry1」、リトライ失敗時にDLQ(エラーで処理できなかったメッセージを一時的に保存するためのメッセージキュー)にするためのトピックを「dlq/service-b」とし、それらに必要な各種設定を行います。

 

まずはメニュー「Integration > Connectors」を開き、画面右上の「+ Create」をクリックします。

 

次にコネクタタイプに「HTTP Server」を選択します。

 

次に画面下の「Next」をクリックします。

 

次にConnector Nameに「connector_service-b」、Descriptionに「service-b用のコネクタ設定」、URLに「http://host.docker.internal:8082/service-b」を設定し、画面下の「Create」をクリックします。

 

次に続けてルールを作成するかのポップアップが表示されるので「Create Rule」をクリックします。

 

次にルールに付けるアクション設定画面が開くので、Nameに「action_service-b」、Descriptionに「service-b用のアクション設定」を設定します。

 

次に画面下の「+Add Fallback Action」をクリックします。

 

次にType of Actionに「Republish」を選択、Topicに「topic/service-b/retry1」、Payloadに「${payload}」を設定し、画面下の「Add」をクリックします。

※サービスBへのメッセージ送信に失敗したらリトライ用のトピック「topic/service-b/retry1」にPayload部分だけを送ります。(EMQXでリトライ設定をしたい場合はこのようなやり方が必要でした)

 

次に画面下の「Create」をクリックします。

 

次にルール名を「rule_service-b」、説明を「サービスB用のルール設定」、SQLのFROM句でトピック「topic/service-b」を設定します。

※このSQLの条件にて対象のトピックにデータがあったら実行されるような感じです。

 

次に画面下の「Save」をクリックします。

 

これでサービスB用のルール設定が完了しました。

 

次にサービスBのリトライ設定用のルールを作成するため、画面右上の「+ Create」をクリックします。

 

次にルール名を「rule_service-b_retry1」、説明を「サービスBのリトライ用設定」、SQLのFROM句でトピック「topic/service-b/retry1」を設定し、アクション追加の「+ Add Action」をクリックします。

 

次にType of Actionに「HTTP Server」を選択します。

 

次にNameに「action_service-b_retry1」、Connectorsに「connector_service-b」を選択、Descriptionに「サービスBのリトライ用のアクション設定」を設定します。

 

次に画面下の「+ Add Fallback Action」をクリックします。

 

次にType of Actionに「Republish」、Topicに「dlq/service-b」を設定し、画面下の「Add」をクリックします。

 

次に画面下の「Create」をクリックします。

 

次に画面下の「Save」をクリックします。

 

これでサービスBのリトライ用のルール設定も完了しました。

 

マイクロサービスAの補償トランザクション用の設定

次に上記と同様に、後述で別途作成予定のマイクロサービスAの補償トランザクションのAPIを起動させる用のトピックを「topic/service-a/compensate」、それが失敗した際に再実行させる用のトピックを「topic/service-a/compensate/retry1」、リトライ失敗時にDLQにするためのトピックを「dlq/service-a/compensate」とし、それらに必要な各種設定を行います。

 

まずはメニュー「Integration > Connectors」を開き、画面右上の「+ Create」をクリックします。

 

次にコネクタタイプに「HTTP Server」を選択し、画面下の「Next」をクリックします。

 

次にConnector Nameに「connector_service-a_compensate」、Descriptionに「service-aの補償トランザクション用のコネクタ設定」、URLに「http://host.docker.internal:8081/service-a/compensate」を設定し、画面下の「Create」をクリックします。

 

次に続けてルールを作成するかのポップアップが表示されるので「Create Rule」をクリックします。

 

次にルールに付けるアクション設定画面が開くので、Nameに「action_service-a_compensate」、Descriptionに「サービスAの補償トランザクション用のアクション設定」を設定します。

 

次に画面下の「+ Add Fallback Action」をクリックします。

 

次にType of Actionに「Republish」を選択、Topicに「topic/service-a/compensate/retry1」、Payloadに「${payload}」を設定し、画面下の「Add」をクリックします。

 

次に画面下の「Create」をクリックします。

 

次にルール名を「rule_service-a_compensate」、説明を「サービスAの補償トランザクション用のルール設定」、SQLのFROM句でトピック「topic/service-a/compensate」を設定します。

 

次に画面下の「Save」をクリックします。

 

次にサービスAの補償トランザクションのリトライ設定用のルールを作成するため、画面右上の「+ Create」をクリックします。

 

次にルール名を「rule_service-a_compensate_retry1」、説明を「サービスAの補償トランザクションのリトライ用設定」、SQLのFROM句でトピック「topic/service-a/compensate/retry1」を設定し、アクション追加の「+ Add Action」をクリックします。

 

次にType of Actionに「HTTP Server」を選択、Nameに「action_service-a_compensate_retry1」、Connectorsに「connector_service-a_compensate」を選択、Descriptionに「サービスAの補償トランザクションのリトライ用のアクション設定」を設定します。

 

次に画面下の「+ Add Fallback Action」をクリックします。

 

次にType of Actionに「Republish」、Topicに「dlq/service-a/compensate」を設定し、画面下の「Add」をクリックします。

 

次に画面下の「Create」をクリックします。

 

次に画面下の「Save」をクリックします。

 

これでEMQXの各種設定が完了です。

 

Go言語でマイクロサービスAとBの簡単なAPIを作成する

次にGo言語で最初に起動させるAPIをマイクロサービスAとして作成し、EMQXのトピックにメッセージを送信して非同期実行させるAPIをマイクロサービスBとして作成します。

まずは以下のコマンドを実行し、各種ファイルを作成します。

$ mkdir microservices-a
$ touch microservices-a/Dockerfile microservices-a/main.go
$ mkdir microservices-b
$ touch microservices-b/Dockerfile microservices-b/main.go

 

次に作成したファイルをそれぞれ以下のように記述します。

・「microservices-a/Dockerfile」

FROM golang:1.25-alpine3.22

WORKDIR /go/src

COPY ./microservices-a .

# go.modがあれば依存関係をインストール
RUN if [ -f ./go.mod ]; then \
      go install; \
    fi

# 開発用のライブラリをインストール
RUN go install github.com/air-verse/air@v1.63.0
RUN go install honnef.co/go/tools/cmd/staticcheck@latest

EXPOSE 8081

※goのバージョンは「1.25」、ホットリロード用のairのバージョンは「1.63」を使います。

 

・「microservices-a/main.go」

package main

import (
    "fmt"
)

func main() {
    fmt.Println("Hello World !!")
}

 

・「microservices-b/Dockerfile」

FROM golang:1.25-alpine3.22

WORKDIR /go/src

COPY ./microservices-b .

# go.modがあれば依存関係をインストール
RUN if [ -f ./go.mod ]; then \
      go install; \
    fi

# 開発用のライブラリをインストール
RUN go install github.com/air-verse/air@v1.63.0
RUN go install honnef.co/go/tools/cmd/staticcheck@latest

EXPOSE 8082

※goのバージョンは「1.25」、ホットリロード用のairのバージョンは「1.63」を使います。

 

・「microservices-b/main.go」

package main

import (
    "fmt"
)

func main() {
    fmt.Println("Hello World !!")
}

 

次にファイル「compose.yml」を以下のように修正します。

services:
  # メッセージングサービス
  emqx:
    image: emqx/emqx:6.0.0
    container_name: emqx
    ports:
      - "1883:1883"   # MQTT
      - "8883:8883"   # MQTT over SSL
      - "8083:8083"   # MQTT over WebSocket
      - "8084:8084"   # MQTT over Secure WebSocket
      - "18083:18083" # ダッシュボード用HTTPポート
    environment:
      - EMQX_NAME=emqx
    volumes:
      - emqx_data:/opt/emqx/data
      - emqx_log:/opt/emqx/log
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:18083"]
      interval: 5s
      timeout: 5s
      retries: 5
  # マイクロサービスA
  ms-a:
    container_name: ms-a
    build:
      context: .
      dockerfile: ./microservices-a/Dockerfile
    command: air -c .air.toml
    volumes:
      - ./microservices-a:/go/src
    ports:
      - "8081:8081"
    tty: true
    stdin_open: true
    depends_on:
      emqx:
        condition: service_healthy
  # マイクロサービスB
  ms-b:
    container_name: ms-b
    build:
      context: .
      dockerfile: ./microservices-b/Dockerfile
    command: air -c .air.toml
    volumes:
      - ./microservices-b:/go/src
    ports:
      - "8082:8082"
    tty: true
    stdin_open: true
    depends_on:
      emqx:
        condition: service_healthy
volumes:
  emqx_data:
  emqx_log:

 

次に以下のコマンドを実行し、コンテナの再ビルドをします。

$ docker compose down
$ docker compose build --no-cache

 

次に以下のコマンドを実行し、goの初期化をします

$ docker compose run --rm ms-a go mod init ms-a
$ docker compose run --rm ms-a air init
$ docker compose run --rm ms-b go mod init ms-b
$ docker compose run --rm ms-b air init

 

次にファイル「microservices-a/main.go」、「microservices-b/main.go」を以下のように修正します。

・「microservices-a/main.go」

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "github.com/labstack/echo/v4"
    mqtt "github.com/eclipse/paho.mqtt.golang"
)

// 現在日時取得用関数
func time_now_jst() string {
    loc, _ := time.LoadLocation("Asia/Tokyo")
    now := time.Now().In(loc)
    return now.Format("2006-01-02 15:04:05")
}

func main() {
    /********************
    * EMQXへの接続
    ********************/
    // EMQXブローカーの接続情報を設定
    // HOSTはcompose.ymlで定義した「emqx」
    broker := "tcp://emqx:1883"
    // クライアントIDは一意の値
    clientID := "emqx_ms-a"

    // MQTTクライアントのオプションを設定
    opts := mqtt.NewClientOptions()
    opts.AddBroker(broker)
    opts.SetClientID(clientID)

    // MQTTクライアントを作成
    client := mqtt.NewClient(opts)
    defer client.Disconnect(250)

    // ブローカーに接続
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        fmt.Printf("EMQXブローカーの接続に失敗しました。[%v]\n", token.Error())
    }
    fmt.Println("EMQXブローカーに接続しました。")

    /********************
    * echoのルーター設定
    ********************/
    e := echo.New()

    // サービスAを実行するAPIのサンプル
    e.POST("/service-a", func(c echo.Context) error {
        // ログ出力
        msg := "サービスAを実行しました!"
        fmt.Println(msg)

        // EMQXのトピック「topic/service-b」へのメッセージ設定
        topic := "topic/service-b"
        timestamp := time_now_jst()
        data := map[string]string{
            "event": "service-b",
            "text": "マイクロサービスBを起動",
            "timestamp": timestamp,
        }
        payload, err := json.Marshal(data)
        if err != nil {
            msg := fmt.Sprintf("json.Marshal処理に失敗しました。: %v", err)
            return echo.NewHTTPError(http.StatusInternalServerError, msg)
        }

        // メッセージ送信
        token := client.Publish(topic, 0, false, payload)
        token.Wait()
        if token.Error() != nil {
            msg := fmt.Sprintf("EMQXへのメッセージ送信に失敗しました。: %v", err)
            return echo.NewHTTPError(http.StatusInternalServerError, msg)
        }

        // レスポンス結果の設定
        res := map[string]string{
            "message": msg,
        }

        return c.JSON(http.StatusOK, res)
    })

    // サービスAの補償トランザクションを実行するAPIのサンプル
    e.POST("/service-a/compensate", func(c echo.Context) error {
        // ログ出力
        msg := "サービスAの補償トランザクションを実行しました!"
        fmt.Println(msg)

        // レスポンス結果の設定
        res := map[string]string{
            "message": msg,
        }

        return c.JSON(http.StatusOK, res)
    })

    e.Logger.Fatal(e.Start(":8081"))
}

 

・「microservices-b/main.go」

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "github.com/labstack/echo/v4"
    mqtt "github.com/eclipse/paho.mqtt.golang"
)

// EMQXからのメッセージの型
type Message struct {
    Event    string `json:"event"`
    Payload  string `json:"payload"`
    Clientid string `json:"clientid"`
    Topic    string `json:"topic"`
}

// メッセージとして送信するPayloadの型
type Payload struct {
    Event     string `json:"event"`
    Text      string `json:"text"`
    Timestamp string `json:"timestamp"`
}

// 現在日時取得用関数
func time_now_jst() string {
loc, _ := time.LoadLocation("Asia/Tokyo")
now := time.Now().In(loc)
return now.Format("2006-01-02 15:04:05")
}

func main() {
    /********************
    * EMQXへの接続
    ********************/
    // EMQXブローカーの接続情報を設定
    // HOSTはcompose.ymlで定義した「emqx」
    broker := "tcp://emqx:1883"
    // クライアントIDは一意の値
    clientID := "emqx_ms-b"

    // MQTTクライアントのオプションを設定
    opts := mqtt.NewClientOptions()
    opts.AddBroker(broker)
    opts.SetClientID(clientID)

    // MQTTクライアントを作成
    client := mqtt.NewClient(opts)
    defer client.Disconnect(250)

    // ブローカーに接続
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        fmt.Printf("EMQXブローカーの接続に失敗しました。[%v]\n", token.Error())
    }
    fmt.Println("EMQXブローカーに接続しました。")

    /********************
    * 結果切り替えフラグ
    ********************/
    // 0:   正常終了
    // 1:   エラーによる補償トランザクション処理
    // etc: 異常終了
    flag := 0

    /********************
    * echoのルーター設定
    ********************/
    e := echo.New()

    // サービスBを実行するAPIのサンプル
    e.POST("/service-b", func(c echo.Context) error {
        // リクエストパラメータの取得処理
        var m Message
        if err := c.Bind(&m); err != nil {
            msg := fmt.Sprintf("リクエストボディが不正です。: %v", err)
            return echo.NewHTTPError(http.StatusBadRequest, msg)
        }

        var p Payload
        if err := json.Unmarshal([]byte(m.Payload), &p); err != nil {
            msg := fmt.Sprintf("リクエストボディのPayloadが不正です。: %v", err)
            return echo.NewHTTPError(http.StatusBadRequest, msg)
        }

        // フラグによって処理結果を切り替える
        switch flag {
        case 0:
            // ログ出力
            msg := "サービスBを実行しました!"
            fmt.Printf("%s [event:%s, text:%s, timestamp:%s] \n", msg, p.Event, p.Text, p.Timestamp)

            // レスポンス結果の設定
            res := map[string]string{
                "message": msg,
            }

            return c.JSON(http.StatusOK, res)
        case 1:
            // ログ出力
            msg := "マイクロサービスAの補償トランザクションを起動!"
            fmt.Printf("%s [event:%s, text:%s, timestamp:%s] \n", msg, p.Event, p.Text, p.Timestamp)

            // EMQXのトピック「topic/service-a/compensate」へのメッセージ設定
            topic := "topic/service-a/compensate"
            timestamp := time_now_jst()
            data := map[string]string{
                "event": "service-a/compensate",
                "text": "マイクロサービスAの補償トランザクションを起動",
                "timestamp": timestamp,
            }
            payload, err := json.Marshal(data)
            if err != nil {
                msg := fmt.Sprintf("json.Marshal処理に失敗しました。: %v", err)
                return echo.NewHTTPError(http.StatusInternalServerError, msg)
            }

            // メッセージ送信
            token := client.Publish(topic, 0, false, payload)
            token.Wait()
            if token.Error() != nil {
                msg := fmt.Sprintf("EMQXへのメッセージ送信に失敗しました。: %v", err)
                return echo.NewHTTPError(http.StatusInternalServerError, msg)
            }

            // レスポンス結果の設定
            res := map[string]string{
                "message": msg,
            }

            return c.JSON(http.StatusOK, res)
        default:
            // ログ出力
            msg := fmt.Sprintf("サービスBで予期せぬエラーが発生しました! [event:%s, text:%s, timestamp:%s] \n", p.Event, p.Text, p.Timestamp)
            fmt.Println(msg)

            return echo.NewHTTPError(http.StatusInternalServerError, msg)
        }

    })

    e.Logger.Fatal(e.Start(":8082"))
}

 

次に以下のコマンドを実行し、go.modを更新します。

$ docker compose run --rm ms-a go mod tidy
$ docker compose run --rm ms-b go mod tidy

 

次に以下のコマンドを実行し、コンテナの再ビルドおよび起動します。

$ docker compose down
$ docker compose build --no-cache
$ docker compose up -d

 

テスト

次にテストをするため、まずはEMQXのメニュー「Diagnostics Tools > WebSocket Client」を開き、対象のトピックをSubscriptionに追加して検知できるようにします。

画面を開いたら画面右側の「Connect」をクリックして接続します。

 

次にテストで確認するために画面下にあるSubscriptionにあるTopicに対象のトピックを入力し、「Subscribe」をクリックして登録します。

登録するトピックは「topic/service-b」、「topic/service-b/retry1」、「dlq/service-b」、「topic/service-a/compensate」、「topic/service-a/compensate/retry1」、「dlq/service-a/compensate」です。

 

Subscriptionに登録したトピックにメッセージが送信されると、画面左下のReceivedに表示されるため、テストとして確認できるようになります。

 

次にPostman(API実行用ツール)を使ってマイクロサービスAのAPIを実行します。

 

次にDocker DesktopからマイクロサービスBのコンテナ「ms-b」画面を開き、サービスBが実行されたログ出力がされていればOKです。

 

次にEMQX画面に戻ってReceivedを確認し、「topic/service-b」に対してメッセージを受信していればOKです。

※マイクロサービスAのAPIでEMQXにメッセージを送り、対象のトピック「topic/service-b」を検知したらWebhookの設定でマイクロサービスBのAPIを起動させているため、このような動作になります。

 

次にファイル「microservices-b/main.go」フラグを「1」に変更して試してみます。

※フラグ「1」は、マイクロサービスBでエラーが発生し、マイクロサービスAの補償トランザクションを実行させるパターンです。

 

API実行前にEMQXのRecevedのゴミ箱ボタンで結果をクリアしておきます。

 

次に再度マイクロサービスAのAPIを実行します。

 

次にDocker DesktopからマイクロサービスBのコンテナ「ms-b」画面を開き、マイクロサービスAの補償トランザクションを起動させたログ出力がされていればOKです。

 

次にDocker DesktopからマイクロサービスAのコンテナ「ms-a」画面を開き、サービスAの補償トランザクションが実行されたログ出力がされていればOKです。

 

次にEMQX画面に戻ってReceivedを確認し、「topic/service-b」と「topic/service-a/compensate」に対してメッセージを受信していればOKです。

 

次にファイル「microservices-b/main.go」フラグを「2」に変更して試してみます。

※フラグが0と1以外の場合は、マイクロサービスBで予期せぬエラーが発生し、レスポンスのステータスコード「500」を返すパターンです。

 

API実行前にEMQXのRecevedのゴミ箱ボタンで結果をクリアしておきます。

 

次に再度マイクロサービスAのAPIを実行します。

 

次にDocker DesktopからマイクロサービスBのコンテナ「ms-b」画面を開き、サービスBで予期せぬエラーが発生したログ出力が2件あればOKです。

※失敗しても1回だけリトライさせる設定にしたため、ログが2件出力される。

 

次にEMQX画面に戻ってReceivedを確認し、「topic/service-b」と「topic/service-b/retry1」、「dlq/service-b」に対してメッセージを受信していればOKです。

※最終的にトピック「dlq/service-b」にメッセージが送られる。

 

このようにマイクロサービスにおけるSAGAパターンのコレオグラフィについては、メッセージングサービスを使いながら各種マイクロサービスを非同期実行させることになります。

尚、もし最終的にDLQにメッセージが送信された場合、それを確認したりするのに別途仕組みや運用を構築することになるので覚えておきましょう。

 

スポンサーリンク

本番環境にGoogle CloudかAWSを使う場合のインフラ構成について

上記ではローカル開発用のメッセージングサービスとしてEMQXというOSSを利用しましたが、本番環境ではGoogle CloudかAWSにあるサービスを利用して構築することになると思います。

例えばGoogle CloudにはCloud Pub/Subというメッセージングサービスがあり、それを使ってSAGAパターンのコレオグラフィを実現する際には主に以下のようなインフラ構成になります。

※Cloud Pus/SubからCloud Functions(イベント駆動型のサーバーレス関数実行サービス)を実行し、そこからマイクロサービスを呼び出すことも可能です。

 

また、もしAWSを使う場合は複数のサービスを組み合わせて構築することになるため、以下のようにEventBridge(イベントバスサービス)、SQS(メッセージキューイングサービス)、Lambda(イベント駆動型のサーバーレスコンピューティングサービス)を使うようなインフラ構成になります。

※SQSがPull型(受信側がイベントが発行されているかをチェックする)なので、Push型(イベント送信によって対象の処理を実行させる)のように動作させるにはLambdaを紐付けて実行させる必要があります。また、EventBridgeから直接Lambdaや外部APIを実行させることもできますが、運用面なども考慮すると一つのやり方(EventBridge→SQS→Lambda)に統一した方がいいです。

 

このように利用するクラウドサービスのメッセージングサービスによって作り方が若干変わったりするので注意しましょう。

 

オーケストレーションを実現する方法について

上記ではマイクロサービスの基本となるコレオグラフィについて解説しましたが、もしSAGAパターンのオーケストレーションを実現させたい場合は、別途オーケストレーション用のマイクロサービスを構築する必要が出てきます。

ただし、ワークフローを構築するような各種マイクロサービスを制御するオーケストレーション用のマイクロサービスを独自で構築するのは大変なので、なんらかのワークフローエンジンサービスを使うのが推薦されます。

例えばオープンソースの分散ワークフローエンジンに「Temporal」というのがあったりするので、オーケストレーションを実現させたい場合はこういったワークフローエンジンを使って構築することになると思います。

※尚、インフラにAWSを使っている場合、Step Functions(AWSの各サービスを視覚的に連携させて、分散アプリケーションのワークフローを構築・管理できるサーバーレスなサービス)でもワークフローの構築が可能ですが、柔軟性はTemporalの方が優っていて、将来的に運用が辛くなる可能性があるっぽいので、技術選定をする際は注意しましょう。

 

スポンサーリンク

最後に

今回はマイクロサービスアーキテクチャの基礎知識についてまとめました。

中規模以上のプロジェクトにならないと携わることは少ないと思いますが、Go言語を扱っていればいずれ携わる可能性もあると思うので、基本的な部分はしっかり抑えておくといいでしょう。

今回はローカル環境でサクッと試すために、あまり馴染みがないEMQXを使ったコレオグラフィを試しましたが、よければぜひ参考にしてみて下さい。

 

この記事を書いた人
Tomoyuki

SE→ブロガーを経て、現在はWeb系エンジニアをしています!

Tomoyukiをフォローする
応用
スポンサーリンク
Tomoyukiをフォローする

コメント

タイトルとURLをコピーしました