Reusable Plugins for Cloud Data Fusion (CDF) / CDAP

Overview

The CDF/CDAP plugins detailed below can be reused in the context of data pipelines.

Let's say you run your incremental pipeline once every 5 minutes. When running an incremental pipeline, you have to filter the records by a specific field (e.g., lastUpdateDateTime of records > latest watermark value - buffer time) so that it syncs only the records that were updated since your last incremental sync. Subsequently, a merge and dedupe step makes sure that only new or updated records are synced into the destination table.
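The merge and dedupe step can be illustrated with a small in-memory sketch. This is not the plugins' implementation: the record layout (id, name, last_update_datetime) and the helper merge_and_dedupe are assumptions made for illustration. Because of the buffer time, a batch can contain rows that were already synced, so the sketch keeps one row per key and only overwrites a row when the incoming one is at least as new.

```python
from datetime import datetime

def merge_and_dedupe(destination, batch, key="id", ts="last_update_datetime"):
    """Upsert batch rows into destination, keeping the newest row per key."""
    merged = {row[key]: row for row in destination}
    for row in batch:
        current = merged.get(row[key])
        # Insert unseen keys; replace existing rows only with newer (or equal) data.
        if current is None or row[ts] >= current[ts]:
            merged[row[key]] = row
    return sorted(merged.values(), key=lambda r: r[key])

destination = [
    {"id": 1, "name": "a", "last_update_datetime": datetime(2020, 5, 8, 17, 0)},
    {"id": 2, "name": "b", "last_update_datetime": datetime(2020, 5, 8, 17, 20)},
]
batch = [
    # Already synced, but read again because of the buffer time.
    {"id": 2, "name": "b", "last_update_datetime": datetime(2020, 5, 8, 17, 20)},
    # Updated since the last sync.
    {"id": 2, "name": "b2", "last_update_datetime": datetime(2020, 5, 8, 17, 23)},
    # New since the last sync.
    {"id": 3, "name": "c", "last_update_datetime": datetime(2020, 5, 8, 17, 24)},
]
result = merge_and_dedupe(destination, batch)
```

After the merge, result holds exactly one row per id, and the row for id 2 carries the newer name.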

CheckPointReadAction, CheckPointUpdateAction

Plugin Description
Creates, reads, and updates checkpoints in incremental pull pipelines.

CheckPointReadAction - reads the checkpoint from Firestore DB and provides the value at runtime (as a runtime argument)

CheckPointUpdateAction - updates the checkpoint in Firestore DB (i.e., creates a new document that stores the maximum update date/time from BigQuery, so that the next run can use this checkpoint value to filter for records added since then)

For now these plugins only support timestamp values - in the future, integer values can potentially be added.

Dependencies

Setting up Firestore

  1. Set up Firestore DB

    1. Firestore is used to store and read the checkpoints used by your incremental pipelines
  2. Create a collection with a document from the parent path /

    1. Collection ID: PIPELINE_CHECKPOINTS
    2. Document ID: INCREMENTAL_DEMO

image

  3. Create a collection under parent path /PIPELINE_CHECKPOINTS/INCREMENTAL_DEMO
    1. Collection ID: CHECKPOINT
    2. Document ID: accept the auto-generated ID
      1. Field #1

        1. Note:

          1. Set to the maximum timestamp from the destination (BigQuery table)
          2. If running for the first time, set to the minimum timestamp from the source (e.g., SQL Server table)
        2. Field name: CHECKPOINT_VALUE

        3. Field type: string

        4. Date and time: 2020-05-08 17:21:01

      2. Field #2

        1. Note: enter the current time in timestamp format
        2. Field name: CREATED_TIMESTAMP
        3. Field type: timestamp
        4. Date and time: 25/08/2020, 15:49

image
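As a rough picture of the layout above, the CHECKPOINT subcollection can be modeled as a dictionary of documents, each holding a string CHECKPOINT_VALUE and a CREATED_TIMESTAMP. The sketch below is plain Python and does not call Firestore. The document IDs, the second document, and the rule "use the document with the newest CREATED_TIMESTAMP" are assumptions for illustration; the plugin's actual lookup logic is not described here.

```python
from datetime import datetime

# Hypothetical in-memory model of
# /PIPELINE_CHECKPOINTS/INCREMENTAL_DEMO/CHECKPOINT/<auto-generated document ID>
checkpoint_docs = {
    "autoId1": {
        "CHECKPOINT_VALUE": "2020-05-08 17:21:01",
        "CREATED_TIMESTAMP": datetime(2020, 8, 25, 15, 49),
    },
    "autoId2": {  # illustrative document, as a later update might add it
        "CHECKPOINT_VALUE": "2020-08-25 15:54:10",
        "CREATED_TIMESTAMP": datetime(2020, 8, 25, 15, 55),
    },
}

def latest_checkpoint(docs):
    """Return the CHECKPOINT_VALUE of the most recently created document."""
    newest = max(docs.values(), key=lambda doc: doc["CREATED_TIMESTAMP"])
    return newest["CHECKPOINT_VALUE"]

value = latest_checkpoint(checkpoint_docs)
```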

Set Runtime Arguments

Before running the pipeline, add the latestWatermarkValue variable as a runtime argument (in the Pipeline Studio view, click the drop-down arrow next to the Run button) and set its value to 0:

image

CheckPointReadAction populates latestWatermarkValue with the CHECKPOINT_VALUE from Firestore. The latestWatermarkValue runtime argument is then used as a parameter in the import query of the Database source in a subsequent step:

SELECT * FROM test WHERE last_update_datetime > '${latestWatermarkValue}'
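To see what the import query looks like once the runtime argument is filled in, the substitution can be mimicked with Python's string.Template, which happens to use the same ${name} placeholder syntax. This only imitates the effect; it is not how CDF/CDAP resolves macros, and the watermark value below is an example.

```python
from string import Template

import_query = Template(
    "SELECT * FROM test WHERE last_update_datetime > '${latestWatermarkValue}'"
)

# Example runtime arguments after the checkpoint has been read.
runtime_arguments = {"latestWatermarkValue": "2020-05-08 17:20:01"}

resolved_query = import_query.substitute(runtime_arguments)
print(resolved_query)
```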

BigQuery - the actual destination table name (this is where the maximum checkpoint, i.e., the maximum timestamp, is taken from)

Use Case
This plugin can be used at the beginning of an incremental CDAP data pipeline to read the checkpoint value from the last sync.

Let's say you run your pipeline once every 5 minutes. When running an incremental pipeline, you filter the records by a specific field (e.g., a timestamp later than the current date - 3). Because this window can include records that were already processed, a merge and dedupe step is still done to make sure duplicate records do not end up in the destination table.


CheckpointReadAction plugin requires the following config properties:

  • Label : plugin label name.
  • Specify the collection name in firestore DB: Name of the Collection.
  • Specify the document name to read the checkpoint details: Provide the document name specified in the Collection.
  • Buffer time to add to checkpoint value. (Note: in Minutes): Number of minutes to subtract from the checkpoint value read from Firestore.
  • Project: project ID.
  • Key path: Service account key file path to communicate with the Firestore DB.

Please see the following screenshot for example.

image
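The buffer time property can be read as simple date arithmetic: following the property description, the buffer (in minutes) is subtracted from the stored checkpoint before it is used as the watermark. The sketch below assumes the checkpoint string uses the format of the example value 2020-05-08 17:21:01; the helper name apply_buffer is hypothetical.

```python
from datetime import datetime, timedelta

# Assumed from the example checkpoint value 2020-05-08 17:21:01.
TS_FORMAT = "%Y-%m-%d %H:%M:%S"

def apply_buffer(checkpoint_value, buffer_minutes):
    """Subtract the buffer from the checkpoint and return it in the same format."""
    checkpoint = datetime.strptime(checkpoint_value, TS_FORMAT)
    return (checkpoint - timedelta(minutes=buffer_minutes)).strftime(TS_FORMAT)

watermark = apply_buffer("2020-05-08 17:21:01", 1)
```

With a buffer of 1 minute, records updated within the minute before the checkpoint are read again, which is why the merge and dedupe step is needed.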

CheckpointUpdateAction plugin requires the following configuration:

  • Label : plugin label name.
  • Specify the collection name in firestore DB: Name of the Collection.
  • Specify the document name to read the checkpoint details: Provide the document name specified in the Collection.
  • Dataset name where incremental pull table exists: BigQuery dataset name.
  • Table name that needs incremental pull: BigQuery table name.
  • Specify the checkpoint column from incremental pull table: Name of the column that holds the checkpoint value (e.g., last_update_datetime).
  • Project: project ID.
  • Key path: Service account key file path to communicate with the Firestore DB.

Please see the below screenshot for example:

image
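Conceptually, the update step needs two things: the maximum value of the checkpoint column in the BigQuery table, and a new Firestore document that stores it. The sketch below only builds the query text and the document fields; it does not call BigQuery or Firestore. The exact query the plugin issues is not shown in this README, so both helpers are hypothetical illustrations.

```python
from datetime import datetime, timezone

def max_checkpoint_query(project, dataset, table, column):
    """Build a query that returns the largest checkpoint value in the table."""
    return (
        f"SELECT MAX({column}) AS checkpoint_value "
        f"FROM `{project}.{dataset}.{table}`"
    )

def new_checkpoint_document(max_value, created_at):
    """Fields of the document that would be added to the CHECKPOINT collection."""
    return {"CHECKPOINT_VALUE": max_value, "CREATED_TIMESTAMP": created_at}

query = max_checkpoint_query(
    "pso-cdf-plugins-287518", "bq_dataset", "bq_table", "last_update_datetime"
)
document = new_checkpoint_document(
    "2020-08-25 15:54:10",  # example result of the query above
    datetime(2020, 8, 25, 15, 55, tzinfo=timezone.utc),
)
```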

CopyTableAction

Plugin description
Copies the BigQuery table from staging to destination at the end of the pipeline run. A new table is created if it doesn't exist. Otherwise, if the table exists, the plugin replaces the existing BigQuery destination table with data from staging.

Use case
This is applicable to CDAP data pipelines that do a full import/scan of the data from the source system into BigQuery.

Dependencies
Destination dataset: bq_dataset
Destination table: bq_table
Source dataset: bq_dataset_batch_staging
Source table: bq_table

CopyTableAction plugin requires the following configuration:

  • Label: plugin label name.
  • Key path: Service account key file path to call the BigQuery API.
  • Project ID: GCP project ID.
  • Dataset: BigQuery dataset name.
  • Table Name: BigQuery table name.

Please see the following screenshot for example:

image

DropTableAction

Plugin Description
Drops a BigQuery table at the beginning of a pipeline run.

Use Case
Useful to drop staging tables.

Dependencies
The BigQuery table to drop must exist.

The DropTableAction plugin requires the following configuration:

  • Label : plugin label name.
  • Key path: Service account key file path to call the BigQuery API.
  • Project ID: GCP project ID.
  • Dataset: BigQuery dataset name.
  • Table Name: BigQuery table name.

Please see the following screenshot for example configuration:

image

TruncateTableAction

Plugin Description
Truncates a BigQuery table when pipelines are set up to restore the data from source.

Use Case
Applicable to pipelines that restore data from source.

The TruncateTableAction plugin requires the following configuration:

  • Label : plugin label name.
  • Key path: Service account key file path to call the BigQuery API.
  • Project ID: GCP project ID.
  • Dataset: BigQuery dataset name.
  • Table Name: BigQuery table name.

Please see the following screenshot for example configuration:

image
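The difference between the three table actions (copy, drop, truncate) can be summarized with a toy in-memory model. It does not use the BigQuery API; the class InMemoryWarehouse and its method names are hypothetical and only mirror the behavior described in the plugin descriptions above.

```python
class InMemoryWarehouse:
    """Toy stand-in for a set of tables, keyed by (dataset, table)."""

    def __init__(self):
        self.tables = {}

    def copy_table(self, source, destination):
        # Creates the destination if missing, otherwise replaces its contents.
        self.tables[destination] = list(self.tables[source])

    def drop_table(self, name):
        # Removes the table entirely; raises KeyError if it does not exist.
        del self.tables[name]

    def truncate_table(self, name):
        # Removes all rows but keeps the table itself.
        self.tables[name].clear()

warehouse = InMemoryWarehouse()
staging = ("bq_dataset_batch_staging", "bq_table")
destination = ("bq_dataset", "bq_table")

warehouse.tables[staging] = [{"id": 1}, {"id": 2}]
warehouse.copy_table(staging, destination)   # destination is created
warehouse.tables[staging] = [{"id": 3}]
warehouse.copy_table(staging, destination)   # destination is replaced
rows_after_copy = list(warehouse.tables[destination])

warehouse.truncate_table(destination)        # table stays, rows are gone
warehouse.drop_table(staging)                # table no longer exists
```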

Putting it all together into a Pipeline

CheckPointReadAction → TruncateTableAction → Database → BigQuery → MergeLastUpdateTSAction → CheckPointUpdateAction

What does the pipeline do?

  1. CheckPointReadAction - reads the latest checkpoint from Firestore
  2. TruncateTableAction - truncates the records in the log table
  3. Database Source - imports data from the source
  4. BigQuery Sink - exports the data from the previous step (Database Source) into BigQuery
  5. MergeLastUpdateTSAction - merges based on timestamp and the update column list (columns to keep in the merge)
  6. CheckPointUpdateAction - updates the checkpoint in Firestore from the maximum record lastUpdateTimestamp in BigQuery

Successful run of Incremental Pipeline

image

Runtime arguments (set latestWatermarkValue to 0)

image

CheckPointReadAction

Label:
CheckPointReadAction

Specify the document name to read the checkpoint details*:
INCREMENTAL_DEMO

Buffer time to add to checkpoint value. (Note: in Minutes):
1

project:
pso-cdf-plugins-287518

serviceFilePath:
auto-detect

Screenshot:

image

TruncateTableAction

Label*
TruncateTableAction

Key path*
auto-detect

Project ID*
pso-cdf-plugins-287518

Dataset*
bq_dataset

Table name*
bq_table_LOG

image

Database source

Label*
Database

Reference Name*
test

Plugin Name*
sqlserver42

Plugin Type
jdbc

Connection String
jdbc:sqlserver://:;databaseName=main;user=;password=;

Import Query:

SELECT * FROM test WHERE last_update_datetime > '${latestWatermarkValue}'

image

BigQuery sink

Label*
BigQuery

Reference Name*
bq_table_sink

Project ID
pso-cdf-plugins-287518

Dataset*
bq_dataset

Table* (write to a temporary table, e.g., bq_table_LOG)
bq_table_LOG

Service Account File Path
auto-detect

Schema

image

MergeLastUpdateTSAction

Label*
MergeLastUpdateTSAction

Key path*
auto-detect

Project ID*
pso-cdf-plugins-287518

Dataset name
bq_dataset

Table name*
bq_table

Primary key list*
id

Update columns list*
id,name,last_update_datetime

image

CheckPointUpdateAction

Label*
CheckPointUpdateAction

Specify the collection name in firestore DB*
PIPELINE_CHECKPOINTS

Specify the document name to read the checkpoint details*
INCREMENTAL_DEMO

Dataset name where incremental pull table exists*
bq_dataset

Table name that needs incremental pull*
bq_table

Specify the checkpoint column from incremental pull table*
last_update_datetime

serviceFilePath
auto-detect

project
pso-cdf-plugins-287518

image

Building the CDF/CDAP Plugin (JAR file / JSON file) and deploying into CDF/CDAP

Building the plugins requires Java JDK 1.8 and Maven.

  1. To build the CDAP/CDF plugin JAR, run the following command from the project root:
mvn clean compile package
  2. You will find the generated JAR file and JSON file under the target folder:

    1. GoogleFunctions-1.6.jar
    2. GoogleFunctions-1.6.json
  3. Deploy GoogleFunctions-1.6.jar and GoogleFunctions-1.6.json into CDF/CDAP (note that if the same version is already deployed, you’ll get an error that it already exists):

    1. Go to Control Center
    2. Delete GoogleFunctions artifact if the same version already exists.
    3. Upload plugin by clicking on the circled green + button
    4. Pick the JAR file / JSON file created under target folder
    5. You’ll see a confirmation of the successful plugin upload