This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

Made a complete and working example of cross-regional dataset transfer configs #82

Closed
sloev opened this issue Dec 10, 2020 · 2 comments
Labels
api: bigquerydatatransfer Issues related to the googleapis/python-bigquery-datatransfer API. samples Issues that are directly related to samples. type: docs Improvement to the documentation for an API.

Comments


sloev commented Dec 10, 2020

Is your feature request related to a problem? Please describe.
The documentation is out of date or not helpful in answering questions and getting me to a functional POC.

Describe the solution you'd like
More examples that are up to date.

Describe alternatives you've considered
I looked on the internet, in this codebase, in Google's documentation, and in GitHub issues mentioning the problems I ran across.

Additional context

I have created a full example for google-cloud-bigquery-datatransfer==2.1.0
of creating and listing cross-regional transfer configs/jobs

you can find the example here:
https://gist.github.com/sloev/e8e717d7a725b6f03909fd7fd8d4faba

and I've pasted it here as well.

Full gist contents:
"""
How to create cross-region scheduled bigquery dataset copy jobs in GCP using python
______________________________________________________________
* use Python 3.7+

* install requirements

google-api-python-client==1.12.8
google-auth==1.23.0
google-auth-oauthlib==0.4.2
google-auth-httplib2==0.0.4
google-cloud-bigquery==2.6.0
google-cloud-bigquery-datatransfer==2.1.0
google.cloud==0.34.0
protobuf==4.0.0rc2

* create source project and destination project

* create service account in source project

* enable the BigQuery Data Transfer API on the source and destination projects
https://console.cloud.google.com/apis/api/bigquerydatatransfer.googleapis.com/overview

* add these roles to your service account via IAM in the destination project:
BigQuery Admin
BigQuery Data Transfer Service Agent

* add these roles to your service account via IAM in the source project:
BigQuery Admin
BigQuery Data Transfer Service Agent
Service Account Token Creator
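
for example, assuming the gcloud CLI and the standard IDs for the role
names above (roles/bigquery.admin shown here; repeat per role and per
project as needed, substituting your own project id and email):
$ gcloud projects add-iam-policy-binding {destination project id} \
   --member='serviceAccount:{service account email}' \
   --role='roles/bigquery.admin'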

* make the service account able to authenticate with the BigQuery Data Transfer API
$ gcloud iam service-accounts add-iam-policy-binding {service account email} \
   --member='serviceAccount:service-{user_project_number}@'\
   'gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com' \
   --role='roles/iam.serviceAccountTokenCreator'

reading material:
* https://cloud.google.com/bigquery/docs/copying-datasets
* https://cloud.google.com/bigquery/docs/enable-transfer-service

"""

import logging
import os
import time

import google.protobuf.json_format
from google.protobuf.timestamp_pb2 import Timestamp
from google.api_core.exceptions import ResourceExhausted
from google.cloud import bigquery_datatransfer_v1

bigquery_data_transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
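
# Note: the client above reads credentials from the environment (e.g. the
# GOOGLE_APPLICATION_CREDENTIALS variable pointing at the service account
# key file). A sketch of passing the key explicitly instead; the file path
# here is hypothetical:
#
#   from google.oauth2 import service_account
#
#   credentials = service_account.Credentials.from_service_account_file(
#       "/path/to/service-account-key.json"
#   )
#   bigquery_data_transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient(
#       credentials=credentials
#   )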

SOURCE_PROJECT = "source"
DESTINATION_PROJECT = "destination"
DESTINATION_PROJECT_LOCATION = "europe-west3"
DATASET_NAME = "dataset"
NOTIFICATION_TOPIC = os.environ.get("NOTIFICATION_TOPIC")
SUPPORTED_LOCATIONS = [
    "us-west4",
    "us-west2",
    "northamerica-northeast1",
    "us-east4",
    "us-west1",
    "us-west3",
    "southamerica-east1",
    "us-east1",
    "europe-west1",
    "europe-north1",
    "europe-west3",
    "europe-west2",
    "europe-west4",
    "europe-west6",
    "asia-east2",
    "asia-southeast2",
    "asia-south1",
    "asia-northeast2",
    "asia-northeast3",
    "asia-southeast1",
    "australia-southeast1",
    "asia-east1",
    "asia-northeast1",
]


def paginate_transfer_configs():
    logging.warning(
        f"going through all {len(SUPPORTED_LOCATIONS)} supported locations, might take a few moments"
    )
    for location in SUPPORTED_LOCATIONS:
        request = bigquery_datatransfer_v1.types.ListTransferConfigsRequest(
            {
                "parent": f"projects/{DESTINATION_PROJECT}/locations/{location}",
            }
        )
        try:
            transfer_configs = bigquery_data_transfer_client.list_transfer_configs(
                request
            )
        except ResourceExhausted:
            logging.warning(
                "got throttled during fetch of transfer config, waiting 5 second and retrying"
            )
            time.sleep(5)
            transfer_configs = bigquery_data_transfer_client.list_transfer_configs(
                request
            )
        for transfer_config in transfer_configs:
            yield transfer_config


def get_transfer_config():
    request = bigquery_datatransfer_v1.types.ListTransferConfigsRequest(
        {
            "parent": f"projects/{DESTINATION_PROJECT}/locations/{DESTINATION_PROJECT_LOCATION}",
        }
    )
    try:
        transfer_configs = bigquery_data_transfer_client.list_transfer_configs(request)
    except ResourceExhausted:
        logging.warning(
            "got throttled during fetch of transfer config, waiting 1 second and retrying"
        )
        time.sleep(1)
        transfer_configs = bigquery_data_transfer_client.list_transfer_configs(request)

    for transfer_config in transfer_configs:
        if transfer_config.destination_dataset_id == DATASET_NAME:
            return transfer_config


def create_transfer_config():

    transfer_config = {
        "destination_dataset_id": DATASET_NAME,
        "display_name": DATASET_NAME,
        "data_source_id": "cross_region_copy",
        "params": {
            "source_dataset_id": DATASET_NAME,
            "source_project_id": SOURCE_PROJECT,
            "overwrite_destination_table": True,
        },
        "schedule": "every day 04:00",
    }

    if NOTIFICATION_TOPIC:
        transfer_config["notification_pubsub_topic"] = NOTIFICATION_TOPIC

    transfer_config = google.protobuf.json_format.ParseDict(
        transfer_config,
        bigquery_datatransfer_v1.types.TransferConfig()._pb,
    )
    transfer_config = bigquery_data_transfer_client.create_transfer_config(
        parent=f"projects/{DESTINATION_PROJECT}", transfer_config=transfer_config
    )
    logging.info(f"created transfer_config with name: {transfer_config.name}")
    return transfer_config
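

# Alternative sketch, not part of the original gist: proto-plus message classes
# accept plain dicts for protobuf Struct fields, so the same config can be built
# directly, without google.protobuf.json_format.ParseDict (the pubsub topic is
# omitted here for brevity).
def create_transfer_config_direct():
    transfer_config = bigquery_datatransfer_v1.TransferConfig(
        destination_dataset_id=DATASET_NAME,
        display_name=DATASET_NAME,
        data_source_id="cross_region_copy",
        params={
            "source_dataset_id": DATASET_NAME,
            "source_project_id": SOURCE_PROJECT,
            "overwrite_destination_table": True,
        },
        schedule="every day 04:00",
    )
    return bigquery_data_transfer_client.create_transfer_config(
        parent=f"projects/{DESTINATION_PROJECT}", transfer_config=transfer_config
    )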


def run_transfer_config(transfer_config):
    start_time = Timestamp(seconds=int(time.time()))

    request = bigquery_datatransfer_v1.types.StartManualTransferRunsRequest(
        {"parent": transfer_config.name, "requested_run_time": start_time}
    )

    response = bigquery_data_transfer_client.start_manual_transfer_runs(
        request, timeout=360
    )
    logging.info(f"transfer config with name {transfer_config.name} is now running")

    return response.runs[0].name
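

# Hypothetical helper, not in the original gist: poll a manual run (by the
# run name returned above) until it reaches a terminal state, using
# get_transfer_run and the TransferState enum.
def wait_for_run(run_name, poll_seconds=30):
    terminal_states = (
        bigquery_datatransfer_v1.TransferState.SUCCEEDED,
        bigquery_datatransfer_v1.TransferState.FAILED,
        bigquery_datatransfer_v1.TransferState.CANCELLED,
    )
    while True:
        run = bigquery_data_transfer_client.get_transfer_run(name=run_name)
        if run.state in terminal_states:
            logging.info(f"run {run_name} finished in state {run.state.name}")
            return run
        time.sleep(poll_seconds)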


def delete_transfer_config(transfer_config):
    request = bigquery_datatransfer_v1.types.DeleteTransferConfigRequest(
        {"name": transfer_config.name}
    )
    bigquery_data_transfer_client.delete_transfer_config(request)
    logging.info(f"deleted transfer_config with name: {transfer_config.name}")


if __name__ == "__main__":
    create_transfer_config()
    for transfer_config in paginate_transfer_configs():
        print(transfer_config)
    transfer_config = get_transfer_config()
    run_transfer_config(transfer_config)
    delete_transfer_config(transfer_config)
    for transfer_config in paginate_transfer_configs():
        print(transfer_config)

sloev commented Dec 11, 2020

Potentially solved by this PR: #83


tswast commented Dec 14, 2020

I think this is fixed by #76
