PR

Go言語(Golang)のgRPCでDDD(ドメイン駆動設計)構成のバックエンドAPIを開発する方法まとめ

応用

こんにちは。Tomoyuki(@tomoyuki65)です。

直近にGo言語(Golang)のDDD(ドメイン駆動設計)に関する記事を書きましたが、gRPCにおいてもDDDで開発したいことがあると思います。

そこでこの記事では、Go言語のgRPCでDDD構成のバックエンドAPIを開発する方法についてまとめます。

 

【関連記事】

Go言語(Golang)のGinでDDD(ドメイン駆動設計)構成のバックエンドAPIを開発する方法まとめ
こんにちは。Tomoyuki(@tomoyuki65)です。これまではクリーンアーキテクチャを参考にしてGo言語のAPIの作り方を解説してきましたが、実務においてはDDD(ドメイン駆動設計)と呼ばれる方法で作られていることが多いです。そんな...
Go言語(Golang)のgRPCでバックエンドAPIを開発する方法まとめ
こんにちは。Tomoyuki(@tomoyuki65)です。Go言語(Golang)はマイクロサービスで利用されることが多かったりしますが、その際にgRPC(Google Remote Procedure Call)が使われていることがあり...

 

Go言語(Golang)のgRPCでDDD(ドメイン駆動設計)構成のバックエンドAPIを開発する方法まとめ

まずは以下のコマンド実行して各種ファイルを作成します。

$ mkdir go-grpc-domain && cd go-grpc-domain
$ mkdir -p docker/local/go && touch docker/local/go/Dockerfile
$ mkdir src && touch src/.env src/.env.testing src/main.go
$ touch compose.yml

※いつものようにDockerを使って環境構築するため、事前にDockerが使える環境を準備して下さい。

 

次に作成したファイルをそれぞれ以下のように記述します。

・「docker/local/go/Dockerfile」

FROM golang:1.24-alpine3.21

WORKDIR /go/src

COPY ./src .

# go.modがあれば依存関係をインストール
RUN if [ -f ./go.mod ]; then \
      go install; \
    fi

# Buf CLIをインストール
RUN go install github.com/bufbuild/buf/cmd/buf@latest

# 開発用のライブラリをインストール
RUN go install github.com/air-verse/air@latest
RUN go install honnef.co/go/tools/cmd/staticcheck@latest
RUN go install go.uber.org/mock/mockgen@latest

EXPOSE 8080

※今回のgRPCはBuf CLIを利用します。ホットリロードに「air」、静的コード解析に「staticcheck」、テスト用のモックファイル作成に「mockgen」をインストールしています。

 

・「src/.env」

ENV=local
GRPC_PORT=50051
TZ=Asia/Tokyo

 

・「src/.env.testing」

ENV=testing
GRPC_PORT=50051
TZ=Asia/Tokyo

 

・「src/main.go」

package main

import (
    "fmt"
)

func main() {
    fmt.Println("Hello World !!")
}

 

・「compose.yml」

services:
  grpc:
    container_name: go-grpc-domain
    build:
      context: .
      dockerfile: ./docker/local/go/Dockerfile
    command: air -c .air.toml
    volumes:
      - ./src:/go/src
    ports:
      # gRPC Server用のポート設定
      - "50051:50051"
    environment:
      - ENV
      - GRPC_PORT
      - TZ
    tty: true
    stdin_open: true

 

次に以下のコマンドを実行し、コンテナのビルドをします。

$ docker compose build --no-cache

 

次に以下のコマンドを実行し、goの初期化をします。

$ docker compose run --rm grpc go mod init go-grpc-domain
$ docker compose run --rm grpc air init

 

Bufの設定

次に以下のコマンドを実行し、Bufの初期化をします。

$ docker compose run --rm grpc buf config init

 

次に作成されたファイル「src/buf.yaml」を以下のように修正します。

# For details on buf.yaml configuration, visit https://buf.build/docs/configuration/v2/buf-yaml
version: v2
# .protoファイルのパスを指定
modules:
  - path: proto
# 依存関係のライブラリを指定
deps:
  - buf.build/envoyproxy/protoc-gen-validate:v1.2.1
lint:
  use:
    - STANDARD
breaking:
  use:
    - FILE

※バリデーション用に「protoc-gen-validate」を使う想定です。

 

次に以下のコマンドを実行し、ファイル「src/buf.gen.yaml」を作成します。

$ touch src/buf.gen.yaml

 

次に作成したファイルを以下のように記述します。

version: v2
plugins:
  # 1. --go_out=.
  - remote: buf.build/protocolbuffers/go:v1.36.6
    out: ./pb
    opt: paths=source_relative

  # 2. --go-grpc_out=.
  - remote: buf.build/grpc/go:v1.5.1
    out: ./pb
    opt: paths=source_relative

  # 3. --validate_out=lang=go:.
  - remote: buf.build/bufbuild/validate-go:v1.2.1
    out: ./pb
    opt: paths=source_relative

  # 4. --doc_out=.
  - remote: buf.build/community/pseudomuto-doc:v1.5.1
    out: ./doc
    opt: markdown,docs.md

 

スポンサーリンク

共通設定のファイルを追加

次に以下のコマンドを実行し、共通設定のファイルを作成します。

$ mkdir -p src/internal/application/usecase/logger && touch src/internal/application/usecase/logger/logger.go
$ mkdir -p src/internal/infrastructure/logger && touch src/internal/infrastructure/logger/logger_slog.go
$ mkdir -p src/internal/presentation/interceptor && touch src/internal/presentation/interceptor/interceptor.go

 

次に作成したファイルをそれぞれ以下のように記述します。

・「src/internal/application/usecase/logger/logger.go」

package logger

import (
    "context"
)

type Logger interface {
    Info(ctx context.Context, message string)
    Warn(ctx context.Context, message string)
    Error(ctx context.Context, message string)
}

 

・「src/internal/infrastructure/logger/logger_slog.go」

package logger

import (
    "context"
    "log/slog"
    "os"

    logger_usecase "go-grpc-domain/internal/application/usecase/logger"
    ic "go-grpc-domain/internal/presentation/interceptor"
)

// slogの設定
type SlogHandler struct {
    slog.Handler
}

func (h *SlogHandler) Handle(ctx context.Context, r slog.Record) error {
    requestId, ok := ctx.Value(ic.RequestId).(string)
    if ok {
        r.AddAttrs(slog.Attr{Key: "requestId", Value: slog.String("requestId", requestId).Value})
    }

    xRequestSource, ok := ctx.Value(ic.XRequestSource).(string)
    if ok {
        r.AddAttrs(slog.Attr{Key: "xRequestSource", Value: slog.String("xRequestSource", xRequestSource).Value})
    }

    uid, ok := ctx.Value(ic.UID).(string)
    if ok {
        r.AddAttrs(slog.Attr{Key: "UID", Value: slog.String("UID", uid).Value})
    }

    return h.Handler.Handle(ctx, r)
}

var slogHandler = &SlogHandler{
    slog.NewTextHandler(os.Stdout, nil),
}
var logger = slog.New(slogHandler)

// ロガーの設定
type slogLogger struct{}

func NewSlogLogger() logger_usecase.Logger {
    return &slogLogger{}
}

func (l *slogLogger) Info(ctx context.Context, message string) {
    logger.InfoContext(ctx, message)
}

func (l *slogLogger) Warn(ctx context.Context, message string) {
    logger.WarnContext(ctx, message)
}

func (l *slogLogger) Error(ctx context.Context, message string) {
    logger.ErrorContext(ctx, message)
}

 

・「src/internal/presentation/interceptor/interceptor.go」

package interceptor

import (
    "context"
    "fmt"
    "strings"

    "go-grpc-domain/internal/application/usecase/logger"

    "github.com/google/uuid"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
)

type contextKey string

const (
    RequestId contextKey = "request-id"
    XRequestSource contextKey = "x-request-source"
    UID contextKey = "uid"
    Status contextKey = "status"
    StatusCode contextKey = "statusCode"
)

// Streamでコンテキストを共有させるためのラッパー構造体(対象メソッドのオーバーライド)
type wrappedServerStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (w *wrappedServerStream) SendMsg(m interface{}) error {
    return w.ServerStream.SendMsg(m)
}
func (w *wrappedServerStream) Context() context.Context {
    return w.ctx
}

type Interceptor struct {
    logger logger.Logger
}

func NewInterceptor(logger logger.Logger) *Interceptor {
    return &Interceptor{
        logger: logger,
    }
}

// リクエスト用のUnaryインターセプター
func (i *Interceptor) RequestUnary(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // ctxにx-request-idを設定
    requestId := uuid.New().String()
    ctx = context.WithValue(ctx, RequestId, requestId)

    // レスポンスのメタデータにx-request-idを追加
    headerMD := metadata.New(map[string]string{string(RequestId): requestId})
    if err := grpc.SetHeader(ctx, headerMD); err != nil {
        return nil, err
    }

    // メタデータを取得
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, fmt.Errorf("メタデータを取得できません。")
    }

    // リクエストのメタデータからx-request-sourceを取得
    requestSource, ok := md[string(XRequestSource)]
    if !ok {
        requestSource = []string{"-"}
    }

    // ctxにx-request-sourceを設定
    ctx = context.WithValue(ctx, XRequestSource, requestSource[0])

    // レスポンスのメタデータにx-request-sourceを追加
    headerMD2 := metadata.New(map[string]string{string(XRequestSource): requestSource[0]})
    if err := grpc.SetHeader(ctx, headerMD2); err != nil {
        return nil, err
    }

    // リクエスト開始のログ出力
    i.logger.Info(ctx, "start gRPC-Server request")

    // 処理を実行
    res, err := handler(ctx, req)

    // ステータスコードを取得
    st, ok := status.FromError(err)
    if !ok {
        return nil, fmt.Errorf("ステータスコードを取得できませんでした。")
    }

    // ctxにstatusとstatusCodeを設定
    ctx = context.WithValue(ctx, Status, st.Code().String())
    ctx = context.WithValue(ctx, StatusCode, fmt.Sprintf("%d", int(st.Code())))

    // トレーラーに情報追加
    if err != nil {
        tStatus := metadata.New(map[string]string{"status": fmt.Sprintf("ERROR ( %d %s )", int(st.Code()), st.Code().String())})
        if err := grpc.SetTrailer(ctx, tStatus); err != nil {
            return nil, err
        }
    } else {
        tStatus := metadata.New(map[string]string{"status": fmt.Sprintf("SUCCESS ( %d %s )", int(st.Code()), st.Code().String())})
        if err := grpc.SetTrailer(ctx, tStatus); err != nil {
            return nil, err
        }
    }

    // リクエスト終了のログ出力
    i.logger.Info(ctx, "finish gRPC-Server request")

    return res, err
}

// リクエスト用のStreamインターセプター
func (i *Interceptor) RequestStream(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    // ctxにx-request-idを設定
    requestId := uuid.New().String()
    ctx := ss.Context()
    ctx = context.WithValue(ctx, RequestId, requestId)

    // レスポンスのメタデータにx-request-idを追加
    headerMD := metadata.New(map[string]string{string(RequestId): requestId})
    if err := grpc.SetHeader(ctx, headerMD); err != nil {
        return err
    }

    // メタデータを取得
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return fmt.Errorf("メタデータを取得できません。")
    }

    // リクエストのメタデータからx-request-sourceを取得
    requestSource, ok := md[string(XRequestSource)]
    if !ok {
        requestSource = []string{"-"}
    }

    // ctxにx-request-sourceを設定
    ctx = context.WithValue(ctx, XRequestSource, requestSource[0])

    // レスポンスのメタデータにx-request-sourceを追加
    headerMD2 := metadata.New(map[string]string{string(XRequestSource): requestSource[0]})
    if err := grpc.SetHeader(ctx, headerMD2); err != nil {
        return err
    }

    // リクエスト開始のログ出力
    i.logger.Info(ctx, "start gRPC-Server stream request")

    err := handler(srv, &wrappedServerStream{ss, ctx})

    // ステータスコードを取得
    st, ok := status.FromError(err)
    if !ok {
        return fmt.Errorf("ステータスコードを取得できませんでした。")
    }

    // ctxにstatusとstatusCodeを設定
    ctx = context.WithValue(ctx, Status, st.Code().String())
    ctx = context.WithValue(ctx, StatusCode, fmt.Sprintf("%d", int(st.Code())))

    // トレーラーに情報追加
    if err != nil {
        tStatus := metadata.New(map[string]string{"status": fmt.Sprintf("ERROR ( %d %s )", int(st.Code()), st.Code().String())})
        if err := grpc.SetTrailer(ctx, tStatus); err != nil {
            return err
        }
    } else {
        tStatus := metadata.New(map[string]string{"status": fmt.Sprintf("SUCCESS ( %d %s )", int(st.Code()), st.Code().String())})
        if err := grpc.SetTrailer(ctx, tStatus); err != nil {
            return err
        }
    }

    // リクエスト終了のログ出力
    i.logger.Info(ctx, "finish gRPC-Server stream request")

    return err
}

// 認証用のUnaryインターセプター
func (i *Interceptor) AuthUnary(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // 対象外のメソッドを設定
    skipMethods := []string{
        // 対象外のメソッドがあれば追加
        // 例:pb.SampleService_Hello_FullMethodName,
    }

    // 対象外メソッドの場合はスキップ
    for _, method := range skipMethods {
        if info.FullMethod == method {
            return handler(ctx, req)
        }
    }

    // authorizationからトークンを取得
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        i.logger.Error(ctx, "メタデータを取得できません。")
        return nil, fmt.Errorf("メタデータを取得できません。")
    }
    authHeader, ok := md["authorization"]
    if !ok {
        i.logger.Warn(ctx, "認証用トークンが設定されていません。")
        return nil, status.Errorf(codes.InvalidArgument, "%s", "認証用トークンが設定されていません。")
    }
    token := strings.TrimPrefix(authHeader[0], "Bearer ")
    if token == "" {
        i.logger.Warn(ctx, "認証用トークンが設定されていません。")
        return nil, status.Errorf(codes.InvalidArgument, "%s", "認証用トークンが設定されていません。")
    }

    // TODO: 認証チェック処理を追加

    // 処理を実行
    return handler(ctx, req)
}

// 認証用のStreamインターセプター
func (i *Interceptor) AuthStream(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    // 対象外のメソッドを設定
    skipMethods := []string{
        // 対象外のメソッドがあれば追加
        // 例:pb.SampleService_HelloServerStream_FullMethodName,
    }

    // 対象外メソッドの場合はスキップ
    for _, method := range skipMethods {
        if info.FullMethod == method {
            return handler(srv, ss)
        }
    }

    // authorizationからトークンを取得
    ctx := ss.Context()
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        i.logger.Error(ctx, "メタデータを取得できません。")
        return fmt.Errorf("メタデータを取得できません。")
    }
    authHeader, ok := md["authorization"]
    if !ok {
        i.logger.Warn(ctx, "認証用トークンが設定されていません。")
        return status.Errorf(codes.InvalidArgument, "%s", "認証用トークンが設定されていません。")
    }
    token := strings.TrimPrefix(authHeader[0], "Bearer ")
    if token == "" {
        i.logger.Warn(ctx, "認証用トークンが設定されていません。")
        return status.Errorf(codes.InvalidArgument, "%s", "認証用トークンが設定されていません。")
    }

    // TODO: 認証チェック処理を追加

    return handler(srv, &wrappedServerStream{ss, ctx})
}

※interceptorはgRPCのミドルウェアを指します。Unary用(1リクエスト-1レスポンス)とストリーミング機能用でそれぞれ定義が必要になります。

 

次に以下のコマンドを実行し、ロガーのインターフェース定義からテストコード用のモックファイルを作成します。

$ docker compose run --rm grpc mockgen -source=./internal/application/usecase/logger/logger.go -destination=./internal/application/usecase/logger/mock_logger/mock_logger.go

 

次に以下のコマンドを実行し、go.modの修正およびコンテナの再ビルドを行います。

$ docker compose run --rm grpc go mod tidy
$ docker compose build --no-cache

 

スポンサーリンク

DDD(ドメイン駆動設計)のディレクトリ構成について

この後にDDD(ドメイン駆動設計)とBufでgRPCのAPIを作成していきますが、ディレクトリ構成としてはDDDの思想に基づいたレイヤードアーキテクチャを採用しています。

/src
├── /internal
|   ├── /application(アプリケーション層)
|   |   └── usecase(ユースケース層)
|   |
|   ├── /domain(ドメイン層)
|   |   ├── model(ドメインモデルの定義。ビジネスロジックは可能な限りドメインに集約させる。)
|   |   ├── (仮)repository(リポジトリのインターフェース定義)
|   |   └── (仮)service(外部サービスのインターフェース定義)
|   |
|   ├── /infrastructure(インフラストラクチャ層)
|   |   ├── (仮)database(データベース設定)
|   |   ├── logger(ロガーの実装。インターフェース部分はユースケース層で定義。)
|   |   ├── (仮)persistence(リポジトリの実装。DB操作による永続化層。)
|   |   ├── (仮)cache(キャッシュを含めたリポジトリの実装。インターフェースはリポジトリと同一。)
|   |   └── (仮)externalapi(外部サービスの実装)
|   |
|   ├── /presentation(プレゼンテーション層)
|   |   ├── server(サーバー層)
|   |   ├── interceptor(インターセプターの定義)
|   |   └── router(ルーター設定。レジストリのコントローラーを利用して設定する。)
|   |
|   └── /registry(レジストリ層。依存注入によるサーバーのインスタンスをコントローラーにまとめる。)
|
├── /pb(protoファイルから生成したProtocol Buffersのコード)
|
└── /proto(スキーマ定義)

※(仮)のものは将来的に追加する想定の例です。

 

Chatドメインを例にAPIを作る

次に以下の手順でChatドメインを例に、gRPCの双方向ストリーミングを利用したAPIを作成します。

ドメインの定義

まずは以下のコマンドを実行し、各種ファイルを作成します。

$ mkdir -p src/internal/domain/chat
$ touch src/internal/domain/chat/chat_model.go src/internal/domain/chat/chat_model_test.go

 

次に作成したファイルをそれぞれ以下のように記述します。

・「src/internal/domain/chat/chat_model.go」

package chat

import (
    "fmt"
    "strings"
    "time"
)

type Chat struct {
    InputText string `json:"input_text"`
}

func NewChat(inputText string) *Chat {
    return &Chat{
        InputText: inputText,
    }
}

func (c *Chat) TextToUpper() string {
    return strings.ToUpper(c.InputText)
}

func (c *Chat) TextToLower() string {
    return strings.ToLower(c.InputText)
}

func (c *Chat) TextAddTimeNow() string {
    return fmt.Sprintf("[%s] %s", time.Now().Format("2006-01-02 15:04:05"), c.InputText)
}

※Chatモデルの定義とそのモデルに付与する機能をメソッドとして定義しています。

 

・「src/internal/domain/chat/chat_model_test.go」

package chat

import (
    "fmt"
    "strings"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
)

func TestNewChat(t *testing.T) {
    t.Run("ChatModel作成", func(t *testing.T) {
        text := "Hello"

        // ChatModel作成
        chat := NewChat(text)

        // 検証
        assert.Equal(t, text, chat.InputText)
    })
}

func TestChat_TextToUpper(t *testing.T) {
    t.Run("大文字に変換", func(t *testing.T) {
        text := "Hello"

        // ChatModel作成
        chat := NewChat(text)

        // 検証
        assert.Equal(t, strings.ToUpper(text), chat.TextToUpper())
    })
}

func TestChat_TextToLower(t *testing.T) {
    t.Run("小文字に変換", func(t *testing.T) {
        text := "Hello"

        // ChatModel作成
        chat := NewChat(text)

        // 検証
        assert.Equal(t, strings.ToLower(text), chat.TextToLower())
    })
}

func TestChat_TextAddTimeNow(t *testing.T) {
    t.Run("現在の時間を追加", func(t *testing.T) {
        text := "Hello"

        // ChatModel作成
        chat := NewChat(text)

        // 検証
        assert.Equal(t, fmt.Sprintf("[%s] %s", time.Now().Format("2006-01-02 15:04:05"), text), chat.TextAddTimeNow())
    })
}

 

次に以下のコマンドを実行し、フォーマット修正および静的コード解析を行い、警告が出ないことを確認します。

$ docker compose run --rm grpc go mod tidy
$ docker compose run --rm grpc go fmt ./internal/...
$ docker compose run --rm grpc staticcheck ./internal/...

 

次に以下のコマンドでテストコードを実行します。

$ docker compose run --rm grpc go test -v $(docker compose run --rm grpc go list -f '{{if or .TestGoFiles .XTestGoFiles}}{{.ImportPath}}{{end}}' ./...)

 

テスト実行後、下図のように全てのテストがPASSすればOKです。

 

リポジトリやサービスの実装

今回は省略しているのでありませんが、DB操作や外部サービスを利用する場合、infrastructure層に実装します。

 

スキーマの定義

次にAPIのスキーマを定義するため、以下のコマンドを実行してファイルを作成します。

$ mkdir -p src/proto/chat && touch src/proto/chat/chat.proto

 

次に作成したファイルを以下のように記述します。

・「src/proto/chat/chat.proto」

syntax = "proto3";

import "validate/validate.proto";

package chat;

option go_package="pb/chat";

message TextInput {
  string text = 1 [(validate.rules).string.min_len = 1];
}

message TextOutput {
  string text = 1;
}

// サンプルサービス
service ChatService {
  // 双方向ストリーミング(複数のリクエスト-複数のレスポンス)
  rpc Bidirectional(stream TextInput) returns (stream TextOutput) {}
    // Returns:
    // - 0 OK: TextOutputを出力
    // - 2 Unknown: 不明なエラー
    // - 3 INVALID_ARGUMENT: バリデーションエラー)
}

※APIの種類とそのInputとOutputについてスキーマ定義します。今回はgRPCの双方向ストリーミング機能を例に作ります。

 

次に以下のコマンドを実行し、protoファイルからProtocol Buffersのコードを生成します。

$ docker compose run --rm grpc buf dep update
$ docker compose run --rm grpc buf generate

※コマンド「buf dep update」実行後、依存関係を管理するファイル「buf.lock」が作成されます。

 

コマンド実行後、下図のようにProtocol Buffersのコードが生成されればOKです。

 

ユースケースの定義

次にユースケースを定義するため、以下のコマンドを実行して各種ファイルを作成します。

$ mkdir -p src/internal/application/usecase/chat && touch src/internal/application/usecase/chat/chat.go
$ touch src/internal/application/usecase/chat/chat_bidirectional.go src/internal/application/usecase/chat/chat_bidirectional_test.go

 

・「src/internal/application/usecase/chat/chat.go」

package chat

import (
    "go-grpc-domain/internal/application/usecase/logger"
    pb "go-grpc-domain/pb/chat"

    "google.golang.org/grpc"
)

type ChatUsecase interface {
    Bidirectional(stream grpc.BidiStreamingServer[pb.TextInput, pb.TextOutput]) error
}

type chatUsecase struct {
    logger logger.Logger
}

func NewChatUsecase(logger logger.Logger) ChatUsecase {
    return &chatUsecase{
        logger: logger,
    }
}

※このファイルでユースケースの構造体やインターフェース定義をまとめ、インターフェースに定義した各メソッドはファイルを分割して記述します。

 

・「src/internal/application/usecase/chat/chat_bidirectional.go」

package chat

import (
    "errors"
    "fmt"
    "io"

    chatModel "go-grpc-domain/internal/domain/chat"
    pb "go-grpc-domain/pb/chat"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func (u *chatUsecase) Bidirectional(stream grpc.BidiStreamingServer[pb.TextInput, pb.TextOutput]) error {
    // コンテキストを取得
    ctx := stream.Context()

    for {
        req, err := stream.Recv()
        if errors.Is(err, io.EOF) {
            return nil
        }
        if err != nil {
            return err
        }

        // バリデーションチェック
        if err := req.Validate(); err != nil {
            msg := fmt.Sprintf("バリデーションエラー:%s", err.Error())
            u.logger.Warn(ctx, msg)

            return status.Errorf(codes.InvalidArgument, "%s", err.Error())
        }

        // Chatドメインを利用してメッセージを設定
        chat := chatModel.NewChat(req.GetText())
        msg := fmt.Sprintf("ToUppder: %s, ToLower: %s, AddTimeNow: %s", chat.TextToUpper(), chat.TextToLower(), chat.TextAddTimeNow())

        if err := stream.Send(&pb.TextOutput{Text: msg}); err != nil {
            return err
        }
    }
}

 

・「src/internal/application/usecase/chat/chat_bidirectional_test.go」

package chat

import (
    "context"
    "fmt"
    "io"
    "testing"

    mockLogger "go-grpc-domain/internal/application/usecase/logger/mock_logger"
    pb "go-grpc-domain/pb/chat"

    "github.com/stretchr/testify/assert"
    "go.uber.org/mock/gomock"
    "google.golang.org/grpc"
)

// Bidirectionalのstream用のモック構造体
type mockBidirectionalStream struct {
    grpc.BidiStreamingServer[pb.TextInput, pb.TextOutput]
    ctx context.Context
    recv []*pb.TextInput
    sent []*pb.TextOutput
    recvIndex int
    sendError error
    recvError error
}

func (m *mockBidirectionalStream) Recv() (*pb.TextInput, error) {
    if m.recvError != nil {
        return nil, m.recvError
    }
    if m.recvIndex >= len(m.recv) {
        return nil, io.EOF
    }
    req := m.recv[m.recvIndex]
    m.recvIndex++
    return req, nil
}

func (m *mockBidirectionalStream) Send(resp *pb.TextOutput) error {
    if m.sendError != nil {
        return m.sendError
    }
    m.sent = append(m.sent, resp)
    return nil
}

func (m *mockBidirectionalStream) Context() context.Context {
    return m.ctx
}

func TestChatUsecase_Bidirectional(t *testing.T) {
    // モック
    ctrl := gomock.NewController(t)
    defer ctrl.Finish()

    // ロガーのモック
    mockLogger := mockLogger.NewMockLogger(ctrl)

    t.Run("正常終了", func(t *testing.T) {
        // ユースケースのインスタンス化
        chatUsecase := NewChatUsecase(mockLogger)

        // パラメータの設定
        mockBidirectionalStream := &mockBidirectionalStream{
            ctx: context.Background(),
            recv: []*pb.TextInput{{Text: "Hello"}},
            recvError: nil,
            sendError: nil,
        }

        // テストの実行
        err := chatUsecase.Bidirectional(mockBidirectionalStream)

        // 検証
        assert.Equal(t, nil, err)
    })

    t.Run("受信エラー", func(t *testing.T) {
        // ユースケースのインスタンス化
        chatUsecase := NewChatUsecase(mockLogger)

        // パラメータの設定
        mockBidirectionalStream := &mockBidirectionalStream{
            ctx: context.Background(),
            recv: []*pb.TextInput{{Text: "Hello"}},
            recvError: fmt.Errorf("error"),
            sendError: nil,
        }

        // テストの実行
        err := chatUsecase.Bidirectional(mockBidirectionalStream)

        // 検証
        assert.NotEqual(t, nil, err)
    })

    t.Run("バリデーションエラー", func(t *testing.T) {
        // モック化
        mockLogger.EXPECT().Warn(gomock.Any(), gomock.Any()).Return()

        // ユースケースのインスタンス化
        chatUsecase := NewChatUsecase(mockLogger)

        // パラメータの設定
        mockBidirectionalStream := &mockBidirectionalStream{
            ctx: context.Background(),
            recv: []*pb.TextInput{{Text: ""}},
            recvError: nil,
            sendError: nil,
        }

        // テストの実行
        err := chatUsecase.Bidirectional(mockBidirectionalStream)

        // 検証
        assert.NotEqual(t, nil, err)
    })

    t.Run("送信エラー", func(t *testing.T) {
        // ユースケースのインスタンス化
        chatUsecase := NewChatUsecase(mockLogger)

        // パラメータの設定
        mockBidirectionalStream := &mockBidirectionalStream{
            ctx: context.Background(),
            recv: []*pb.TextInput{{Text: "Hello"}},
            recvError: nil,
            sendError: fmt.Errorf("error"),
        }

        // テストの実行
        err := chatUsecase.Bidirectional(mockBidirectionalStream)

        // 検証
        assert.NotEqual(t, nil, err)
    })
}

 

次に以下のコマンドを実行し、フォーマット修正および静的コード解析を行い、警告が出ないことを確認します。

$ docker compose run --rm grpc go mod tidy
$ docker compose run --rm grpc go fmt ./internal/...
$ docker compose run --rm grpc staticcheck ./internal/...

 

次に以下のコマンドでテストコードを実行します。

$ docker compose run --rm grpc go test -v $(docker compose run --rm grpc go list -f '{{if or .TestGoFiles .XTestGoFiles}}{{.ImportPath}}{{end}}' ./...)

 

テスト実行後、下図のように全てのテストがPASSすればOKです。

 

次に以下のコマンドを実行し、後述のテストコード用のモックファイルを作成します。

$ docker compose run --rm grpc mockgen -source=./internal/application/usecase/chat/chat.go -destination=./internal/application/usecase/chat/mock_chat/mock_chat.go

 

サーバーの定義

次にサーバーを定義するため、以下のコマンドを実行して各種ファイルを作成します。

$ mkdir -p src/internal/presentation/server/grpc/chat
$ touch src/internal/presentation/server/grpc/chat/chat.go src/internal/presentation/server/grpc/chat/chat_test.go

 

・「src/internal/presentation/server/grpc/chat/chat.go」

package chat

import (
    usecase "go-grpc-domain/internal/application/usecase/chat"
    pb "go-grpc-domain/pb/chat"

    "google.golang.org/grpc"
)

type ChatServer struct {
    pb.UnimplementedChatServiceServer
    chatUsecase usecase.ChatUsecase
}

func NewChatServer(
    chatUsecase usecase.ChatUsecase,
) *ChatServer {
    return &ChatServer{
        chatUsecase: chatUsecase,
    }
}

func (s *ChatServer) Bidirectional(stream grpc.BidiStreamingServer[pb.TextInput, pb.TextOutput]) error {
    return s.chatUsecase.Bidirectional(stream)
}

 

・「src/internal/presentation/server/grpc/chat/chat_test.go」

package chat

import (
    "context"
    "fmt"
    "io"
    "os"
    "testing"

    mockChat "go-grpc-domain/internal/application/usecase/chat/mock_chat"
    chatModel "go-grpc-domain/internal/domain/chat"
    pb "go-grpc-domain/pb/chat"

    "github.com/joho/godotenv"
    "github.com/stretchr/testify/assert"
    "go.uber.org/mock/gomock"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
)

// Bidirectionalのstream用のモック構造体
type mockBidirectionalStream struct {
    grpc.BidiStreamingServer[pb.TextInput, pb.TextOutput]
    ctx context.Context
    recv []*pb.TextInput
    sent []*pb.TextOutput
    recvIndex int
    sendError error
    recvError error
}

func (m *mockBidirectionalStream) Recv() (*pb.TextInput, error) {
    if m.recvError != nil {
        return nil, m.recvError
    }
    if m.recvIndex >= len(m.recv) {
        return nil, io.EOF
    }
    req := m.recv[m.recvIndex]
    m.recvIndex++
    return req, nil
}

func (m *mockBidirectionalStream) Send(resp *pb.TextOutput) error {
    if m.sendError != nil {
        return m.sendError
    }
    m.sent = append(m.sent, resp)
    return nil
}

func (m *mockBidirectionalStream) Context() context.Context {
    return m.ctx
}

// 初期処理
func init() {
    // テスト用の環境変数ファイル「.env.testing」を読み込んで使用する。
    if err := godotenv.Load("../../../../../.env.testing"); err != nil {
        fmt.Println(".env.testingの読み込みに失敗しました。")
    }
}

func TestChatServer_Bidirectional(t *testing.T) {
    // モック
    ctrl := gomock.NewController(t)
    defer ctrl.Finish()
    mockChatUsecase := mockChat.NewMockChatUsecase(ctrl)

    // gRPCクライアントの設定
    grpcPort := os.Getenv("GRPC_PORT")
    if grpcPort == "" {
        grpcPort = "50051"
    }
    target := fmt.Sprintf("dns:///localhost:%s", grpcPort)
    conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        t.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()
    client := pb.NewChatServiceClient(conn)

    t.Run("ChatServerが正常終了すること", func(t *testing.T) {
        // モック
        mockChatUsecase.EXPECT().Bidirectional(gomock.Any()).Return(nil)

        // ChatServerのインスタンス化
        chatServer := NewChatServer(mockChatUsecase)

        // パラメータの設定
        mockBidirectionalStream := &mockBidirectionalStream{
            ctx: context.Background(),
            recv: []*pb.TextInput{{Text: "Hello"}},
            recvError: nil,
            sendError: nil,
        }

        // テストの実行
        err := chatServer.Bidirectional(mockBidirectionalStream)

        // 検証
        assert.Equal(t, nil, err)
    })

    t.Run("レスポンス結果が想定通りであること", func(t *testing.T) {
        // メタデータにauthorizationを追加
        ctx := context.Background()
        md := metadata.New(map[string]string{"authorization": "Bearer token"})
        ctx = metadata.NewOutgoingContext(ctx, md)

        // テストの実行
        stream, err := client.Bidirectional(ctx)
        if err != nil {
            t.Fatalf("Failed to call client.Bidirectional: %v", err)
        }

        var sendEnd, recvEnd bool
        inputText := "Hello"
        for !(sendEnd && recvEnd) {
            // 送信処理
            if !sendEnd {
                if err := stream.Send(&pb.TextInput{Text: inputText}); err != nil {
                    t.Fatalf("Failed to stream.Send: %v", err)
                }
                if err := stream.CloseSend(); err != nil {
                    t.Fatalf("Failed to stream.CloseSend: %v", err)
                }
                sendEnd = true
            }

            // 受信処理
            if !recvEnd {
                res, err := stream.Recv()
                if err != nil {
                    t.Fatalf("Failed to stream.Recv: %v", err)
                }

                // 検証
                chat := chatModel.NewChat(inputText)
                msg := fmt.Sprintf("ToUppder: %s, ToLower: %s, AddTimeNow: %s", chat.TextToUpper(), chat.TextToLower(), chat.TextAddTimeNow())
                assert.Equal(t, msg, res.Text)

                recvEnd = true
            }
        }
    })
}

 

レジストリ登録

次にルーター設定で使うためのレジストリ登録をするため、以下のコマンドを実行してファイルを作成します。

$ mkdir -p src/internal/registry && touch src/internal/registry/registry_grpc.go

 

次に作成したファイルを以下のように記述します。

・「src/internal/registry/registry_grpc.go」

package registry

import (
    usecaseChat "go-grpc-domain/internal/application/usecase/chat"
    "go-grpc-domain/internal/infrastructure/logger"
    serverGrpcChat "go-grpc-domain/internal/presentation/server/grpc/chat"
)

// ハンドラーをまとめるコントローラー構造体
type GrpcController struct {
    Chat *serverGrpcChat.ChatServer
}

func NewGrpcController() *GrpcController {
    // ロガー設定
    logger := logger.NewSlogLogger()

    // chatドメインのハンドラー設定
    chatUsecase := usecaseChat.NewChatUsecase(logger)
    chatServer := serverGrpcChat.NewChatServer(chatUsecase)

    return &GrpcController{
        Chat: chatServer,
    }
}

※ルーター設定時にインスタンス化したサーバー設定が必要になるため、このファイルでまとめるようにしています。

 

ルーター設定

次にルーター設定をするため、以下のコマンドを実行してファイルを作成します。

$ mkdir -p src/internal/presentation/router && touch src/internal/presentation/router/router.go

 

・「src/internal/presentation/router/router.go」

package router

import (
    ic "go-grpc-domain/internal/presentation/interceptor"
    "go-grpc-domain/internal/registry"
    pbChat "go-grpc-domain/pb/chat"

    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)

func SetupGrpcServer(c *registry.GrpcController, i *ic.Interceptor) *grpc.Server {
    // gRPCサーバーの作成
    s := grpc.NewServer(
        // インターセプターの適用
        grpc.ChainUnaryInterceptor(
            i.RequestUnary,
            i.AuthUnary,
        ),
        grpc.ChainStreamInterceptor(
            i.RequestStream,
            i.AuthStream,
        ),
    )

    // サービス設定
    pbChat.RegisterChatServiceServer(s, c.Chat)

    // リフレクション設定
    reflection.Register(s)

    return s
}

 

main.goの修正

次にファイル「src/main.go」を以下のように修正します。

・「src/main.go」

package main

import (
    "fmt"
    "log/slog"
    "net"
    "os"

    "go-grpc-domain/internal/infrastructure/logger"
    ic "go-grpc-domain/internal/presentation/interceptor"
    "go-grpc-domain/internal/presentation/router"
    "go-grpc-domain/internal/registry"

    "github.com/joho/godotenv"
)

func main() {
    // .env ファイルの読み込み
    err := godotenv.Load()
    if err != nil {
        slog.Error(fmt.Sprintf(".envファイルの読み込みに失敗しました。: %v", err))
    }

    // ENVの設定
    env := os.Getenv("ENV")
    if env == "" {
        env = "local"
    }

    // gRPC用のポート番号の設定
    grpcPort := os.Getenv("GRPC_PORT")
    if grpcPort == "" {
        grpcPort = "50051"
    }

    // Listenerの設定
    listener, err := net.Listen("tcp", fmt.Sprintf(":%s", grpcPort))
    if err != nil {
        slog.Error(fmt.Sprintf("Listenerの設定に失敗しました。: %v", err))
    }

    // サーバー設定
    c := registry.NewGrpcController()
    l := logger.NewSlogLogger()
    i := ic.NewInterceptor(l)
    s := router.SetupGrpcServer(c, i)

    // gRPCサーバーの起動(非同期)
    slog.Info(fmt.Sprintf("[ENV=%s] start gRPC-Server port: %s", env, grpcPort))
    if err := s.Serve(listener); err != nil {
        slog.Error(fmt.Sprintf("gRPC-Server の起動に失敗しました。: %v", err))
    }
}

 

次に以下のコマンドを実行し、フォーマット修正および静的コード解析を行い、警告が出ないことを確認します。

$ docker compose run --rm grpc go mod tidy
$ docker compose run --rm grpc go fmt ./internal/...
$ docker compose run --rm grpc staticcheck ./internal/...

 

コンテナの再ビルドと起動

次に以下のコマンドを実行し、コンテナを再びルドします。

$ docker compose down
$ docker compose build --no-cache

 

次に以下のコマンドを実行し、コンテナを起動します。

$ docker compose up -d

 

次に以下のコマンドを実行し、ログ出力を確認します。

$ docker compose logs

 

ログ出力を確認し、エラーがなければOKです。

 

スポンサーリンク

ChatドメインのAPIを試す

次に上記で作成したChatドメインのAPIをPostmanを使って試します。

まずはgRPC用のリクエスト画面を表示し、URLに「localhost:50051」を入力後、タブ「認可」からBearerトークンを設定して下さい。

※インターセプターで認証チェックを入れているため

 

次にタブ「サービス定義」などからサーバーリフレクションを使って利用できるメソッドを読み込みます。

※「src/internal/presentation/router/router.go」の「reflection.Register(s)」部分の処理でサーバーリフレクションの機能が使えます。コードにエラーがあったりするとメソッドの読み込みに失敗するので、その場合はコードの修正をして下さい。

 

次にメソッド「Bidirectional」を選択し、「呼び出す」をクリックします。

 

次にリクエストボディを設定し、画面右下の「送信」をクリックしてリクエストを送信します。

リクエスト送信後、想定通りのレスポンス結果が返ってこればOKです。

※接続を終了する場合は「ストリーミングを終了」をクリックして下さい。

 

テストコードのカバレッジを確認

次に上記で追加したサーバーのテストも試してみますが、Goならオプション「-cover」を使ってカバレッジ率(テストがどれくらい網羅されているか)も確認することができます。

$ docker compose exec grpc go test -v -cover $(docker compose exec grpc go list -f '{{if or .TestGoFiles .XTestGoFiles}}{{.ImportPath}}{{end}}' ./...)

 

また、以下のコマンドを実行し、カバレッジの対象箇所をファイル出力し、それをHTMLファイルに変換してブラウザで確認することも可能です。

$ docker compose exec grpc go test -v -coverprofile=internal/coverage.out $(docker compose exec grpc go list -f '{{if or .TestGoFiles .XTestGoFiles}}{{.ImportPath}}{{end}}' ./...)
$ docker compose exec grpc go tool cover -html=internal/coverage.out -o=internal/coverage.html

 

コマンド実行後、以下のようにファイルが作成されます。

 

ファイル「src/internal/coverage.html」をブラウザで確認すると下図のように表示されます。

左上のリストから対象のコードを選択可能で、カバー済みの箇所は緑色の文字で表示され、もし未対応の箇所があったら赤色の文字で表示されます。

※上図はVSCodeの拡張機能「HTML Preview」を使ってプレビューしています。

 

データベース関連について

今回はデータベースに関する部分は省略しています。必要な場合は以下の記事を参考にしてみて下さい。

Go言語(Golang)のEchoでシンプルかつ実務的なバックエンドAPI開発方法まとめ
こんにちは。Tomoyuki(@tomoyuki65)です。以前にGo言語のEchoでバックエンドAPIを開発する方法についての記事を書きましたが、あれから私自身もさらに成長し、もっとシンプルかつ実務的にAPIを開発する方法をまとめたいと思...
Go言語(Golang)のGinでDDD(ドメイン駆動設計)構成のバックエンドAPIを開発する方法まとめ
こんにちは。Tomoyuki(@tomoyuki65)です。これまではクリーンアーキテクチャを参考にしてGo言語のAPIの作り方を解説してきましたが、実務においてはDDD(ドメイン駆動設計)と呼ばれる方法で作られていることが多いです。そんな...

 

スポンサーリンク

最後に

今回はGo言語のgRPCでDDD構成のバックエンドAPIを開発する方法について解説しました。

実務ではドメイン駆動設計での開発が求められることが多いと思うので、Go言語(Golang)のgRPCでDDD(ドメイン駆動設計)が必要になる場合は、ぜひ参考にしてみて下さい!

 

この記事を書いた人
Tomoyuki

SE→ブロガーを経て、現在はWeb系エンジニアをしています!

Tomoyukiをフォローする
応用
スポンサーリンク
Tomoyukiをフォローする

コメント

タイトルとURLをコピーしました