Skip to content

Datastreamサービス追加手順書

目的

既存のDatastream環境(VPC、VPN、Private Connectionなど)に新しいサービスのDatastreamを追加する手順書です。

前提条件

初回構築が完了していること

このドキュメントは、以下の共有リソースが既に作成されていることを前提としています:

  • GCPプロジェクト(fd-datastream-prd または fd-datastream-stg
  • VPC・サブネット
  • VPN接続(AWS Site-to-Site VPN、HA VPN Gateway、VPNトンネル等)
  • Datastream Private Connection
  • 共有Firewall(Health Check用)

初回構築がまだの場合は、先に Datastream初回構築手順書 を実施してください。

プロジェクト構成

  • 1つのGCPプロジェクト内に複数のDatastreamが存在
    • プロジェクト名(環境ごと):
      • production: fd-datastream-prd
      • staging: fd-datastream-stg
    • 複数のサービス(例: online-ops, online-karte)が同一プロジェクト内に共存
  • VPNは1つのプロジェクトに対して1つ
    • 複数サービスで同じVPN接続を共有

このドキュメントで作成する個別リソース

各サービス追加時に以下のリソースを作成します:

  • Secrets Manager (DBユーザーパスワード)
  • GCEインスタンステンプレート
  • GCEインスタンスグループマネージャー
  • パススルーNLB(固定IP、Backend Service、Forwarding Rule)
  • Firewall(LB→GCE用)
  • Datastream Connection Profile(source/destination)
  • Datastream Stream
  • BigQuery Dataset
  • 監視設定(GCPアラート、Datadogモニター)

サービス追加手順

1. Secrets Manager作成(DBパスワード用)

【サービスごと】

  1. Secrets Manager作成

    • 【サービスごと】 追加するサービスごとに作成
    • DBユーザーのパスワード用
    設定ルール

    google_secret_manager_secret

    • secret_id
      • ${var.service}-db-user-password
    • project
      • GCPプロジェクト名を指定
    • replication
      • auto
    • labels
      • service = var.service

    google_secret_manager_secret_version

    • secret
      • google_secret_manager_secret.db_user_password_${service}.id
    • secret_data
      • "dummy"
        • AWS側をoutput参照すると値が露出してしまうため、apply後に手動で登録する
    • lifecycle
      • ignore_changes
        • [secret_data, enabled]
          • 構築後に差分が設定するため
  2. GCPコンソールから手動でDBパスワードを設定

    • Secrets Managerにアクセス
    • 該当のシークレット(${service}-db-user-password)を選択
    • 新しいバージョンでパスワードを追加
    • 過去のバージョンは破棄せず無効にすること

2. GCEプロキシ作成

【サービスごと】

  1. GCEのインスタンステンプレート (google_compute_region_instance_template) を作成

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール
    • name

      • ${var.service}-${var.env}-gce-template
    • region

      • VPN接続先と同じリージョンを指定
    • machine_type

      • e2-micro
        • モニタリングしつつ必要なら増強する
    • disk

      • source_image
        • debian-cloud/debian-12
      • disk_size_gb
        • 20 GB
          • モニタリングしつつ必要なら増強する
    • network_interface

      • network
        • google_compute_network.main.self_link
      • subnetwork
        • google_compute_subnetwork.private.self_link
    • shielded_instance_config

      • enable_secure_boot
        • true
      • enable_vtpm
        • true
    • metadata_startup_script

      • Datastreamの制約上、DBのライターインスタンスのIPアドレスしか指定できない。起動時に動的に名前解決できるようにする。
      bash
      <<EOT
        #! /bin/bash
        DB_ENDPOINT="${local.db_endpoint_${service}}"
        CONF_FILE="/etc/rinetd.conf"
        apt update -y
        apt install -y rinetd
        if ! grep -q "$DB_ENDPOINT" "$CONF_FILE"; then
          echo "0.0.0.0 5432 $DB_ENDPOINT 5432" >> "$CONF_FILE"
        fi
        systemctl restart rinetd
        systemctl enable rinetd
        EOT
    • tags

      • ["${var.service}-allow-lb-postgres-datastream", "allow-lb-health-check"]
    • labels

      • service = var.service
  2. GCEのインスタンスグループマネージャー (google_compute_region_instance_group_manager) を作成

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール
    • name
      • ${var.service}-${var.env}-gce-group
    • base_instance_name
      • ${var.service}-${var.env}-gce
    • region
      • VPN接続先と同じリージョンを指定
    • distribution_policy_zones
      • ["${var.region}-a", "${var.region}-b"]
    • version
      • instance_template
        • google_compute_region_instance_template.${service}.id
          • 作成したインスタンステンプレートのidを指定
    • target_size
      • 2以上とする
        • 冗長化のため
    • named_port
      • name: postgresql
      • port: 5432
    • update_policy
      • type: "PROACTIVE"
      • minimal_action: "REPLACE"
      • max_surge_fixed
        • インスタンス数と同じ値
          • インスタンス入れ替え時にダウンを避けるため
      • max_unavailable_fixed
        • 0
          • ダウンタイムを発生させない
    • standby_policy
      • initial_delay_sec: 30

3. パススルーNLB作成

【サービスごと】

  1. 固定IP (google_compute_address) を作成

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール
    • name
      • ${var.service}-${var.env}-lb-ip
    • region
      • VPN接続先と同じリージョンを指定
    • subnetwork
      • google_compute_subnetwork.private.id
        • 作成したプライベートサブネットのID
    • address_type
      • INTERNAL
    • labels
      • service = var.service
  2. ヘルスチェック (google_compute_region_health_check) の作成または参照

    • 【初回サービス追加時】 新規作成(共有リソース)
    • 【2回目以降のサービス追加時】 既存リソースを参照
    設定ルール(初回のみ作成)
    • name
      • ${var.service}-${var.env}-health-check-ssh
    • region
      • VPN接続先と同じリージョンを指定
    • tcp_health_check
      • port: 22
        • GCEでデフォルトでリッスンしている22番ポートを指定
  3. バックエンドサービス (google_compute_region_backend_service) の作成

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール
    • name
      • ${var.service}-${var.env}-backend-service
    • region
      • VPN接続先と同じリージョンを指定
    • load_balancing_scheme
      • INTERNAL
    • protocol
      • TCP
    • backend
      • group
        • google_compute_region_instance_group_manager.${service}.instance_group
          • 作成したインスタンスグループマネージャーを指定
      • balancing_mode
        • CONNECTION
          • パススルーNLBの場合はCONNECTION固定となる
          • 「同時接続数」を基準にバックエンドの空きキャパシティを判断して振り分ける
    • health_checks
      • [google_compute_region_health_check.ssh.id]
        • 作成したヘルスチェックのIDを指定
  4. フォワーディングルール (google_compute_forwarding_rule) を作成

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール
    • name
      • ${var.service}-${var.env}-forward-rule
    • region
      • VPN接続先と同じリージョンを指定
    • ip_protocol
      • TCP
    • load_balancing_scheme
      • INTERNAL
    • backend_service
      • google_compute_region_backend_service.${service}.id
        • 作成したバックエンドのIDを指定
    • network
      • google_compute_network.main.id
        • VPCのIDを指定
    • subnetwork
      • google_compute_subnetwork.private.id
        • プライベートサブネットのIDを指定
    • ports
      • ["5432"]
    • ip_address
      • google_compute_address.${service}.address
        • 作成した固定IPアドレスを指定
    • labels
      • service = var.service

4. Firewall設定

【サービスごと + 共有リソース更新】

  1. Health Check用Firewall

    • 【既存リソースを参照】 初回構築時に作成済み
    • 注意: 新しくは作成せず、既存のFirewallルールを参照
    参照するリソース名
    • name
      • ${var.service}-${var.env}-allow-lb-health-check
    • GoogleのネットワークからGCEに対するヘルスチェックのルール
  2. Datastream→LB用Firewall作成/更新(共有)

    • 【初回サービス追加時】 新規作成(共有リソース)
    • 【2回目以降のサービス追加時】 destination_rangesに新しいLB IPを追加
    設定ルール
    • name
      • ${var.service}-${var.env}-allow-datastream-to-lb
    • direction
      • INGRESS
    • network
      • google_compute_network.main.id
        • VPCのIDを指定
    • source_ranges
      • [var.datastream_subnet_ip_range]
        • datastreamのネットワークのcidrを指定
    • allow
      • protocol: tcp
      • ports: ["5432"]
    • destination_ranges
      • 全サービスのLB固定IPをリストで指定(LBはタグ指定できないため)
      • 初回: ["${google_compute_address.online_ops.address}/32"]
      • 2回目以降: 既存のIPリストに新しいサービスのLB IPを追加
  3. LB→GCE用Firewall作成(サービスごと)

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール
    • name
      • ${var.service}-${var.env}-allow-lb-to-gce
    • direction
      • INGRESS
    • network
      • google_compute_network.main.id
        • VPCのIDを指定
    • source_ranges
      • ["${google_compute_address.${service}.address}/32"]
        • サービス専用のLB固定IPを指定
    • allow
      • protocol: tcp
      • ports: ["5432"]
    • target_tags
      • ["${var.service}-allow-lb-postgres-datastream"]
        • サービス専用のタグ

6. DBセキュリティグループの設定(AWS側)

【サービスごと】

  1. 対象DBのセキュリティグループにインバウンドルールを追加
    • GCP側で作成したプライベートサブネットのCIDRからポート5432への接続を許可

7. DBのパラメータ設定(AWS側)

【サービスごと】

  1. 以下のモジュールのいずれかで定義追加可能

    • 対応モジュール
      • /template_modules/microservice-ecs
      • /template_modules/options/rds-aurora
    設定内容
    • cluster_parameter_group_custom_enable

      • true
    • cluster_parameter_group_params

      • 設定値
      hcl
      "rds.logical_replication" = {
        value        = "1"
        apply_method = "pending-reboot"
      }
      "max_slot_wal_keep_size" = {
        value        = "1000" # MB
        apply_method = "immediate"
      }
      "wal_sender_timeout" = {
        value        = "0" # 無制限
        apply_method = "immediate"
      }
      "max_wal_senders" = {
        value        = "20"
        apply_method = "pending-reboot"
      }
    • 設定例

      • variableでcluster_parameter_group_paramsを定義し、default指定
      hcl
      variable "cluster_parameter_group_params" {
        type = map(object({
          value        = string
          apply_method = string
        }))
        default = {
          "rds.logical_replication" = {
            value        = "1"
            apply_method = "pending-reboot"
          }
          "max_slot_wal_keep_size" = {
            value        = "1000" # MB
            apply_method = "immediate"
          }
          "wal_sender_timeout" = {
            value        = "0" # 無制限
            apply_method = "immediate"
          }
          "max_wal_senders" = {
            value        = "20"
            apply_method = "pending-reboot"
          }
        }
      }

8. DBの再起動を実施

【サービスごと】

パラメータグループの適用のため、DBを再起動する

9. DBに手動でDatastream用の設定追加

【サービスごと】重要: ストリーム開始直前に実施すること(レプリケーションスロット作成から1GB以内にストリーム開始が必要)

  1. AWS Secrets ManagerにDatastreamユーザーのパスワードを追加

    • 既存のDBシークレットにDATASTREAM_USER_PASSWORDキーを追加
  2. AWSマネジメントコンソールから踏み台EC2用のポリシー作成

    • ポリシー名: tmp-${project}-for-setup-datastream
    権限
    hcl
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "secretsmanager:GetSecretValue"
                ],
                "Effect": "Allow",
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:List*",
                    "s3:GetObject"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "rds:DescribeDBClusters",
                    "rds:DescribeDBInstances"
                ],
                "Resource": "*"
            }
        ]
    }
  3. 踏み台EC2のIAMロールに作成したtmp-${project}-for-setup-datastreamポリシーをアタッチ

    • 踏み台EC2の条件
      • 対象DBと同じプライベートサブネットに所属
      • 対象DBのセキュリティグループで踏み台からの接続許可ルールがある
  4. S3にスクリプト用バケット作成

    bash
    aws s3api create-bucket --bucket work-setup-db-datastream --profile ${AWS_PROFILE} --region ap-northeast-1 --create-bucket-configuration LocationConstraint="ap-northeast-1"
  5. バケットにスクリプトをアップロード

    bash
    aws s3 cp ./setup_datastream.sh "s3://work-setup-db-datastream/" --profile ${AWS_PROFILE} --region ap-northeast-1
  6. SSMで踏み台(fd-platform)にログイン

    • tmp-${project}-for-setup-datastreamポリシーをアタッチしたロールをもつ踏み台EC2のインスタンスIDについて、コンソールから確認する
    bash
    aws ssm start-session --target ${ec2のインスタンスID指定} --profile ${AWS_PROFILE} --region ap-northeast-1
  7. バケットから踏み台にスクリプトをコピー

    bash
    aws s3 cp s3://work-setup-db-datastream/setup_datastream.sh ./
    chmod 755 setup_datastream.sh
    スクリプトの内容(変数を書き換えること)
    bash
    #!/bin/bash
    
    TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
    LOGFILE="setup_datastream_$TIMESTAMP.log"
    
    SECRET_ID="${Secrets Manager name}"
    DB_CLUSTER_NAME="${DB_CLUSTER_NAME}"
    DB_HOST=$(aws rds describe-db-clusters --db-cluster-identifier "$DB_CLUSTER_NAME"--query 'DBClusters[0].Endpoint' --output text)
    DB_PORT="5432"
    DB_NAME="${DB_NAME}"
    DB_USER=$(aws secretsmanager get-secret-value --secret-id "$SECRET_ID" --querySecretString --output text | jq -r '.DB_USERNAME')
    
    echo "Retrieving PostgreSQL admin password." | tee -a "$LOGFILE"
    PGPASSWORD=$(aws secretsmanager get-secret-value --secret-id "$SECRET_ID" --querySecretString --output text | jq -r '.DB_PASSWORD')
    
    if [ -z "$PGPASSWORD" ]; then
        echo "ERROR: Failed to retrieve PostgreSQL admin password."
        exit 1
    fi
    echo "Admin password retrieved successfully." | tee -a "$LOGFILE"
    
    echo "Retrieving datastream user password." | tee -a "$LOGFILE"
    DATASTREAM_USER_PASSWORD=$(aws secretsmanager get-secret-value --secret-id"$SECRET_ID" --query SecretString --output text | jq -r 'DATASTREAM_USER_PASSWORD')
    
    if [ -z "$DATASTREAM_USER_PASSWORD" ]; then
        echo "ERROR: Failed to retrieve datastream password."
        exit 1
    fi
    echo "Datastream password retrieved successfully." | tee -a "$LOGFILE"
    
    export DB_HOST DB_PORT DB_NAME DB_USER PGPASSWORD
    
    set -e
    
    execute_sql() {
        echo "----------------------------------------" | tee -a "$LOGFILE"
        echo "Executing: $1" | tee -a "$LOGFILE"
        echo "----------------------------------------" | tee -a "$LOGFILE"
    
        OUTPUT=$(psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -v ON_ERROR_STOP=1 -c "$2" 2>&1 | tee -a "$LOGFILE")
    
        if [ $? -ne 0 ]; then
            echo "ERROR: SQL execution failed." | tee -a "$LOGFILE"
            echo "$OUTPUT" | tee -a "$LOGFILE"
            exit 1
        fi
    
        echo "Done" | tee -a "$LOGFILE"
        echo "" | tee -a "$LOGFILE"
    }
    
    execute_sql "CREATE PUBLICATION" "CREATE PUBLICATION datastream FOR ALL TABLES;"
    execute_sql "SELECT PUBLICATION" "
    SELECT pubname, pubowner::regrole, puballtables, pubinsert, pubupdate, pubdelete,pubtruncate
    FROM pg_publication;"
    execute_sql "CREATE PG_CREATE_LOGICAL_REPLICATION_SLOT" "SELECTPG_CREATE_LOGICAL_REPLICATION_SLOT('datastream','pgoutput');"
    execute_sql "SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT" "
    SELECT slot_name, plugin, slot_type, active, confirmed_flush_lsn
    FROM pg_replication_slots;"
    execute_sql "CREATE USER" "CREATE USER datastream WITH PASSWORD'$DATASTREAM_USER_PASSWORD';"
    execute_sql "GRANT RDS_REPLICATION" "GRANT RDS_REPLICATION TO datastream;"
    execute_sql "GRANT SELECT ON ALL TABLES" "GRANT SELECT ON ALL TABLES IN SCHEMApublic TO datastream;"
    execute_sql "GRANT USAGE ON SCHEMA" "GRANT USAGE ON SCHEMA public TO datastream;"
    execute_sql "ALTER DEFAULT PRIVILEGES IN SCHEMA" "ALTER DEFAULT PRIVILEGES INSCHEMA public GRANT SELECT ON TABLES TO datastream;"
    execute_sql "SELECT GRANT" "
    SELECT grantee, privilege_type, table_schema, table_name
    FROM information_schema.role_table_grants
    WHERE grantee = 'datastream';"
    execute_sql "SELECT ROLE" "
    SELECT rolname, rolsuper, rolreplication, rolcanlogin, rolcreatedb
    FROM pg_roles;"
    
    echo "Completed setup_datastream." | tee -a "$LOGFILE"
  8. コマンドインストール

    bash
    yum install -y jq
    yum install -y postgresql
  9. スクリプトを実行

    bash
    ./setup_datastream.sh
  10. ログにエラーがないことを確認

    • ログファイル: setup_datastream_YYYYMMDD_HHMMSS.log
  11. S3のスクリプト削除

    bash
    aws s3 rm s3://work-setup-db-datastream --recursive --profile ${AWS_PROFILE} --region ap-northeast-1
  12. S3のバケット削除

    bash
    aws s3api delete-bucket --bucket work-setup-db-datastream --profile ${AWS_PROFILE} --region ap-northeast-1
  13. tmp-${project}-for-setup-datastreamポリシーをロールからデタッチ

  14. tmp-${project}-for-setup-datastreamポリシーを削除

10. Datastream設定

【サービスごと】重要: 前の手順(レプリケーションスロット作成)から1GB以上書き込まれる前にストリーム開始まで実施すること

  1. Connection Profile作成(転送元)

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール

    Terraformリソース名: google_datastream_connection_profile.source_${service}

    • connection_profile_id
      • ${var.service}-source
    • display_name
      • ${var.service}-source
    • project
      • GCPプロジェクト名を指定
    • location
      • VPN接続先と同じリージョンを指定
    • postgresql_profile
      • hostname
        • google_compute_address.${service}.address
          • パススルーNLBの固定IPアドレス指定
      • port
        • 5432
      • database
        • 対象DBのデータベース名
      • username
        • datastream
          • Datastreamユーザー指定
      • password
        • data.google_secret_manager_secret_version.db_user_password_${service}.secret_data
          • Secret Managerの値をdata参照で取得
    • private_connectivity
      • private_connection
        • google_datastream_private_connection.main.id
          • プライベート接続構成のID指定(共有リソース)
    • depends_on
      • google_secret_manager_secret_version.db_user_password_${service}
      • google_compute_forwarding_rule.${service}
      • google_compute_router_peer.main
  2. Connection Profile作成(転送先: BigQuery)

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール

    Terraformリソース名: google_datastream_connection_profile.destination_${service}

    • connection_profile_id
      • ${var.service}-destination
    • display_name
      • ${var.service}-destination
    • project
      • GCPプロジェクト名を指定
    • location
      • VPN接続先と同じリージョンを指定
    • bigquery_profile
      • {}
    • depends_on
      • google_bigquery_dataset.${service}_public
  3. BigQuery Dataset作成

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール

    Terraformリソース名: google_bigquery_dataset.${service}_public

    • dataset_id
      • サービス名に応じた適切なDataset ID
    • location
      • VPN接続先と同じリージョンを指定
    • project
      • GCPプロジェクト名を指定
    • access
      • このブロック内でroleなどの権限を指定する
  4. Datastream Stream作成

    • 【サービスごと】 追加するサービスごとに作成
    設定ルール

    Terraformリソース名: google_datastream_stream.${service}

    • stream_id
      • ${var.service}
    • display_name
      • ${var.service}
    • project
      • GCPプロジェクト名を指定
    • location
      • VPN接続先と同じリージョンを指定
    • desired_state
      • "NOT_STARTED"
        • 作成時に意図せず開始しないように
    • source_config
      • source_connection_profile
        • google_datastream_connection_profile.source_${service}.id
          • 接続元プロファイルのID指定
      • postgresql_source_config
        • max_concurrent_backfill_tasks
          • 初回バックフィルのタスク数
          • タスク数を増やせばバックフィル時間を短縮できるが対象DBに負荷がかかる
          • 対象DBのCPU使用率やメモリなどのリソース状況を鑑みて決めること
          • 参考
        • replication_slot
          • datastream
            • レプリケーションスロット名を指定
        • publication
          • datastream
            • Publication名を指定
        • include_objects
    • destination_config
      • destination_connection_profile
        • google_datastream_connection_profile.destination_${service}.id
          • 接続先プロファイルのIDを指定
      • bigquery_destination_config
        • data_freshness
          • 300s
            • 送信元と送信先の間の時間差であるが、要件によって変更すること
            • 狭めればその分コストとDB負荷がかかる
        • source_hierarchy_datasets
          • dataset_template
            • location
              • VPN接続先と同じリージョンを指定
            • dataset_id_prefix
              • サービス固有のプレフィックス
    • backfill_all
      • {}
        • バックフィル対象を自動で全て含む

11. ストリーム開始とモニタリング

【サービスごと】

  1. GCPのコンソールからストリームを開始する

    • GCP Console > Datastream > Streams
    • 該当のStreamを選択
    • STARTをクリック
  2. DBの負荷状況やDatastreamの遅延状況をモニタリングする


Datastream監視設定

対象: サービス追加時に実行

12. GCP監視設定

【サービスごと】

GCPのDatastreamを設定したモジュール(gcp/services/fd-datastream/${env})内で以下を追加

  • production: gcp/services/fd-datastream/production
  • staging: gcp/services/fd-datastream/staging
  1. Datastreamステータス監視

    • 【サービスごと】 追加するサービスごとに作成
    • GCPアラートポリシー(ログ監視)で実装
    設定内容

    Terraformリソース: google_logging_metric.datastream_status_error_${service}

    • project
      • GCPプロジェクト名を指定
    • name
      • datastream-status-error-${var.service}
    • filter
      severity="ERROR" AND jsonPayload.eventCode="STREAM_STATE_CHANGED" AND resource.labels.stream_id="${var.service}"
    • metric_descriptor
      • metric_kind: "DELTA"
      • value_type: "INT64"
      • unit: "1"
      • display_name: datastream-status-error-${var.service}

    Terraformリソース: google_monitoring_alert_policy.datastream_status_error_${service}

    • project
      • GCPプロジェクト名を指定
    • combiner
      • "OR"
    • display_name
      • datastream-status-error-${var.service}
    • notification_channels
      • 通知先のSlackチャネルを指定
      • サービスごとに異なる通知先を設定可能
      • 指定方法参考
    • severity
      • "CRITICAL"
    • alert_strategy
      • notification_prompts: ["OPENED", "CLOSED"]
    • conditions
      • display_name: datastream-status-error-${var.service}
      • condition_threshold
        • filter
          "resource.type=\"datastream.googleapis.com/Stream\" AND metric.type=\"logging.googleapis.com/user/${google_logging_metric.datastream_status_error_${service}.name}\""
        • threshold_value: 0
          • 閾値0より大きい場合
        • comparison: "COMPARISON_GT"
        • duration: "60s"
          • 再テストウィンドウ
        • aggregations
          • alignment_period: "300s"
            • ローリングウィンドウ
          • per_series_aligner: "ALIGN_SUM"
          • cross_series_reducer: "REDUCE_SUM"
        • trigger
          • count: 1
            • 1つ以上の上記の条件を満たせばアラートをトリガー
          • percent: 0
    • documentation
      • content
        "Datastream (${var.service})のステータスがエラーとなり同期できていない可能性があります。"
      • subject
        "datastream-status-error (${var.service})"
  2. サポートされていないイベント数の監視

    • 【サービスごと】 追加するサービスごとに作成
    • GCPアラートポリシー(ログ監視)で実装
    設定内容

    Terraformリソース: google_logging_metric.datastream_unsupported_events_${service}

    • project
      • GCPプロジェクト名を指定
    • name
      • datastream-unsupported-events-${var.service}
    • filter
      severity="WARNING" AND jsonPayload.event_code="UNSUPPORTED_EVENTS_DISCARDED" AND resource.labels.stream_id="${var.service}"
    • metric_descriptor
      • metric_kind: "DELTA"
      • value_type: "INT64"
      • unit: "1"
      • display_name: datastream-unsupported-events

    Terraformリソース: google_monitoring_alert_policy.datastream_unsupported_events_${service}

    • project
      • GCPプロジェクト名を指定
    • combiner
      • "OR"
    • display_name
      • datastream-unsupported-events-${var.service}
    • notification_channels
      • 通知先のSlackチャネルを指定
      • サービスごとに異なる通知先を設定可能
    • severity
      • "CRITICAL"
    • alert_strategy
      • notification_prompts: ["OPENED", "CLOSED"]
    • conditions
      • display_name: datastream-unsupported-events
      • condition_threshold
        • filter
          "resource.type=\"datastream.googleapis.com/Stream\" AND metric.type=\"logging.googleapis.com/user/${google_logging_metric.datastream_unsupported_events_${service}.name}\""
        • threshold_value: 0
        • comparison: "COMPARISON_GT"
        • duration: "60s"
        • aggregations
          • alignment_period: "300s"
          • per_series_aligner: "ALIGN_SUM"
          • cross_series_reducer: "REDUCE_SUM"
        • trigger
          • count: 1
          • percent: 0
    • documentation
      • content
        "Datastream (${var.service})のサポートされていないイベントを検知しました。BigQueryに転送できていないイベントがあります。"
      • subject
        "datastream-unsupported-events (${var.service})"

13. Datadog監視設定

【サービスごと】

/terraform_for_aws/datadog/services/${service}/${env}/monitor配下で以下を追加

  1. レイテンシ監視

    • 【サービスごと】 追加するサービスごとに作成
    • ファイル名: datastream-latency.tftpl
    • 参考実装
    設定内容
    • query: max(last_5m):max:gcp.datastream.stream.total_latencies.avg{stream_id:${service}} >= ${threshold}
    • 閾値
      • タイムウィンドウ: 5m
      • critical: 600(10分以上)
      • critical_recovery: リカバリ閾値
    • tags: ["service:${service}"]
  2. データ鮮度の監視

    • 【サービスごと】 追加するサービスごとに作成
    • ファイル名: datastream-freshness.tftpl
    • 参考実装
    設定内容
    • query: max(last_5m):max:gcp.datastream.stream.freshness{stream_id:${service}} >= ${threshold}
    • 閾値
      • タイムウィンドウ: 5m
      • critical: 600(10分以上)
      • critical_recovery: リカバリ閾値
    • tags: ["service:${service}"]

Tips

事象

  • terraformでdatastream作成時に以下のエラーが発生した場合
jsx

Error: Error waiting to create Stream: Error waiting for Creating Stream: {"@type":"type.googleapis.com/google.rpc.ErrorInfo","domain":"datastream.googleapis.com","metadata":{"message":"Some validations failed to complete successfully, see the full list in the operation metadata.","originalMessage":"","time":"2025-03-25T08:39:35Z","uuid":"ae744438-0954-11f0-a769-beb01129c829"},"reason":"VALIDATION_FAILURE"}
│ {"code":"POSTGRES_VALIDATE_REPLICATION_SLOT","description":"Validates that the replication slot exists and not lost.","message":[{"code":"POSTGRES_REPLICATION_SLOT_LOST","level":"ERROR","message":"The stream has failed because the replication slot {slot_name} status is 'lost' (wal_status). Make sure that the slot is active or recreate the slot, and recover the stream.","metadata":{"original_error":"None","slot_name":"datastream"}}],"state":"FAILED"}



with google_datastream_stream.main,
│   on datastream.tf line 57, in resource "google_datastream_stream" "main":
57: resource "google_datastream_stream" "main" {

原因

  • lostは、必要なWALファイルの一部が削除されており、このスロットはもはや利用可能ではないことを意味
  • レプリケーションスロットを作成してから時間が経ってしまい、max_slot_wal_keep_size(1GB)を超えたためwalが一部削除されて追えなくなったため

対処法

レプリケーションスロットを再作成する。

sql
# 存在確認
SELECT * FROM pg_replication_slots;

# 削除
SELECT pg_drop_replication_slot('datastream');

# 作成
SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('datastream', 'pgoutput');

# 存在確認
SELECT * FROM pg_replication_slots;

関連ドキュメント