VISASQ Dev Blog

ビザスク開発ブログ

Cloud Run Jobsで実現するSalesforceデータの日次バックアップ内製化

はじめに

こんにちは! インフラチームの酒井とSalesforceチームの伊藤です。
今回はインフラチームとSalesforceチーム合同で開発した「Salesforceデータの日次バックアップ機能」についてご紹介します。

なぜ日次バックアップの仕組みを内製したのか?

Salesforce標準のデータエクスポート機能は週次または月次の頻度に限られておりますが、社内ではBCP(事業継続計画)対策としてより鮮度の高いSalesforceデータをバックアップすることが求められていました。

外部ツールの導入という選択肢もありましたが、ビザスクには自社内にインフラとSalesforceの専門チームが存在しており、「双方の知見を組み合わせることで、コストを抑えつつ要件を満たすことができるのではないか?」というアイデアから、日次バックアップ機能の内製化という試みに至りました。

本記事では、前半はインフラ側のアーキテクチャ、後半はSalesforce側のバックアップスクリプト実装について順に記載します。

インフラ側のアーキテクチャについて

まずは、今回のバックアップ基盤の全体像から解説します。

インフラ構成図

構成は非常にシンプルです。Cloud Scheduler をトリガーに Cloud Run Jobs を起動し、Salesforce から取得したデータを Cloud Storage (GCS) へ格納するフローを採用しています。

また、セキュリティ面では最小権限の法則に準拠した設計を行っています。バックアップに関連するリソースへのアクセスは、実務を担うSalesforceチームとインフラチームのみに制限し、他部署や不要なユーザーが触れないよう厳格に管理しています。

なぜ Cloud Run Jobs を選んだのか?

バックアップスクリプトの実行基盤として、Compute Engine (VM) や Cloud Functions も検討しましたが、以下の3つの観点から Cloud Run Jobs が最適だと判断しました。

  • 運用管理のしやすさ
    ソースコードはSalesforceチーム、インフラはインフラチームと、それぞれ別のチームで管理することになったため、コンテナ化をすることで、Terraformを含めたデプロイの運用や利便性が向上しました。

  • コスト効率の最大化
    常時起動の VM と比較し、実行時間分のみ課金されるサーバーレスな構成にすることでコストを最小限に抑えています。Cloud Functions と比べてもCloud Run Jobsの方が安かったです。

  • メンテナンス負荷の軽減
    Cloud Functions はランタイム(Python 等)のバージョンサポート期限を意識する必要があります。Cloud Run Jobs であれば、コンテナベースで柔軟にバージョンを管理できるため、インフラ都合の強制アップデートに振り回されるリスクを軽減できます。

また、「サーバーレスだとメモリ不足が心配」という懸念もありましたが、後述する通り 「ストリーミングアップロード」 を実装することで、最小限のスペック(1CPU / 2GiB)でも大容量データの処理を実現しています。

(Cloud Run JobsのTerraform)

resource "google_cloud_run_v2_job" "sf_backup" {
  name                = "sf-backup"
  location            = var.gcp_region

  template {
    template {
      service_account = google_service_account.sf_backup.email
      max_retries     = 1
      timeout         = "21600s" # 6 hours

      containers {
        image = "${var.gcp_region}-docker.pkg.dev/${var.gcp_project}/sf-backup/sf-backup:latest"

        resources {
          limits = {
            cpu    = "1"
            memory = "2Gi"
          }
        }

        env {
          name  = "GCS_BUCKET_NAME"
          value = google_storage_bucket.sf_backup.name
        }

        env {
          name = "SFDC_CREDENTIALS"
          value_source {
            secret_key_ref {
              secret  = data.google_secret_manager_secret_version.sf_client_credenatials.secret
              version = "latest"
            }
          }
        }
      }
    }
  }
}

Salesforce側のバックアップスクリプトについて

ここからは、Cloud Run Jobsで動かすバックアップスクリプトの内容について記載します。
スクリプト本体はPythonで実装しています。

処理の流れ

  1. クライアントログイン情報フローによるOAuth認証でSalesforceと接続する
  2. Salesforce Bulk API 2.0を使用して、JSONファイルから読み込んだオブジェクトのデータを全件取得するクエリジョブを作成
  3. 取得したデータをCSV形式でストリーミング処理し、ZIPファイルに圧縮しながら直接Google Cloud Storageにアップロード

Salesforceとの認証設定手順

Salesforce側でクライアントログイン情報フローを利用する設定を行います。
(接続アプリケーションを使用した設定手順についてはこちらの記事 の「#設定手順」にも記載がありますので、必要に応じてご参照ください)

⚠️ 注意事項
SalesforceのSpring'26 以降は接続アプリケーションの新規作成が制限されるため、新たに接続設定を行う場合は 外部クライアントアプリケーション を使用されることをお勧めします。

Salesforce側のクライアントログイン情報フローの設定が完了したら、以下の対応をGCP側で行います。

  • GCPのSecret Managerに、接続アプリケーション(または外部クライアントアプリケーション)から取得したコンシューマー鍵(client_id)、コンシューマーの秘密(client_secret)の値と、接続先のBaseURL(https://{[私のドメイン] の URL})の値をそれぞれ登録する。

これで認証設定の準備は完了です!
続いてスクリプト側の処理について記載します。

各処理の詳細

※エラーハンドリングやロギング処理なども含め、ソースコードを一部省略しています

1. クライアントログイン情報フローによるOAuth認証

GCPのSecret Managerに登録した認証情報を使用したクライアントログイン情報フローによるOAuth認証でSalesforceに接続し、アクセストークンを取得します。
取得したアクセストークンを使用し、後続処理でのAPIリクエストに使用するリクエストヘッダを生成します。

class SFAPIClient:
    def __init__(self):
        # 環境変数(GCP側で設定)から認証情報を取得
        credentials = json.loads(os.environ.get("SFDC_CREDENTIALS"))
        
        client_id = credentials["client_id"]
        client_secret = credentials["client_secret"]
        base_url = credentials["base_url"]
      
        self.base_url = base_url
        self.api_base_url = f"{self.base_url}/services/data/{SFDC_API_VERSION}"
      
        # OAuth認証を実行してヘッダー情報を取得する
        self.headers = self.get_headers(client_id, client_secret)

    # ...(中略)

    @retry(...) # リトライ設定(省略)
    def call_get_token_api(self, client_id: str, client_secret: str):
        """クライアントログイン情報フローでアクセストークンを取得"""
        return requests.post(
            f"{self.base_url}/services/oauth2/token",
            data={
                "grant_type": "client_credentials",  # クライアントログイン情報フロー指定
                "client_id": client_id,
                "client_secret": client_secret,
            },
            timeout=TIMEOUT_SECONDS,
        )

2. JSONファイルからオブジェクトを読み込み、Bulk API v2.0でクエリジョブを作成

JSONファイルに定義されたバックアップ対象のオブジェクト名を読み込み、各オブジェクトに存在する項目情報を取得して動的にSOQLクエリを生成します。
生成したSOQLクエリを使用して、Bulk API 2.0のクエリジョブを作成してジョブIDを取得します。

def main():
    # ...(認証処理)
  
    # JSONファイルからバックアップ対象オブジェクトのリストを読み込み
    with open(SFDC_OBJECTS, "r", encoding="utf-8") as f:
        object_list = json.load(f)
        target_objects = object_list["object_names"]  # ["Account", "Contact", ...]
    
    job_dict = {}  # ジョブIDとオブジェクト名のマッピング
    
    for obj_name in target_objects:
        # 各オブジェクトの項目情報を動的に取得
        field_list = sf_api.get_fields(obj_name)
        
        # SOQLクエリを生成(例: "SELECT Id, Name, ... FROM Account")
        soql_query = create_bulk_query(obj_name, field_list)
        
        # Bulk API 2.0でクエリジョブを作成
        job_id = sf_api.create_job(soql_query)
        job_dict[job_id] = obj_name


class SFAPIClient:
    # ...(中略)

    @retry(...) # リトライ設定(省略)
    def call_create_bulk_query_job_api(self, query: str):
        """Bulk API 2.0のクエリジョブ作成API呼び出し"""
        return requests.post(
            f"{self.api_base_url}/jobs/query",  # Bulk API 2.0エンドポイント
            headers=self.headers,
            data=json.dumps({
                "operation": "queryAll",  # アーカイブ済みTodoなど含む全件取得
                "query": query,           # SOQLクエリ
            }),
            timeout=TIMEOUT_SECONDS,
        )

3. CSVストリーミング → ZIP圧縮 → GCSアップロード

バックアップファイルの保存先となるGCSバケットを取得し、直接ストリーミング書き込みを行います。
ストリーム上で作成したZipファイルに、クエリジョブの結果として取得された各Salesforceオブジェクトのデータをチャンクごとに分割しながらCSVデータとして書き込んでいきます。

対象オブジェクトのデータ数が膨大な場合、レスポンスのSforce-Locatorヘッダーに次の結果セットが存在することを示すロケーター値が設定されてくるので、この値の存在チェックも行います。

def main():
    # ...(クエリジョブ作成処理)

    # 保存先のGCSバケットを取得
    bucket = storage.Client().bucket(os.environ.get("GCS_BUCKET_NAME"))
    zip_filename = f'{format(datetime.now(ZoneInfo("Asia/Tokyo")), "%Y%m%d")}_sfbackup.zip'
    blob = storage.Blob(zip_filename, bucket)
    
    # GCSへのストリーミング書き込み開始
    with blob.open("wb", ignore_flush=True) as gcs_stream:
        # ZIPファイルをストリーム上で直接作成
        with zipfile.ZipFile(gcs_stream, "w", compression=zipfile.ZIP_DEFLATED) as zip_archive:
            
            for job_id, obj_name in job_dict.items():
                # ジョブ完了まで待機
                sf_api.wait_for_job_completion(job_id)
                
                # CSVデータをストリーミング取得してZIPに追加
                backup_storage.generate_backup_file(obj_name, job_id, zip_archive)

class BackupStorage:
    def generate_backup_file(self, object_name: str, job_id: str, zip_archive):
        """Bulk API結果をストリーミング取得してZIPに書き込み"""
        chunk_num = 0
        locator = ""  # 次の結果セットの存在チェック用ロケーター
        
        # Sforce-Locatorの値を使った次の結果セット存在チェック
        while True:
            if locator is None:  # 全結果取得完了
                break
            
            chunk_num += 1
            locator_param = f"locator={locator}" if locator else ""
            
            # ジョブ結果をストリーミングで取得
            response = self.sf_api.call_get_job_result_api(
                job_id, locator_param, stream=True  # stream=Trueで大容量データ対応
            )
            
            # 次の結果セットのロケーター取得
            locator = response.headers.get("Sforce-Locator", "")
            locator = None if locator == "null" else locator
            
            csv_filename = f"{object_name}_data_{chunk_num:03d}.csv"
            
            # ZIPアーカイブ内にCSVファイルを作成
            with zip_archive.open(csv_filename, "w") as csv_file:
                # チャンクごとにストリーミング書き込み
                for data_chunk in response.iter_content(chunk_size=CHUNK_SIZE_BYTES):
                    if data_chunk:
                        csv_file.write(data_chunk)  # メモリ上に全データを保持せずZIPに直接書き込み
            
            response.close()

これらの処理により、JSONファイルで指定したSalesforceオブジェクトのデータをGCS上にバックアップファイルとして保存することが可能となりました。

おわりに

以上、Salesforceデータの日次バックアップ内製化の取り組みをご紹介しました。
インフラとSalesforce、それぞれの知見を持ち寄ることで、要件に最適化した柔軟な構成をとることができました。

今後もエンジニア同士が協力し合い、ビジネスの成長を支える堅牢な基盤づくりに注力していきます!

ビザスクではエンジニアの仲間を募集しています! 少しでもビザスク開発組織にご興味を持たれた方は、ぜひ一度カジュアルにお話ししましょう! recruit.visasq.co.jp