Floci × SQS / SNS / EventBridge ハンズオン — メッセージング基盤のローカル検証【2026年版】

目次
この記事でわかること
- FlociでSQSキューを作成しメッセージを送受信する基本
- SNSトピックとサブスクリプション、SNS→SQSのFan-out
- FIFOキュー・デッドレターキュー(DLQ)
- EventBridgeのカスタムバス・ルール・ターゲット設定
- Lambda連携によるイベント駆動アーキテクチャの検証
イベント駆動アーキテクチャの心臓部である SQS / SNS / EventBridge。Flociはこの3つをすべてローカル再現でき、本番とほぼ同じメッセージング挙動を開発段階で検証できます。
Flociの基本は 入門記事 を参照してください。
SQS — キューでの送受信
キュー作成
aws sqs create-queue --queue-name tasks --endpoint-url http://localhost:4566
# => "QueueUrl": "http://localhost:4566/000000000000/tasks"
メッセージ送受信(Python)
import boto3
sqs = boto3.client(
"sqs", endpoint_url="http://localhost:4566", region_name="us-east-1",
aws_access_key_id="test", aws_secret_access_key="test",
)
url = "http://localhost:4566/000000000000/tasks"
# 送信
sqs.send_message(QueueUrl=url, MessageBody="hello")
# 受信
res = sqs.receive_message(QueueUrl=url, MaxNumberOfMessages=1, WaitTimeSeconds=5)
for m in res.get("Messages", []):
print(m["Body"])
sqs.delete_message(QueueUrl=url, ReceiptHandle=m["ReceiptHandle"])
FIFOキュー
aws sqs create-queue \
--queue-name orders.fifo \
--attributes FifoQueue=true,ContentBasedDeduplication=true \
--endpoint-url http://localhost:4566
sqs.send_message(
QueueUrl="http://localhost:4566/000000000000/orders.fifo",
MessageBody="order-001",
MessageGroupId="user-a",
)
デッドレターキュー(DLQ)
aws sqs create-queue --queue-name tasks-dlq --endpoint-url http://localhost:4566
aws sqs set-queue-attributes \
--queue-url http://localhost:4566/000000000000/tasks \
--attributes '{
"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:000000000000:tasks-dlq\",\"maxReceiveCount\":\"3\"}"
}' \
--endpoint-url http://localhost:4566
3回リトライ失敗後、DLQに自動転送されます。
SNS — トピックとFan-out
トピック作成と発行
sns = boto3.client("sns", endpoint_url="http://localhost:4566",
region_name="us-east-1",
aws_access_key_id="test", aws_secret_access_key="test")
topic = sns.create_topic(Name="user-events")["TopicArn"]
sns.publish(TopicArn=topic, Subject="Welcome", Message="Hello SNS")
SNS → SQS Fan-out
1つのSNSメッセージを複数のSQSキューへ配信する典型パターン:
q1 = sqs.create_queue(QueueName="email-queue")["QueueUrl"]
q2 = sqs.create_queue(QueueName="audit-queue")["QueueUrl"]
def arn_of(url):
return sqs.get_queue_attributes(
QueueUrl=url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
sns.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=arn_of(q1))
sns.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=arn_of(q2))
sns.publish(TopicArn=topic, Message="new user signed up")
両キューに同じメッセージが届きます。
メッセージフィルタリング
sns.subscribe(
TopicArn=topic, Protocol="sqs", Endpoint=arn_of(q1),
Attributes={"FilterPolicy": '{"event_type": ["signup"]}'},
)
sns.publish(
TopicArn=topic,
Message='{"user": "alice"}',
MessageAttributes={"event_type": {"DataType": "String", "StringValue": "signup"}},
)
EventBridge — イベントバスとルール
カスタムバス作成
aws events create-event-bus --name app-bus --endpoint-url http://localhost:4566
ルール作成(Lambdaターゲット)
aws events put-rule \
--name order-created-rule \
--event-bus-name app-bus \
--event-pattern '{"source":["myapp.orders"],"detail-type":["OrderCreated"]}' \
--endpoint-url http://localhost:4566
aws events put-targets \
--rule order-created-rule \
--event-bus-name app-bus \
--targets 'Id=1,Arn=arn:aws:lambda:us-east-1:000000000000:function:hello' \
--endpoint-url http://localhost:4566
イベント発行
events = boto3.client("events", endpoint_url="http://localhost:4566",
region_name="us-east-1",
aws_access_key_id="test", aws_secret_access_key="test")
events.put_events(Entries=[{
"Source": "myapp.orders",
"DetailType": "OrderCreated",
"Detail": '{"orderId":"o-001","amount":1500}',
"EventBusName": "app-bus",
}])
パターンに一致したイベントが自動的にLambdaへ配信されます。
スケジュールルール(Cron)
aws events put-rule \
--name daily-job \
--schedule-expression "cron(0 9 * * ? *)" \
--endpoint-url http://localhost:4566
定期実行の設定もローカルで検証可能です。
組み合わせパターン例
API Gateway → Lambda → SNS → SQS → Lambda
[HTTP Request]
↓
[API Gateway (Floci)]
↓
[Lambda: order-validator]
↓
[SNS: order-events]
↓ (Fan-out)
[SQS: email-queue] [SQS: audit-queue]
↓ ↓
[Lambda: email] [Lambda: audit]
このマイクロサービス構成のE2Eを、Floci 1コンテナだけで検証できます。
Node.js(AWS SDK v3)例
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
const sqs = new SQSClient({
endpoint: "http://localhost:4566",
region: "us-east-1",
credentials: { accessKeyId: "test", secretAccessKey: "test" },
});
await sqs.send(new SendMessageCommand({
QueueUrl: "http://localhost:4566/000000000000/tasks",
MessageBody: "hello from node",
}));
よくあるエラー
SQSキューURLが解決できない
コンテナ内からは http://floci:4566/000000000000/queue-name のように、FLOCI_HOSTNAME と一致させる。
SNSサブスクリプションが配信されない
SQSキューのARNが正しいか、またFIFOキューは FIFOトピック にのみサブスクライブ可能。
EventBridgeルールが発火しない
EventPattern のJSONが厳密にマッチしていない。source と detail-type を完全一致で確認。
FIFOでメッセージが重複
ContentBasedDeduplication=true を有効にするか、MessageDeduplicationId を明示指定。
FAQ
Q. Visibility TimeoutやLong Pollingは動作する?
はい。VisibilityTimeout、WaitTimeSeconds ともに本番同様に機能します。
Q. SNSのHTTPSサブスクリプションは?
基本的な確認リクエスト送信に対応。実運用のTLS検証までは再現されません。
Q. EventBridge Schedulerは?
基本的なスケジュール機能に対応。フレキシブル時間枠など一部詳細は未対応の可能性あり。
Q. メッセージの可視化ツールは?
Flociに管理画面はありません。awslocal 系コマンドまたは sqs-admin などの外部ツールを利用してください。
典型ユースケース別の実装例
1. 注文処理パイプライン(SQS)
[API Gateway] → [Lambda: validate] → [SQS: orders]
↓
[Lambda: fulfill] → [DynamoDB]
↓ (失敗時)
[SQS: orders-dlq]
SQS FIFO + グループIDでユーザー単位の順序保証を実現:
sqs.send_message(
QueueUrl=fifo_url,
MessageBody='{"orderId":"o-001","amount":1500}',
MessageGroupId="user-001",
MessageDeduplicationId="o-001",
)
2. 通知Fan-out(SNS)
ユーザー登録時にメール・Slack・監査ログへ同時配信:
[Lambda: signup] → [SNS: user-events]
↓ (Fan-out)
[SQS: email] [SQS: slack] [SQS: audit]
↓ ↓ ↓
[Lambda] [Lambda] [Lambda]
メッセージフィルタリングで受信側の関心事を限定できます。
3. ETLスケジュール(EventBridge)
events.put_rule(
Name="daily-etl",
ScheduleExpression="cron(0 2 * * ? *)", # 毎日2時
State="ENABLED",
)
events.put_targets(
Rule="daily-etl",
Targets=[{
"Id": "1",
"Arn": "arn:aws:lambda:us-east-1:000000000000:function:etl",
}]
)
バックプレッシャーとスロットリング対策
大量メッセージで下流が詰まる事態を事前に検証:
# SQS側の Visibility Timeout で処理時間に猶予を
sqs.set_queue_attributes(
QueueUrl=url,
Attributes={"VisibilityTimeout": "120"},
)
# Lambda側の同時実行制限
lam.put_function_concurrency(
FunctionName="fulfill",
ReservedConcurrentExecutions=10,
)
下流への流量制御をFloci上で実測できます。
メッセージスキーマ管理
JSONスキーマで送受信契約を担保する運用例:
import json, jsonschema
SCHEMA = {
"type": "object",
"required": ["orderId", "amount"],
"properties": {
"orderId": {"type": "string"},
"amount": {"type": "number", "minimum": 0},
},
}
def send_order(order):
jsonschema.validate(order, SCHEMA)
sqs.send_message(QueueUrl=url, MessageBody=json.dumps(order))
スキーマ不正なメッセージが送信前に弾かれるため、下流バグを防ぎます。
エラーハンドリングパターン
リトライ+指数バックオフ
import time, random
for attempt in range(5):
try:
process_message(msg)
break
except TransientError:
time.sleep((2 ** attempt) + random.random())
DLQ監視
dlq = sqs.get_queue_attributes(
QueueUrl="http://localhost:4566/000000000000/tasks-dlq",
AttributeNames=["ApproximateNumberOfMessages"],
)
count = int(dlq["Attributes"]["ApproximateNumberOfMessages"])
if count > 0:
print(f"⚠️ DLQ に {count}件のメッセージ")
CIでDLQ件数をアサーションすることで、デグレを早期検知できます。
監視・オブザーバビリティ
本番ではCloudWatch Metricsを使う部分を、ローカルでは:
- メッセージ滞留数を
ApproximateNumberOfMessagesで定期ポーリング - 処理時間をアプリ側で計測し、OpenTelemetryで集約
- エラーログを構造化ログ(JSON)で出力し、
jqで集計
ログ・メトリクスの出力フォーマットをローカルで固めておけば、本番移行時の観測性が下がりません。
Node.js 例:メッセージ処理ワーカー
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand }
from "@aws-sdk/client-sqs";
const sqs = new SQSClient({
endpoint: "http://localhost:4566", region: "us-east-1",
credentials: { accessKeyId: "test", secretAccessKey: "test" },
});
while (true) {
const res = await sqs.send(new ReceiveMessageCommand({
QueueUrl: QUEUE_URL, MaxNumberOfMessages: 5, WaitTimeSeconds: 10,
}));
for (const m of res.Messages ?? []) {
await handle(JSON.parse(m.Body));
await sqs.send(new DeleteMessageCommand({
QueueUrl: QUEUE_URL, ReceiptHandle: m.ReceiptHandle,
}));
}
}
本番との差異
- TLS・KMS暗号化 の本番水準での検証は不可
- SNSのSMS/Email配信 は実際には送信されない(メタデータだけ)
- EventBridge Pipes / Scheduler の詳細機能は未対応
- IAM条件付きアクセス は簡略評価
まとめ
- FlociはSQS / SNS / EventBridgeの主要機能を一通り再現
- FIFO・DLQ・メッセージフィルタリング・スケジュールルールに対応
- イベント駆動アーキテクチャのE2Eテストがローカル1コンテナで完結
- 本番挙動との差異は最小限だが、TLS・IAM厳密評価は別途検証