name: dynamodb-design description: | Amazon DynamoDB のテーブル設計パターン。パーティションキー・ソートキー設計、GSI/LSI、 Single Table Design、キャパシティモード(オンデマンド/プロビジョンド)、TTL、 DynamoDB Streams、トランザクション、バックアップ戦略を網羅する。NoSQLデータモデリングの 基礎からプロダクション運用まで、実践的な設計パターンを提供する。 license: Apache-2.0 metadata: author: engineers-hub version: "1.0.0" category: aws-infrastructure tags: [dynamodb, nosql, single-table-design, gsi, lsi, capacity, ttl, streams] dependencies: []
DynamoDB テーブル設計パターン
このスキルを使うタイミング
- サーバーレスアーキテクチャのデータストアを選定するとき
- ミリ秒レベルのレイテンシが求められるとき
- キーバリュー型のアクセスパターンが主体のとき
- Single Table Design でデータモデルを統合するとき
- DynamoDB Streams でイベント駆動処理を実装するとき
- TTL で自動的にデータを期限切れにするとき
基本パターン
テーブル定義(Terraform)
resource "aws_dynamodb_table" "main" {
name = "${var.project}-${var.environment}-main"
billing_mode = var.environment == "production" ? "PROVISIONED" : "PAY_PER_REQUEST"
hash_key = "PK"
range_key = "SK"
# プロビジョンドモード時のキャパシティ
dynamic "provisioned_throughput" {
for_each = var.environment == "production" ? [1] : []
content {
read_capacity = var.read_capacity
write_capacity = var.write_capacity
}
}
attribute {
name = "PK"
type = "S"
}
attribute {
name = "SK"
type = "S"
}
attribute {
name = "GSI1PK"
type = "S"
}
attribute {
name = "GSI1SK"
type = "S"
}
attribute {
name = "GSI2PK"
type = "S"
}
attribute {
name = "GSI2SK"
type = "S"
}
# GSI1: エンティティタイプ別のクエリ用
global_secondary_index {
name = "GSI1"
hash_key = "GSI1PK"
range_key = "GSI1SK"
projection_type = "ALL"
dynamic "provisioned_throughput" {
for_each = var.environment == "production" ? [1] : []
content {
read_capacity = var.gsi_read_capacity
write_capacity = var.gsi_write_capacity
}
}
}
# GSI2: 逆引き・別軸のクエリ用
global_secondary_index {
name = "GSI2"
hash_key = "GSI2PK"
range_key = "GSI2SK"
projection_type = "ALL"
dynamic "provisioned_throughput" {
for_each = var.environment == "production" ? [1] : []
content {
read_capacity = var.gsi_read_capacity
write_capacity = var.gsi_write_capacity
}
}
}
# TTL
ttl {
attribute_name = "expiresAt"
enabled = true
}
# DynamoDB Streams
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
# ポイントインタイムリカバリ
point_in_time_recovery {
enabled = true
}
# サーバーサイド暗号化
server_side_encryption {
enabled = true
kms_key_arn = var.kms_key_arn
}
# 削除保護
deletion_protection_enabled = var.environment == "production"
tags = {
Name = "${var.project}-${var.environment}-main"
Environment = var.environment
}
}
Auto Scaling(プロビジョンドモード)
resource "aws_appautoscaling_target" "dynamodb_read" {
count = var.environment == "production" ? 1 : 0
max_capacity = var.max_read_capacity
min_capacity = var.read_capacity
resource_id = "table/${aws_dynamodb_table.main.name}"
scalable_dimension = "dynamodb:table:ReadCapacityUnits"
service_namespace = "dynamodb"
}
resource "aws_appautoscaling_policy" "dynamodb_read" {
count = var.environment == "production" ? 1 : 0
name = "${var.project}-${var.environment}-dynamodb-read-scaling"
policy_type = "TargetTrackingScaling"
resource_id = aws_appautoscaling_target.dynamodb_read[0].resource_id
scalable_dimension = aws_appautoscaling_target.dynamodb_read[0].scalable_dimension
service_namespace = aws_appautoscaling_target.dynamodb_read[0].service_namespace
target_tracking_scaling_policy_configuration {
predefined_metric_specification {
predefined_metric_type = "DynamoDBReadCapacityUtilization"
}
target_value = 70
}
}
resource "aws_appautoscaling_target" "dynamodb_write" {
count = var.environment == "production" ? 1 : 0
max_capacity = var.max_write_capacity
min_capacity = var.write_capacity
resource_id = "table/${aws_dynamodb_table.main.name}"
scalable_dimension = "dynamodb:table:WriteCapacityUnits"
service_namespace = "dynamodb"
}
resource "aws_appautoscaling_policy" "dynamodb_write" {
count = var.environment == "production" ? 1 : 0
name = "${var.project}-${var.environment}-dynamodb-write-scaling"
policy_type = "TargetTrackingScaling"
resource_id = aws_appautoscaling_target.dynamodb_write[0].resource_id
scalable_dimension = aws_appautoscaling_target.dynamodb_write[0].scalable_dimension
service_namespace = aws_appautoscaling_target.dynamodb_write[0].service_namespace
target_tracking_scaling_policy_configuration {
predefined_metric_specification {
predefined_metric_type = "DynamoDBWriteCapacityUtilization"
}
target_value = 70
}
}
Single Table Design
// Single Table Design のキー設計例
// エンティティ: User, Order, OrderItem
// アクセスパターン:
// 1. ユーザー取得: PK=USER#<userId>, SK=PROFILE
// 2. ユーザーの注文一覧: PK=USER#<userId>, SK=begins_with(ORDER#)
// 3. 注文詳細: PK=ORDER#<orderId>, SK=METADATA
// 4. 注文の商品一覧: PK=ORDER#<orderId>, SK=begins_with(ITEM#)
// 5. ステータス別注文検索: GSI1PK=STATUS#<status>, GSI1SK=<createdAt>
interface DynamoDBItem {
PK: string;
SK: string;
GSI1PK?: string;
GSI1SK?: string;
entityType: string;
[key: string]: unknown;
}
// ユーザーエンティティ
function createUserItem(user: User): DynamoDBItem {
return {
PK: `USER#${user.id}`,
SK: "PROFILE",
GSI1PK: `EMAIL#${user.email}`,
GSI1SK: `USER#${user.id}`,
entityType: "User",
id: user.id,
name: user.name,
email: user.email,
createdAt: user.createdAt,
};
}
// 注文エンティティ
function createOrderItem(order: Order): DynamoDBItem {
return {
PK: `ORDER#${order.id}`,
SK: "METADATA",
GSI1PK: `STATUS#${order.status}`,
GSI1SK: order.createdAt,
GSI2PK: `USER#${order.userId}`,
GSI2SK: `ORDER#${order.createdAt}`,
entityType: "Order",
id: order.id,
userId: order.userId,
status: order.status,
totalAmount: order.totalAmount,
createdAt: order.createdAt,
};
}
// ユーザーの注文一覧にもアイテムを配置(非正規化)
function createUserOrderItem(order: Order): DynamoDBItem {
return {
PK: `USER#${order.userId}`,
SK: `ORDER#${order.createdAt}#${order.id}`,
entityType: "UserOrder",
orderId: order.id,
status: order.status,
totalAmount: order.totalAmount,
createdAt: order.createdAt,
};
}
クエリ実装
import { DynamoDBDocumentClient, QueryCommand, TransactWriteCommand } from "@aws-sdk/lib-dynamodb";
const TABLE_NAME = process.env.TABLE_NAME!;
// ユーザーの注文一覧を取得
async function getUserOrders(
client: DynamoDBDocumentClient,
userId: string,
limit: number = 20,
lastEvaluatedKey?: Record<string, unknown>,
): Promise<{ orders: Order[]; nextKey?: Record<string, unknown> }> {
const result = await client.send(
new QueryCommand({
TableName: TABLE_NAME,
KeyConditionExpression: "PK = :pk AND begins_with(SK, :skPrefix)",
ExpressionAttributeValues: {
":pk": `USER#${userId}`,
":skPrefix": "ORDER#",
},
ScanIndexForward: false, // 新しい順
Limit: limit,
ExclusiveStartKey: lastEvaluatedKey,
}),
);
return {
orders: (result.Items ?? []) as Order[],
nextKey: result.LastEvaluatedKey,
};
}
// ステータス別注文検索(GSI1使用)
async function getOrdersByStatus(
client: DynamoDBDocumentClient,
status: string,
since: string,
): Promise<Order[]> {
const result = await client.send(
new QueryCommand({
TableName: TABLE_NAME,
IndexName: "GSI1",
KeyConditionExpression: "GSI1PK = :pk AND GSI1SK >= :since",
ExpressionAttributeValues: {
":pk": `STATUS#${status}`,
":since": since,
},
}),
);
return (result.Items ?? []) as Order[];
}
// トランザクション(注文作成: 複数アイテムを一括書き込み)
async function createOrder(
client: DynamoDBDocumentClient,
order: Order,
items: OrderItem[],
): Promise<void> {
const transactItems = [
// 注文メタデータ
{
Put: {
TableName: TABLE_NAME,
Item: createOrderItem(order),
ConditionExpression: "attribute_not_exists(PK)", // 冪等性
},
},
// ユーザーの注文一覧用アイテム
{
Put: {
TableName: TABLE_NAME,
Item: createUserOrderItem(order),
},
},
// 注文商品
...items.map((item) => ({
Put: {
TableName: TABLE_NAME,
Item: {
PK: `ORDER#${order.id}`,
SK: `ITEM#${item.id}`,
entityType: "OrderItem",
...item,
},
},
})),
];
await client.send(new TransactWriteCommand({ TransactItems: transactItems }));
}
TTL によるデータライフサイクル管理
// セッションデータに TTL を設定
async function createSession(
client: DynamoDBDocumentClient,
session: Session,
): Promise<void> {
const ttlSeconds = Math.floor(Date.now() / 1000) + 24 * 60 * 60; // 24時間後
await client.send(
new PutCommand({
TableName: TABLE_NAME,
Item: {
PK: `SESSION#${session.id}`,
SK: "DATA",
entityType: "Session",
userId: session.userId,
data: session.data,
expiresAt: ttlSeconds, // Unix epoch (秒)
createdAt: new Date().toISOString(),
},
}),
);
}
DynamoDB Streams 連携
# Streams を Lambda でコンシュームする
resource "aws_lambda_event_source_mapping" "dynamodb_stream" {
event_source_arn = aws_dynamodb_table.main.stream_arn
function_name = aws_lambda_function.stream_processor.arn
starting_position = "LATEST"
batch_size = 100
maximum_batching_window_in_seconds = 5
filter_criteria {
filter {
pattern = jsonencode({
eventName = ["INSERT", "MODIFY"]
dynamodb = {
NewImage = {
entityType = {
S = ["Order"]
}
}
}
})
}
}
function_response_types = ["ReportBatchItemFailures"]
}
必須ルール
- アクセスパターンを先に定義 -- テーブル構造はクエリ要件から逆算して設計する(RDBのように正規化しない)
- パーティションキーのカーディナリティを確保 -- ホットパーティションを避けるため、十分に分散するキーを設計する
- ポイントインタイムリカバリを有効化 -- 本番環境では必ず有効にし、誤削除からの復旧手段を確保する
- TTL の値は Unix epoch(秒) -- ミリ秒ではなく秒単位で設定する(誤ると遠い未来に期限切れになる)
- トランザクションは 100 アイテム以内 --
TransactWriteItems/TransactGetItemsの上限 - GSI のキャパシティを個別に設計 -- テーブルとは独立してスロットリングが発生するため
- Streams のフィルタリングを活用 -- 不要なレコードを Lambda に送らずコストとパフォーマンスを最適化
アンチパターン
- RDB の正規化をそのまま適用する(結合操作がないため非正規化が基本)
- Scan で全件取得する(コスト・パフォーマンスの両面で問題)
- パーティションキーに日付や固定値を使用する(ホットパーティション)
- GSI を過剰に作成する(テーブルあたり最大20、書き込みコストが増大)
- TTL を Number 型以外で設定する(正しく動作しない)
- 大きなアイテム(400KB超)を格納する(S3 + DynamoDB のポインタパターンを使用)
ConsistentRead: trueを常に指定する(結果整合性で十分なケースが大半)FilterExpressionでデータを絞り込む設計にする(RCU は全件分消費される)
テスト戦略
DynamoDB 設計のテストでは以下のケースを検証する:
- CRUD 検証: 各エンティティの作成・読み取り・更新・削除が正常に動作すること
- クエリ検証: 全てのアクセスパターンで期待通りの結果が返ること
- トランザクション検証: 複数アイテムの一括書き込みが成功し、部分失敗が発生しないこと
- 冪等性検証: 同一データの再投入がエラーまたは冪等に処理されること
- TTL 検証: 期限切れアイテムが DynamoDB Streams に DELETE イベントとして流れること
- パフォーマンス検証: 想定スループットでスロットリングが発生しないこと
- ホットパーティション検証: CloudWatch の
ConsumedReadCapacityUnitsでパーティション偏りを確認