この記事は さくらインターネット Advent Calendar 2025 23日目の記事です。

22日目はkiyokiyoさんの Containerlabを使ってネットワーク機器のCIを考えてみたSONiCのsystemdを読みながら好きなツールを動かしてみる でした。
2つともネットワークエンジニアにとって非常に興味深い内容でしたね。自分には全然わかりませんが、ネットワーク領域にもCI/CD、可観測、IaCの波が来ているのだなと感じました。


さくらのクラウド シンプルMQは、マネージドなメッセージキューサービスです。この記事では、シンプルMQを使ったProducer-Consumerパターンの実装例を紹介します。

特に以下の点に焦点を当てます:

サンプルアプリケーションの全体像

sample application overview

【入門編】基本的な使い方

まずはシンプルMQの基本的な使い方を体験してみましょう。

リポジトリのクローン

git clone https://github.com/thara/sakura-simplemq-sample.git
cd sakura-simplemq-sample

TerraformでシンプルMQを構築

Infrastructure as Codeの実践として、TerraformでシンプルMQのキューを作成します。

まず、さくらのクラウドのAPIキーのアクセストークンとアクセストークンシークレットを環境変数に設定します:

export SAKURACLOUD_ACCESS_TOKEN="your-access-token"
export SAKURACLOUD_ACCESS_TOKEN_SECRET="your-access-token-secret"

次にTerraformでキューを作成:

cd terraform
terraform init
terraform apply
cd ..

terraform/main.tfでは、以下のようにシンプルMQリソースを定義しています:

resource "sakura_simple_mq" "playground" {
  name = "playground-mq"
  description = "This is a playground message queue."
  tags = ["playground"]

  visibility_timeout_seconds = 10  # メッセージの可視性タイムアウト(秒)
  expire_seconds             = 100 # メッセージの有効期限(秒)
}

作成後、さくらのクラウドのコントロールパネルからシンプルMQのキューのAPIキーをローテートし、環境変数に設定します:

export SIMPLEMQ_API_KEY="your-simplemq-api-key"

2025年12月現在terraform apply時にAPIキーを設定 or 取得する方法は提供されていないため、キューを作成後に一度APIキーをローテートする必要がある ことに注意してください。

動かしてみる

それでは、3つのコンポーネントを実際に動かしてみましょう。

Consumer Workerを起動(別ターミナル):

go run cmd/consumer-worker/main.go -queue=playground-mq

このWorkerは1秒ごとにシンプルMQをポーリングし、メッセージを処理します。

Producer CLIでメッセージを送信:

go run cmd/producer/main.go -queue=playground-mq -message="Hello from CLI!"

Consumer側のターミナルに以下のようなログが表示されるはずです:

yyyy/MM/dd HH:mm:ss INFO messages received count=1
yyyy/MM/dd HH:mm:ss INFO message received id=<message-id>
yyyy/MM/dd HH:mm:ss INFO message deleted id=<message-id>
yyyy/MM/dd HH:mm:ss INFO notification received message="Hello from CLI!"

Producer API Serverでの送信(別ターミナル):

# サーバーを起動
go run cmd/producer-api-server/main.go -addr=:8080 -queue=playground-mq

# 別ターミナルからHTTP POSTで送信
curl -X POST http://localhost:8080 -d "Hello from API Server!"

同様にConsumer側でメッセージが受信されます。これで非同期メッセージングの基本的な流れを体験できました。

【実践編】本番運用に向けた工夫

ここからは、本番運用を想定した実装の工夫について解説します。基本的な使い方を理解した方向けの内容です。

シンプルMQの特性と設計への影響

シンプルMQには以下の特性があり、これらを理解した上で設計する必要があります:

Pull型 - Consumer側がポーリングする

Consumer側が能動的にメッセージを取得します。今回のサンプルでは1秒ごとのポーリングを実装しています。

順序は保証されない

受信時刻の古いメッセージから配信されますが、可視性タイムアウト延長などにより順序が入れ替わる場合があります。順序が重要な処理では、アプリケーション側で制御してください。

重複配信を前提に冪等性を確保

各メッセージは少なくとも1回の配信が保証されますが、2回以上配信される可能性があります。Consumer側で冪等性を確保する必要があります。

冪等性の実装例:

可視性タイムアウトは再配信の原因になる

メッセージを受信してから他のConsumerに見えなくなる時間(可視性タイムアウト)があります。処理時間より短いと、処理中のメッセージが再配信されます。Terraformでの visibility_timeout_seconds 設定時は、想定される処理時間より長めに設定してください。

メッセージの効率的な取り扱い

このサンプルアプリケーションでは、メッセージの効率性と信頼性を高めるために、Protocol BuffersDeflate圧縮を組み合わせています。

なぜProtocol Buffersなのか

Protocol Buffers(protobuf)は、Googleが開発したシリアライズフォーマットです。JSONと比較して以下の利点があります:

  1. スキーマ定義: .protoファイルでメッセージ構造を明確に定義
  2. 型安全性: コンパイル時に型チェックが行われる
  3. バイナリフォーマット: JSONよりコンパクトで高速

今回のサンプルでは、シンプルなNotificationメッセージを定義しています:

edition = "2023";

option go_package = "github.com/thara/sakura-simplemq-sample/samplepb";

message Notification {
  string message = 1;
}

このシンプルな例でも、将来的にフィールドを追加する際に後方互換性を保ちやすいという利点があります。

なぜここまでやるのか

この設計は「キューを長期利用する」前提です。メッセージは将来、他言語・他サービスからも利用される可能性があるため、以下の点を重視しています:

適用の判断基準

Deflate圧縮 + Base64エンコーディング

さらに、メッセージサイズを削減するためにDeflate圧縮を適用し、Base64エンコーディングでテキスト化してからシンプルMQに送信しています。Go標準ライブラリのcompress/flateだけで完結させるためDeflateを採用していますが、gzipでも同様に実装できます。

エンコード処理の流れ(internal/encoding.go):

Notification (protobuf)
  → Marshal (バイナリ化)
  → Deflate圧縮
  → Base64エンコード
  → シンプルMQに送信するメッセージ(文字列)

実装を見てみましょう(internal/encoding.go):

func encodeProtoMessage(msg proto.Message) (string, error) {
    // 1. Protocol Buffersでバイナリ化
    b, err := proto.Marshal(msg)
    if err != nil {
        return "", fmt.Errorf("failed to marshal proto message: %w", err)
    }

    // 2. Deflate圧縮
    compressed, err := compress(b)
    if err != nil {
        return "", fmt.Errorf("failed to compress proto message: %w", err)
    }

    // 3. Base64エンコード
    return base64.StdEncoding.EncodeToString(compressed), nil
}

デコード側は逆の順序で処理を行います:

func decodeProtoMessage(src string, msg proto.Message) error {
    // 1. Base64デコード
    data, err := base64.StdEncoding.DecodeString(src)
    if err != nil {
        return fmt.Errorf("failed to decode base64 string: %w", err)
    }

    // 2. Deflate解凍
    decompressed, err := decompress(data)
    if err != nil {
        return fmt.Errorf("failed to decompress proto message: %w", err)
    }

    // 3. Protocol Buffersでデシリアライズ
    if err := proto.Unmarshal(decompressed, msg); err != nil {
        return fmt.Errorf("failed to unmarshal proto message: %w", err)
    }
    return nil
}

圧縮の効果

この方式により、以下のようなメリットが得られます:

ただし、メッセージが数十バイト程度の場合、圧縮によって逆にサイズが増えることもあります。ユースケースに応じて判断してください。

Producer-Consumerパターンの実装

Producer実装のポイント

ProducerとConsumerは共通のinternalパッケージを使用することで、エンコード/デコード処理の一貫性を保っています。

CLI Producercmd/producer/main.go):

notification := &samplepb.Notification{
    Message: proto.String(message),
}
if err := internal.SendNotification(ctx, messageOp, notification); err != nil {
    return fmt.Errorf("failed to send notification: %v", err)
}

API Server Producercmd/producer-api-server/main.go):

handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    body, err := io.ReadAll(r.Body)
    if err != nil {
        http.Error(w, "failed to read request body", http.StatusBadRequest)
        return
    }

    notification := &samplepb.Notification{
        Message: proto.String(string(body)),
    }
    if err := internal.SendNotification(r.Context(), messageOp, notification); err != nil {
        http.Error(w, fmt.Sprintf("failed to send notification: %w", err), http.StatusInternalServerError)
        return
    }
})

どちらもinternal.SendNotification関数を呼び出すだけで、エンコード処理は内部で行われます:

func SendNotification(ctx context.Context, messageOp simplemq.MessageAPI, notification *samplepb.Notification) error {
    content, err := encodeProtoMessage(notification)
    if err != nil {
        return fmt.Errorf("failed to encode message: %v", err)
    }

    resSend, err := messageOp.Send(ctx, content)
    if err != nil {
        return fmt.Errorf("failed to send message: %v", err)
    }

    slog.Info("Message Sent", slog.String("ID", string(resSend.ID)))
    return nil
}

Consumer実装のポイント

Consumer Worker(cmd/consumer-worker/main.go)は、1秒ごとにシンプルMQをポーリングします:

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-ticker.C:
        if err := receiveMessages(ctx, client, queueName); err != nil {
            slog.ErrorContext(ctx, "failed to receive messages", slog.Any("error", err))
        }
    }
}

メッセージ受信処理では、internal.ReceiveNotificationsを使用してデコードと削除(acknowledge)を行います:

func ReceiveNotifications(ctx context.Context, messageOp simplemq.MessageAPI) ([]*samplepb.Notification, error) {
    // 1. シンプルMQからメッセージを受信
    messages, err := messageOp.Receive(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to receive messages: %v", err)
    }
    slog.Info("messages received", slog.Int("count", len(messages)))

    var notifications []*samplepb.Notification

    // 2. 各メッセージを処理
    for _, msg := range messages {
        if err := func() error {
            // 2-1. デコード
            var notification samplepb.Notification
            if err := decodeProtoMessage(string(msg.Content), &notification); err != nil {
                return fmt.Errorf("failed to decode message ID %s: %v", msg.ID, err)
            }
            slog.Info("message received", slog.String("id", string(msg.ID)), slog.String("content", notification.GetMessage()))

            notifications = append(notifications, &notification)

            // 2-2. メッセージを削除してacknowledge
            if err := messageOp.Delete(ctx, string(msg.ID)); err != nil {
                return fmt.Errorf("failed to delete message ID %s: %v", msg.ID, err)
            }
            slog.Info("message deleted", slog.String("id", string(msg.ID)))

            return nil
        }(); err != nil {
            slog.Error("failed to process message", slog.String("id", string(msg.ID)), slog.Any("error", err))
        }
    }
    return notifications, nil
}

実践的なユースケースと運用Tips

よくあるユースケース

運用上の考慮事項

メッセージ設定の調整

Terraformで定義した以下のパラメータは、用途に応じて調整が必要です:

複数Consumerによるスケーリング

処理量が増えた場合、Consumer Workerを複数起動することで並列処理が可能です:

# ターミナル1
go run cmd/consumer-worker/main.go -queue=playground-mq

# ターミナル2
go run cmd/consumer-worker/main.go -queue=playground-mq

# ターミナル3(さらに追加)
go run cmd/consumer-worker/main.go -queue=playground-mq

それぞれのWorkerが異なるメッセージを処理するため、処理能力が向上します。

エラーハンドリングとリトライ

このサンプル実装には、本番運用で危険な点があります。デコード失敗などの恒久的に成功しない処理が発生した場合、そのメッセージは削除されないため、可視性タイムアウトが切れると再配信され続けます。 このようなメッセージをPoison messageと呼びます。

本番環境ではこのPoison messageに対して以下のような対策を講じることを推奨します:

モニタリング

以下の指標を監視することで、システムの健全性を把握できます:

まとめ

この記事では、さくらのクラウドの「シンプルMQ」を使ったProducer-Consumerパターンの実装例を紹介しました。Protocol Buffers + Deflate圧縮により、型安全性とメッセージサイズの削減を両立しながら、将来的な拡張性も確保できます。

次のステップ

このサンプルをベースに、以下のような拡張も考えられます:

サンプルコードはGitHubで公開していますので、ぜひ試してみてください。

さくらのクラウド シンプルMQを活用して、スケーラブルで堅牢なイベント駆動アプリケーションを構築しましょう!


明日は@linyowsさんの「Stalwartについて書く」です。

またしても何もわからないのですが、このようにさくらインターネットでは様々な技術分野で活躍されている方が多いので、とても勉強になりますね。

ぜひ他の記事もチェックしてみてください!