Floci × SQS / SNS / EventBridge ハンズオン — メッセージング基盤のローカル検証【2026年版】

Floci × SQS / SNS / EventBridge ハンズオン — メッセージング基盤のローカル検証【2026年版】
目次

この記事でわかること

  • FlociでSQSキューを作成しメッセージを送受信する基本
  • SNSトピックとサブスクリプション、SNS→SQSのFan-out
  • FIFOキュー・デッドレターキュー(DLQ)
  • EventBridgeのカスタムバス・ルール・ターゲット設定
  • Lambda連携によるイベント駆動アーキテクチャの検証

イベント駆動アーキテクチャの心臓部である SQS / SNS / EventBridge。Flociはこの3つをすべてローカル再現でき、本番とほぼ同じメッセージング挙動を開発段階で検証できます。

Flociの基本は 入門記事 を参照してください。


SQS — キューでの送受信

キュー作成

aws sqs create-queue --queue-name tasks --endpoint-url http://localhost:4566
# => "QueueUrl": "http://localhost:4566/000000000000/tasks"

メッセージ送受信(Python)

import boto3

sqs = boto3.client(
    "sqs", endpoint_url="http://localhost:4566", region_name="us-east-1",
    aws_access_key_id="test", aws_secret_access_key="test",
)

url = "http://localhost:4566/000000000000/tasks"

# 送信
sqs.send_message(QueueUrl=url, MessageBody="hello")

# 受信
res = sqs.receive_message(QueueUrl=url, MaxNumberOfMessages=1, WaitTimeSeconds=5)
for m in res.get("Messages", []):
    print(m["Body"])
    sqs.delete_message(QueueUrl=url, ReceiptHandle=m["ReceiptHandle"])

FIFOキュー

aws sqs create-queue \
  --queue-name orders.fifo \
  --attributes FifoQueue=true,ContentBasedDeduplication=true \
  --endpoint-url http://localhost:4566
sqs.send_message(
    QueueUrl="http://localhost:4566/000000000000/orders.fifo",
    MessageBody="order-001",
    MessageGroupId="user-a",
)

デッドレターキュー(DLQ)

aws sqs create-queue --queue-name tasks-dlq --endpoint-url http://localhost:4566

aws sqs set-queue-attributes \
  --queue-url http://localhost:4566/000000000000/tasks \
  --attributes '{
    "RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:000000000000:tasks-dlq\",\"maxReceiveCount\":\"3\"}"
  }' \
  --endpoint-url http://localhost:4566

3回リトライ失敗後、DLQに自動転送されます。


SNS — トピックとFan-out

トピック作成と発行

sns = boto3.client("sns", endpoint_url="http://localhost:4566",
                   region_name="us-east-1",
                   aws_access_key_id="test", aws_secret_access_key="test")

topic = sns.create_topic(Name="user-events")["TopicArn"]
sns.publish(TopicArn=topic, Subject="Welcome", Message="Hello SNS")

SNS → SQS Fan-out

1つのSNSメッセージを複数のSQSキューへ配信する典型パターン:

q1 = sqs.create_queue(QueueName="email-queue")["QueueUrl"]
q2 = sqs.create_queue(QueueName="audit-queue")["QueueUrl"]

def arn_of(url):
    return sqs.get_queue_attributes(
        QueueUrl=url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

sns.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=arn_of(q1))
sns.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=arn_of(q2))

sns.publish(TopicArn=topic, Message="new user signed up")

両キューに同じメッセージが届きます。

メッセージフィルタリング

sns.subscribe(
    TopicArn=topic, Protocol="sqs", Endpoint=arn_of(q1),
    Attributes={"FilterPolicy": '{"event_type": ["signup"]}'},
)

sns.publish(
    TopicArn=topic,
    Message='{"user": "alice"}',
    MessageAttributes={"event_type": {"DataType": "String", "StringValue": "signup"}},
)

EventBridge — イベントバスとルール

カスタムバス作成

aws events create-event-bus --name app-bus --endpoint-url http://localhost:4566

ルール作成(Lambdaターゲット)

aws events put-rule \
  --name order-created-rule \
  --event-bus-name app-bus \
  --event-pattern '{"source":["myapp.orders"],"detail-type":["OrderCreated"]}' \
  --endpoint-url http://localhost:4566

aws events put-targets \
  --rule order-created-rule \
  --event-bus-name app-bus \
  --targets 'Id=1,Arn=arn:aws:lambda:us-east-1:000000000000:function:hello' \
  --endpoint-url http://localhost:4566

イベント発行

events = boto3.client("events", endpoint_url="http://localhost:4566",
                     region_name="us-east-1",
                     aws_access_key_id="test", aws_secret_access_key="test")

events.put_events(Entries=[{
    "Source": "myapp.orders",
    "DetailType": "OrderCreated",
    "Detail": '{"orderId":"o-001","amount":1500}',
    "EventBusName": "app-bus",
}])

パターンに一致したイベントが自動的にLambdaへ配信されます。

スケジュールルール(Cron)

aws events put-rule \
  --name daily-job \
  --schedule-expression "cron(0 9 * * ? *)" \
  --endpoint-url http://localhost:4566

定期実行の設定もローカルで検証可能です。


組み合わせパターン例

API Gateway → Lambda → SNS → SQS → Lambda

[HTTP Request]
     ↓
[API Gateway (Floci)]
     ↓
[Lambda: order-validator]
     ↓
[SNS: order-events]
     ↓ (Fan-out)
[SQS: email-queue]  [SQS: audit-queue]
     ↓                     ↓
[Lambda: email]     [Lambda: audit]

このマイクロサービス構成のE2Eを、Floci 1コンテナだけで検証できます。


Node.js(AWS SDK v3)例

import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

const sqs = new SQSClient({
  endpoint: "http://localhost:4566",
  region: "us-east-1",
  credentials: { accessKeyId: "test", secretAccessKey: "test" },
});

await sqs.send(new SendMessageCommand({
  QueueUrl: "http://localhost:4566/000000000000/tasks",
  MessageBody: "hello from node",
}));

よくあるエラー

SQSキューURLが解決できない

コンテナ内からは http://floci:4566/000000000000/queue-name のように、FLOCI_HOSTNAME と一致させる。

SNSサブスクリプションが配信されない

SQSキューのARNが正しいか、またFIFOキューは FIFOトピック にのみサブスクライブ可能。

EventBridgeルールが発火しない

EventPattern のJSONが厳密にマッチしていない。sourcedetail-type を完全一致で確認。

FIFOでメッセージが重複

ContentBasedDeduplication=true を有効にするか、MessageDeduplicationId を明示指定。


FAQ

Q. Visibility TimeoutやLong Pollingは動作する?

はい。VisibilityTimeoutWaitTimeSeconds ともに本番同様に機能します。

Q. SNSのHTTPSサブスクリプションは?

基本的な確認リクエスト送信に対応。実運用のTLS検証までは再現されません。

Q. EventBridge Schedulerは?

基本的なスケジュール機能に対応。フレキシブル時間枠など一部詳細は未対応の可能性あり。

Q. メッセージの可視化ツールは?

Flociに管理画面はありません。awslocal 系コマンドまたは sqs-admin などの外部ツールを利用してください。


典型ユースケース別の実装例

1. 注文処理パイプライン(SQS)

[API Gateway] → [Lambda: validate] → [SQS: orders]
                                         ↓
                         [Lambda: fulfill] → [DynamoDB]
                                         ↓ (失敗時)
                                      [SQS: orders-dlq]

SQS FIFO + グループIDでユーザー単位の順序保証を実現:

sqs.send_message(
    QueueUrl=fifo_url,
    MessageBody='{"orderId":"o-001","amount":1500}',
    MessageGroupId="user-001",
    MessageDeduplicationId="o-001",
)

2. 通知Fan-out(SNS)

ユーザー登録時にメール・Slack・監査ログへ同時配信:

[Lambda: signup] → [SNS: user-events]
                          ↓ (Fan-out)
              [SQS: email] [SQS: slack] [SQS: audit]
                  ↓            ↓            ↓
              [Lambda]     [Lambda]     [Lambda]

メッセージフィルタリングで受信側の関心事を限定できます。

3. ETLスケジュール(EventBridge)

events.put_rule(
    Name="daily-etl",
    ScheduleExpression="cron(0 2 * * ? *)",  # 毎日2時
    State="ENABLED",
)
events.put_targets(
    Rule="daily-etl",
    Targets=[{
        "Id": "1",
        "Arn": "arn:aws:lambda:us-east-1:000000000000:function:etl",
    }]
)

バックプレッシャーとスロットリング対策

大量メッセージで下流が詰まる事態を事前に検証:

# SQS側の Visibility Timeout で処理時間に猶予を
sqs.set_queue_attributes(
    QueueUrl=url,
    Attributes={"VisibilityTimeout": "120"},
)

# Lambda側の同時実行制限
lam.put_function_concurrency(
    FunctionName="fulfill",
    ReservedConcurrentExecutions=10,
)

下流への流量制御をFloci上で実測できます。


メッセージスキーマ管理

JSONスキーマで送受信契約を担保する運用例:

import json, jsonschema

SCHEMA = {
    "type": "object",
    "required": ["orderId", "amount"],
    "properties": {
        "orderId": {"type": "string"},
        "amount": {"type": "number", "minimum": 0},
    },
}

def send_order(order):
    jsonschema.validate(order, SCHEMA)
    sqs.send_message(QueueUrl=url, MessageBody=json.dumps(order))

スキーマ不正なメッセージが送信前に弾かれるため、下流バグを防ぎます。


エラーハンドリングパターン

リトライ+指数バックオフ

import time, random
for attempt in range(5):
    try:
        process_message(msg)
        break
    except TransientError:
        time.sleep((2 ** attempt) + random.random())

DLQ監視

dlq = sqs.get_queue_attributes(
    QueueUrl="http://localhost:4566/000000000000/tasks-dlq",
    AttributeNames=["ApproximateNumberOfMessages"],
)
count = int(dlq["Attributes"]["ApproximateNumberOfMessages"])
if count > 0:
    print(f"⚠️ DLQ に {count}件のメッセージ")

CIでDLQ件数をアサーションすることで、デグレを早期検知できます。


監視・オブザーバビリティ

本番ではCloudWatch Metricsを使う部分を、ローカルでは:

  • メッセージ滞留数を ApproximateNumberOfMessages で定期ポーリング
  • 処理時間をアプリ側で計測し、OpenTelemetryで集約
  • エラーログを構造化ログ(JSON)で出力し、jq で集計

ログ・メトリクスの出力フォーマットをローカルで固めておけば、本番移行時の観測性が下がりません。


Node.js 例:メッセージ処理ワーカー

import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand }
  from "@aws-sdk/client-sqs";

const sqs = new SQSClient({
  endpoint: "http://localhost:4566", region: "us-east-1",
  credentials: { accessKeyId: "test", secretAccessKey: "test" },
});

while (true) {
  const res = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: QUEUE_URL, MaxNumberOfMessages: 5, WaitTimeSeconds: 10,
  }));
  for (const m of res.Messages ?? []) {
    await handle(JSON.parse(m.Body));
    await sqs.send(new DeleteMessageCommand({
      QueueUrl: QUEUE_URL, ReceiptHandle: m.ReceiptHandle,
    }));
  }
}

本番との差異

  • TLS・KMS暗号化 の本番水準での検証は不可
  • SNSのSMS/Email配信 は実際には送信されない(メタデータだけ)
  • EventBridge Pipes / Scheduler の詳細機能は未対応
  • IAM条件付きアクセス は簡略評価

まとめ

  • FlociはSQS / SNS / EventBridgeの主要機能を一通り再現
  • FIFO・DLQ・メッセージフィルタリング・スケジュールルールに対応
  • イベント駆動アーキテクチャのE2Eテストがローカル1コンテナで完結
  • 本番挙動との差異は最小限だが、TLS・IAM厳密評価は別途検証

関連記事