Datastreamサービス追加手順書
目的
既存のDatastream環境(VPC、VPN、Private Connectionなど)に新しいサービスのDatastreamを追加する手順書です。
前提条件
初回構築が完了していること
このドキュメントは、以下の共有リソースが既に作成されていることを前提としています:
- GCPプロジェクト(
fd-datastream-prdまたはfd-datastream-stg) - VPC・サブネット
- VPN接続(AWS Site-to-Site VPN、HA VPN Gateway、VPNトンネル等)
- Datastream Private Connection
- 共有Firewall(Health Check用)
初回構築がまだの場合は、先に Datastream初回構築手順書 を実施してください。
プロジェクト構成
- 1つのGCPプロジェクト内に複数のDatastreamが存在
- プロジェクト名(環境ごと):
- production:
fd-datastream-prd - staging:
fd-datastream-stg
- production:
- 複数のサービス(例: online-ops, online-karte)が同一プロジェクト内に共存
- プロジェクト名(環境ごと):
- VPNは1つのプロジェクトに対して1つ
- 複数サービスで同じVPN接続を共有
このドキュメントで作成する個別リソース
各サービス追加時に以下のリソースを作成します:
- Secrets Manager (DBユーザーパスワード)
- GCEインスタンステンプレート
- GCEインスタンスグループマネージャー
- パススルーNLB(固定IP、Backend Service、Forwarding Rule)
- Firewall(LB→GCE用)
- Datastream Connection Profile(source/destination)
- Datastream Stream
- BigQuery Dataset
- 監視設定(GCPアラート、Datadogモニター)
サービス追加手順
1. Secrets Manager作成(DBパスワード用)
【サービスごと】
Secrets Manager作成
- 【サービスごと】 追加するサービスごとに作成
- DBユーザーのパスワード用
設定ルール
google_secret_manager_secret- secret_id
${var.service}-db-user-password
- project
- GCPプロジェクト名を指定
- replication
auto
- labels
service = var.service
google_secret_manager_secret_version- secret
google_secret_manager_secret.db_user_password_${service}.id
- secret_data
"dummy"- AWS側をoutput参照すると値が露出してしまうため、apply後に手動で登録する
- lifecycle
- ignore_changes
[secret_data, enabled]- 構築後に差分が設定するため
- ignore_changes
GCPコンソールから手動でDBパスワードを設定
- Secrets Managerにアクセス
- 該当のシークレット(
${service}-db-user-password)を選択 新しいバージョンでパスワードを追加- 過去のバージョンは破棄せず無効にすること
2. GCEプロキシ作成
【サービスごと】
GCEのインスタンステンプレート (
google_compute_region_instance_template) を作成- 【サービスごと】 追加するサービスごとに作成
設定ルール
name
${var.service}-${var.env}-gce-template
region
- VPN接続先と同じリージョンを指定
machine_type
e2-micro- モニタリングしつつ必要なら増強する
disk
- source_image
debian-cloud/debian-12
- disk_size_gb
20GB- モニタリングしつつ必要なら増強する
- source_image
network_interface
- network
google_compute_network.main.self_link
- subnetwork
google_compute_subnetwork.private.self_link
- network
shielded_instance_config
- enable_secure_boot
true
- enable_vtpm
true
- enable_secure_boot
metadata_startup_script
- Datastreamの制約上、DBのライターインスタンスのIPアドレスしか指定できない。起動時に動的に名前解決できるようにする。
bash<<EOT #! /bin/bash DB_ENDPOINT="${local.db_endpoint_${service}}" CONF_FILE="/etc/rinetd.conf" apt update -y apt install -y rinetd if ! grep -q "$DB_ENDPOINT" "$CONF_FILE"; then echo "0.0.0.0 5432 $DB_ENDPOINT 5432" >> "$CONF_FILE" fi systemctl restart rinetd systemctl enable rinetd EOTtags
["${var.service}-allow-lb-postgres-datastream", "allow-lb-health-check"]
labels
service = var.service
GCEのインスタンスグループマネージャー (
google_compute_region_instance_group_manager) を作成- 【サービスごと】 追加するサービスごとに作成
設定ルール
- name
${var.service}-${var.env}-gce-group
- base_instance_name
${var.service}-${var.env}-gce
- region
- VPN接続先と同じリージョンを指定
- distribution_policy_zones
["${var.region}-a", "${var.region}-b"]
- version
- instance_template
google_compute_region_instance_template.${service}.id- 作成したインスタンステンプレートのidを指定
- instance_template
- target_size
2以上とする- 冗長化のため
- named_port
- name:
postgresql - port:
5432
- name:
- update_policy
- type:
"PROACTIVE" - minimal_action:
"REPLACE" - max_surge_fixed
- インスタンス数と同じ値
- インスタンス入れ替え時にダウンを避けるため
- インスタンス数と同じ値
- max_unavailable_fixed
0- ダウンタイムを発生させない
- type:
- standby_policy
- initial_delay_sec:
30
- initial_delay_sec:
3. パススルーNLB作成
【サービスごと】
固定IP (
google_compute_address) を作成- 【サービスごと】 追加するサービスごとに作成
設定ルール
- name
${var.service}-${var.env}-lb-ip
- region
- VPN接続先と同じリージョンを指定
- subnetwork
google_compute_subnetwork.private.id- 作成したプライベートサブネットのID
- address_type
INTERNAL
- labels
service = var.service
ヘルスチェック (
google_compute_region_health_check) の作成または参照- 【初回サービス追加時】 新規作成(共有リソース)
- 【2回目以降のサービス追加時】 既存リソースを参照
設定ルール(初回のみ作成)
- name
${var.service}-${var.env}-health-check-ssh
- region
- VPN接続先と同じリージョンを指定
- tcp_health_check
- port:
22- GCEでデフォルトでリッスンしている22番ポートを指定
- port:
バックエンドサービス (
google_compute_region_backend_service) の作成- 【サービスごと】 追加するサービスごとに作成
設定ルール
- name
${var.service}-${var.env}-backend-service
- region
- VPN接続先と同じリージョンを指定
- load_balancing_scheme
INTERNAL
- protocol
TCP
- backend
- group
google_compute_region_instance_group_manager.${service}.instance_group- 作成したインスタンスグループマネージャーを指定
- balancing_mode
CONNECTION- パススルーNLBの場合は
CONNECTION固定となる - 「同時接続数」を基準にバックエンドの空きキャパシティを判断して振り分ける
- パススルーNLBの場合は
- group
- health_checks
[google_compute_region_health_check.ssh.id]- 作成したヘルスチェックのIDを指定
フォワーディングルール (
google_compute_forwarding_rule) を作成- 【サービスごと】 追加するサービスごとに作成
設定ルール
- name
${var.service}-${var.env}-forward-rule
- region
- VPN接続先と同じリージョンを指定
- ip_protocol
TCP
- load_balancing_scheme
INTERNAL
- backend_service
google_compute_region_backend_service.${service}.id- 作成したバックエンドのIDを指定
- network
google_compute_network.main.id- VPCのIDを指定
- subnetwork
google_compute_subnetwork.private.id- プライベートサブネットのIDを指定
- ports
["5432"]
- ip_address
google_compute_address.${service}.address- 作成した固定IPアドレスを指定
- labels
service = var.service
4. Firewall設定
【サービスごと + 共有リソース更新】
Health Check用Firewall
- 【既存リソースを参照】 初回構築時に作成済み
- 注意: 新しくは作成せず、既存のFirewallルールを参照
参照するリソース名
- name
${var.service}-${var.env}-allow-lb-health-check
- GoogleのネットワークからGCEに対するヘルスチェックのルール
Datastream→LB用Firewall作成/更新(共有)
- 【初回サービス追加時】 新規作成(共有リソース)
- 【2回目以降のサービス追加時】
destination_rangesに新しいLB IPを追加
設定ルール
- name
${var.service}-${var.env}-allow-datastream-to-lb
- direction
INGRESS
- network
google_compute_network.main.id- VPCのIDを指定
- source_ranges
[var.datastream_subnet_ip_range]- datastreamのネットワークのcidrを指定
- allow
- protocol:
tcp - ports:
["5432"]
- protocol:
- destination_ranges
- 全サービスのLB固定IPをリストで指定(LBはタグ指定できないため)
- 初回:
["${google_compute_address.online_ops.address}/32"] - 2回目以降: 既存のIPリストに新しいサービスのLB IPを追加
LB→GCE用Firewall作成(サービスごと)
- 【サービスごと】 追加するサービスごとに作成
設定ルール
- name
${var.service}-${var.env}-allow-lb-to-gce
- direction
INGRESS
- network
google_compute_network.main.id- VPCのIDを指定
- source_ranges
["${google_compute_address.${service}.address}/32"]- サービス専用のLB固定IPを指定
- allow
- protocol:
tcp - ports:
["5432"]
- protocol:
- target_tags
["${var.service}-allow-lb-postgres-datastream"]- サービス専用のタグ
6. DBセキュリティグループの設定(AWS側)
【サービスごと】
- 対象DBのセキュリティグループにインバウンドルールを追加
- GCP側で作成したプライベートサブネットのCIDRからポート5432への接続を許可
7. DBのパラメータ設定(AWS側)
【サービスごと】
以下のモジュールのいずれかで定義追加可能
- 対応モジュール
/template_modules/microservice-ecs/template_modules/options/rds-aurora
設定内容
cluster_parameter_group_custom_enable
true
cluster_parameter_group_params
- 設定値
hcl"rds.logical_replication" = { value = "1" apply_method = "pending-reboot" } "max_slot_wal_keep_size" = { value = "1000" # MB apply_method = "immediate" } "wal_sender_timeout" = { value = "0" # 無制限 apply_method = "immediate" } "max_wal_senders" = { value = "20" apply_method = "pending-reboot" }設定例
- variableで
cluster_parameter_group_paramsを定義し、default指定
hclvariable "cluster_parameter_group_params" { type = map(object({ value = string apply_method = string })) default = { "rds.logical_replication" = { value = "1" apply_method = "pending-reboot" } "max_slot_wal_keep_size" = { value = "1000" # MB apply_method = "immediate" } "wal_sender_timeout" = { value = "0" # 無制限 apply_method = "immediate" } "max_wal_senders" = { value = "20" apply_method = "pending-reboot" } } }- variableで
- 対応モジュール
8. DBの再起動を実施
【サービスごと】
パラメータグループの適用のため、DBを再起動する
9. DBに手動でDatastream用の設定追加
【サービスごと】重要: ストリーム開始直前に実施すること(レプリケーションスロット作成から1GB以内にストリーム開始が必要)
AWS Secrets ManagerにDatastreamユーザーのパスワードを追加
- 既存のDBシークレットに
DATASTREAM_USER_PASSWORDキーを追加
- 既存のDBシークレットに
AWSマネジメントコンソールから踏み台EC2用のポリシー作成
- ポリシー名:
tmp-${project}-for-setup-datastream
権限
hcl{ "Version": "2012-10-17", "Statement": [ { "Action": [ "secretsmanager:GetSecretValue" ], "Effect": "Allow", "Resource": "*" }, { "Effect": "Allow", "Action": [ "s3:List*", "s3:GetObject" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "rds:DescribeDBClusters", "rds:DescribeDBInstances" ], "Resource": "*" } ] }- ポリシー名:
踏み台EC2のIAMロールに作成した
tmp-${project}-for-setup-datastreamポリシーをアタッチ- 踏み台EC2の条件
- 対象DBと同じプライベートサブネットに所属
- 対象DBのセキュリティグループで踏み台からの接続許可ルールがある
- 踏み台EC2の条件
S3にスクリプト用バケット作成
bashaws s3api create-bucket --bucket work-setup-db-datastream --profile ${AWS_PROFILE} --region ap-northeast-1 --create-bucket-configuration LocationConstraint="ap-northeast-1"バケットにスクリプトをアップロード
bashaws s3 cp ./setup_datastream.sh "s3://work-setup-db-datastream/" --profile ${AWS_PROFILE} --region ap-northeast-1SSMで踏み台(fd-platform)にログイン
tmp-${project}-for-setup-datastreamポリシーをアタッチしたロールをもつ踏み台EC2のインスタンスIDについて、コンソールから確認する
bashaws ssm start-session --target ${ec2のインスタンスID指定} --profile ${AWS_PROFILE} --region ap-northeast-1バケットから踏み台にスクリプトをコピー
bashaws s3 cp s3://work-setup-db-datastream/setup_datastream.sh ./ chmod 755 setup_datastream.shスクリプトの内容(変数を書き換えること)
bash#!/bin/bash TIMESTAMP=$(date +"%Y%m%d_%H%M%S") LOGFILE="setup_datastream_$TIMESTAMP.log" SECRET_ID="${Secrets Manager name}" DB_CLUSTER_NAME="${DB_CLUSTER_NAME}" DB_HOST=$(aws rds describe-db-clusters --db-cluster-identifier "$DB_CLUSTER_NAME"--query 'DBClusters[0].Endpoint' --output text) DB_PORT="5432" DB_NAME="${DB_NAME}" DB_USER=$(aws secretsmanager get-secret-value --secret-id "$SECRET_ID" --querySecretString --output text | jq -r '.DB_USERNAME') echo "Retrieving PostgreSQL admin password." | tee -a "$LOGFILE" PGPASSWORD=$(aws secretsmanager get-secret-value --secret-id "$SECRET_ID" --querySecretString --output text | jq -r '.DB_PASSWORD') if [ -z "$PGPASSWORD" ]; then echo "ERROR: Failed to retrieve PostgreSQL admin password." exit 1 fi echo "Admin password retrieved successfully." | tee -a "$LOGFILE" echo "Retrieving datastream user password." | tee -a "$LOGFILE" DATASTREAM_USER_PASSWORD=$(aws secretsmanager get-secret-value --secret-id"$SECRET_ID" --query SecretString --output text | jq -r 'DATASTREAM_USER_PASSWORD') if [ -z "$DATASTREAM_USER_PASSWORD" ]; then echo "ERROR: Failed to retrieve datastream password." exit 1 fi echo "Datastream password retrieved successfully." | tee -a "$LOGFILE" export DB_HOST DB_PORT DB_NAME DB_USER PGPASSWORD set -e execute_sql() { echo "----------------------------------------" | tee -a "$LOGFILE" echo "Executing: $1" | tee -a "$LOGFILE" echo "----------------------------------------" | tee -a "$LOGFILE" OUTPUT=$(psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -v ON_ERROR_STOP=1 -c "$2" 2>&1 | tee -a "$LOGFILE") if [ $? -ne 0 ]; then echo "ERROR: SQL execution failed." | tee -a "$LOGFILE" echo "$OUTPUT" | tee -a "$LOGFILE" exit 1 fi echo "Done" | tee -a "$LOGFILE" echo "" | tee -a "$LOGFILE" } execute_sql "CREATE PUBLICATION" "CREATE PUBLICATION datastream FOR ALL TABLES;" execute_sql "SELECT PUBLICATION" " SELECT pubname, pubowner::regrole, puballtables, pubinsert, pubupdate, pubdelete,pubtruncate FROM pg_publication;" execute_sql "CREATE PG_CREATE_LOGICAL_REPLICATION_SLOT" "SELECTPG_CREATE_LOGICAL_REPLICATION_SLOT('datastream','pgoutput');" execute_sql "SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT" " SELECT slot_name, plugin, slot_type, active, confirmed_flush_lsn FROM pg_replication_slots;" execute_sql "CREATE USER" "CREATE USER datastream WITH PASSWORD'$DATASTREAM_USER_PASSWORD';" execute_sql "GRANT RDS_REPLICATION" "GRANT RDS_REPLICATION TO datastream;" execute_sql "GRANT SELECT ON ALL TABLES" "GRANT SELECT ON ALL TABLES IN SCHEMApublic TO datastream;" execute_sql "GRANT USAGE ON SCHEMA" "GRANT USAGE ON SCHEMA public TO datastream;" execute_sql "ALTER DEFAULT PRIVILEGES IN SCHEMA" "ALTER DEFAULT PRIVILEGES INSCHEMA public GRANT SELECT ON TABLES TO datastream;" execute_sql "SELECT GRANT" " SELECT grantee, privilege_type, table_schema, table_name FROM information_schema.role_table_grants WHERE grantee = 'datastream';" execute_sql "SELECT ROLE" " SELECT rolname, rolsuper, rolreplication, rolcanlogin, rolcreatedb FROM pg_roles;" echo "Completed setup_datastream." | tee -a "$LOGFILE"コマンドインストール
bashyum install -y jq yum install -y postgresqlスクリプトを実行
bash./setup_datastream.shログにエラーがないことを確認
- ログファイル:
setup_datastream_YYYYMMDD_HHMMSS.log
- ログファイル:
S3のスクリプト削除
bashaws s3 rm s3://work-setup-db-datastream --recursive --profile ${AWS_PROFILE} --region ap-northeast-1S3のバケット削除
bashaws s3api delete-bucket --bucket work-setup-db-datastream --profile ${AWS_PROFILE} --region ap-northeast-1tmp-${project}-for-setup-datastreamポリシーをロールからデタッチtmp-${project}-for-setup-datastreamポリシーを削除
10. Datastream設定
【サービスごと】重要: 前の手順(レプリケーションスロット作成)から1GB以上書き込まれる前にストリーム開始まで実施すること
Connection Profile作成(転送元)
- 【サービスごと】 追加するサービスごとに作成
設定ルール
Terraformリソース名:
google_datastream_connection_profile.source_${service}- connection_profile_id
${var.service}-source
- display_name
${var.service}-source
- project
- GCPプロジェクト名を指定
- location
- VPN接続先と同じリージョンを指定
- postgresql_profile
- hostname
google_compute_address.${service}.address- パススルーNLBの固定IPアドレス指定
- port
5432
- database
- 対象DBのデータベース名
- username
datastream- Datastreamユーザー指定
- password
data.google_secret_manager_secret_version.db_user_password_${service}.secret_data- Secret Managerの値をdata参照で取得
- hostname
- private_connectivity
- private_connection
google_datastream_private_connection.main.id- プライベート接続構成のID指定(共有リソース)
- private_connection
- depends_on
google_secret_manager_secret_version.db_user_password_${service}google_compute_forwarding_rule.${service}google_compute_router_peer.main
Connection Profile作成(転送先: BigQuery)
- 【サービスごと】 追加するサービスごとに作成
設定ルール
Terraformリソース名:
google_datastream_connection_profile.destination_${service}- connection_profile_id
${var.service}-destination
- display_name
${var.service}-destination
- project
- GCPプロジェクト名を指定
- location
- VPN接続先と同じリージョンを指定
- bigquery_profile
{}
- depends_on
google_bigquery_dataset.${service}_public
BigQuery Dataset作成
- 【サービスごと】 追加するサービスごとに作成
設定ルール
Terraformリソース名:
google_bigquery_dataset.${service}_public- dataset_id
- サービス名に応じた適切なDataset ID
- location
- VPN接続先と同じリージョンを指定
- project
- GCPプロジェクト名を指定
- access
- このブロック内でroleなどの権限を指定する
Datastream Stream作成
- 【サービスごと】 追加するサービスごとに作成
設定ルール
Terraformリソース名:
google_datastream_stream.${service}- stream_id
${var.service}
- display_name
${var.service}
- project
- GCPプロジェクト名を指定
- location
- VPN接続先と同じリージョンを指定
- desired_state
"NOT_STARTED"- 作成時に意図せず開始しないように
- source_config
- source_connection_profile
google_datastream_connection_profile.source_${service}.id- 接続元プロファイルのID指定
- postgresql_source_config
- source_connection_profile
- destination_config
- destination_connection_profile
google_datastream_connection_profile.destination_${service}.id- 接続先プロファイルのIDを指定
- bigquery_destination_config
- data_freshness
300s- 送信元と送信先の間の時間差であるが、要件によって変更すること
- 狭めればその分コストとDB負荷がかかる
- source_hierarchy_datasets
- dataset_template
- location
- VPN接続先と同じリージョンを指定
- dataset_id_prefix
- サービス固有のプレフィックス
- location
- dataset_template
- data_freshness
- destination_connection_profile
- backfill_all
{}- バックフィル対象を自動で全て含む
11. ストリーム開始とモニタリング
【サービスごと】
GCPのコンソールからストリームを開始する
- GCP Console > Datastream > Streams
- 該当のStreamを選択
STARTをクリック
DBの負荷状況やDatastreamの遅延状況をモニタリングする
Datastream監視設定
対象: サービス追加時に実行
12. GCP監視設定
【サービスごと】
GCPのDatastreamを設定したモジュール(gcp/services/fd-datastream/${env})内で以下を追加
- production:
gcp/services/fd-datastream/production - staging:
gcp/services/fd-datastream/staging
Datastreamステータス監視
- 【サービスごと】 追加するサービスごとに作成
- GCPアラートポリシー(ログ監視)で実装
設定内容
Terraformリソース:
google_logging_metric.datastream_status_error_${service}- project
- GCPプロジェクト名を指定
- name
datastream-status-error-${var.service}
- filter
severity="ERROR" AND jsonPayload.eventCode="STREAM_STATE_CHANGED" AND resource.labels.stream_id="${var.service}" - metric_descriptor
- metric_kind:
"DELTA" - value_type:
"INT64" - unit:
"1" - display_name:
datastream-status-error-${var.service}
- metric_kind:
Terraformリソース:
google_monitoring_alert_policy.datastream_status_error_${service}- project
- GCPプロジェクト名を指定
- combiner
"OR"
- display_name
datastream-status-error-${var.service}
- notification_channels
- 通知先のSlackチャネルを指定
- サービスごとに異なる通知先を設定可能
- 指定方法参考
- severity
"CRITICAL"
- alert_strategy
- notification_prompts:
["OPENED", "CLOSED"]
- notification_prompts:
- conditions
- display_name:
datastream-status-error-${var.service} - condition_threshold
- filter
"resource.type=\"datastream.googleapis.com/Stream\" AND metric.type=\"logging.googleapis.com/user/${google_logging_metric.datastream_status_error_${service}.name}\"" - threshold_value:
0- 閾値0より大きい場合
- comparison:
"COMPARISON_GT" - duration:
"60s"- 再テストウィンドウ
- aggregations
- alignment_period:
"300s"- ローリングウィンドウ
- per_series_aligner:
"ALIGN_SUM" - cross_series_reducer:
"REDUCE_SUM"
- alignment_period:
- trigger
- count:
1- 1つ以上の上記の条件を満たせばアラートをトリガー
- percent:
0
- count:
- filter
- display_name:
- documentation
- content
"Datastream (${var.service})のステータスがエラーとなり同期できていない可能性があります。" - subject
"datastream-status-error (${var.service})"
- content
サポートされていないイベント数の監視
- 【サービスごと】 追加するサービスごとに作成
- GCPアラートポリシー(ログ監視)で実装
設定内容
Terraformリソース:
google_logging_metric.datastream_unsupported_events_${service}- project
- GCPプロジェクト名を指定
- name
datastream-unsupported-events-${var.service}
- filter
severity="WARNING" AND jsonPayload.event_code="UNSUPPORTED_EVENTS_DISCARDED" AND resource.labels.stream_id="${var.service}" - metric_descriptor
- metric_kind:
"DELTA" - value_type:
"INT64" - unit:
"1" - display_name:
datastream-unsupported-events
- metric_kind:
Terraformリソース:
google_monitoring_alert_policy.datastream_unsupported_events_${service}- project
- GCPプロジェクト名を指定
- combiner
"OR"
- display_name
datastream-unsupported-events-${var.service}
- notification_channels
- 通知先のSlackチャネルを指定
- サービスごとに異なる通知先を設定可能
- severity
"CRITICAL"
- alert_strategy
- notification_prompts:
["OPENED", "CLOSED"]
- notification_prompts:
- conditions
- display_name:
datastream-unsupported-events - condition_threshold
- filter
"resource.type=\"datastream.googleapis.com/Stream\" AND metric.type=\"logging.googleapis.com/user/${google_logging_metric.datastream_unsupported_events_${service}.name}\"" - threshold_value:
0 - comparison:
"COMPARISON_GT" - duration:
"60s" - aggregations
- alignment_period:
"300s" - per_series_aligner:
"ALIGN_SUM" - cross_series_reducer:
"REDUCE_SUM"
- alignment_period:
- trigger
- count:
1 - percent:
0
- count:
- filter
- display_name:
- documentation
- content
"Datastream (${var.service})のサポートされていないイベントを検知しました。BigQueryに転送できていないイベントがあります。" - subject
"datastream-unsupported-events (${var.service})"
- content
13. Datadog監視設定
【サービスごと】
/terraform_for_aws/datadog/services/${service}/${env}/monitor配下で以下を追加
レイテンシ監視
- 【サービスごと】 追加するサービスごとに作成
- ファイル名:
datastream-latency.tftpl - 参考実装
設定内容
- query:
max(last_5m):max:gcp.datastream.stream.total_latencies.avg{stream_id:${service}} >= ${threshold} - 閾値
- タイムウィンドウ:
5m - critical:
600(10分以上) - critical_recovery: リカバリ閾値
- タイムウィンドウ:
- tags:
["service:${service}"]
データ鮮度の監視
- 【サービスごと】 追加するサービスごとに作成
- ファイル名:
datastream-freshness.tftpl - 参考実装
設定内容
- query:
max(last_5m):max:gcp.datastream.stream.freshness{stream_id:${service}} >= ${threshold} - 閾値
- タイムウィンドウ:
5m - critical:
600(10分以上) - critical_recovery: リカバリ閾値
- タイムウィンドウ:
- tags:
["service:${service}"]
Tips
事象
- terraformでdatastream作成時に以下のエラーが発生した場合
╷
│ Error: Error waiting to create Stream: Error waiting for Creating Stream: {"@type":"type.googleapis.com/google.rpc.ErrorInfo","domain":"datastream.googleapis.com","metadata":{"message":"Some validations failed to complete successfully, see the full list in the operation metadata.","originalMessage":"","time":"2025-03-25T08:39:35Z","uuid":"ae744438-0954-11f0-a769-beb01129c829"},"reason":"VALIDATION_FAILURE"}
│ {"code":"POSTGRES_VALIDATE_REPLICATION_SLOT","description":"Validates that the replication slot exists and not lost.","message":[{"code":"POSTGRES_REPLICATION_SLOT_LOST","level":"ERROR","message":"The stream has failed because the replication slot {slot_name} status is 'lost' (wal_status). Make sure that the slot is active or recreate the slot, and recover the stream.","metadata":{"original_error":"None","slot_name":"datastream"}}],"state":"FAILED"}
│
│
│
│ with google_datastream_stream.main,
│ on datastream.tf line 57, in resource "google_datastream_stream" "main":
│ 57: resource "google_datastream_stream" "main" {
│
╵原因
lostは、必要なWALファイルの一部が削除されており、このスロットはもはや利用可能ではないことを意味- レプリケーションスロットを作成してから時間が経ってしまい、max_slot_wal_keep_size(1GB)を超えたためwalが一部削除されて追えなくなったため
対処法
レプリケーションスロットを再作成する。
# 存在確認
SELECT * FROM pg_replication_slots;
# 削除
SELECT pg_drop_replication_slot('datastream');
# 作成
SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('datastream', 'pgoutput');
# 存在確認
SELECT * FROM pg_replication_slots;関連ドキュメント
- Datastream初回構築手順書 - 初回構築時の共有リソース作成手順