forked from GoogleCloudPlatform/dataproc-templates
-
Notifications
You must be signed in to change notification settings - Fork 0
/
HiveToBigQuery.java
128 lines (118 loc) · 4.85 KB
/
HiveToBigQuery.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*
* Copyright (C) 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataproc.templates.hive;
import static com.google.cloud.dataproc.templates.util.TemplateConstants.*;
import com.google.cloud.dataproc.templates.BaseTemplate;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Spark job to move data and/or schema from a Hive table to BigQuery.
 *
 * <p>This template can be configured to run in a few different modes. In default mode
 * hivetobq.append.mode is set to ErrorIfExists, which causes a failure if the target BigQuery
 * table already exists. The configured value is handed to the Spark writer's {@code mode(String)}.
 * Possible values for this property are:
 *
 * <ol>
 *   <li>Append
 *   <li>Overwrite
 *   <li>ErrorIfExists
 *   <li>Ignore
 * </ol>
 *
 * <p>For a detailed list of properties refer to the "HiveToBQ Template properties" section in the
 * resources/template.properties file.
 */
public class HiveToBigQuery implements BaseTemplate {

  private static final Logger LOGGER = LoggerFactory.getLogger(HiveToBigQuery.class);

  /** Value passed to the BigQuery writer's "table" option. */
  private final String bqLocation;

  /** Hive warehouse location; also reused as the base of the temporary GCS staging path. */
  private final String warehouseLocation;

  /** Name of the Hive table to read. */
  private final String hiveInputTable;

  /** Name of the Hive database that contains {@link #hiveInputTable}. */
  private final String hiveInputDb;

  /** Save mode handed to the Spark writer. */
  private final String bqAppendMode;

  // NOTE(review): read from the properties but not used anywhere in this class -- confirm whether
  // partitioned writes were meant to be wired in.
  private final String partitionColumn;

  /** Reads all template settings from the properties exposed by {@code getProperties()}. */
  public HiveToBigQuery() {
    bqLocation = getProperties().getProperty(HIVE_TO_BQ_BIGQUERY_LOCATION);
    warehouseLocation = getProperties().getProperty(HIVE_TO_BQ_WAREHOUSE_LOCATION_PROP);
    hiveInputTable = getProperties().getProperty(HIVE_TO_BQ_INPUT_TABLE_PROP);
    hiveInputDb = getProperties().getProperty(HIVE_TO_BQ_INPUT_TABLE_DATABASE_PROP);
    bqAppendMode = getProperties().getProperty(HIVE_TO_BQ_APPEND_MODE);
    partitionColumn = getProperties().getProperty(HIVE_TO_BQ_PARTITION_COL);
  }

  /**
   * Validates the mandatory properties, reads the configured Hive table and writes it to BigQuery.
   *
   * <p>Any failure raised while the Spark job runs is logged and then propagated to the caller, so
   * that a failed copy is not reported as a success. The Spark session is stopped in every case.
   *
   * @throws IllegalArgumentException if the BigQuery location, Hive table, Hive database or
   *     warehouse location property is null, empty or blank
   */
  @Override
  public void runTemplate() {
    if (StringUtils.isAnyBlank(bqLocation, hiveInputTable, warehouseLocation, hiveInputDb)) {
      LOGGER.error(
          "{},{},{},{} is required parameter. ",
          HIVE_TO_BQ_BIGQUERY_LOCATION,
          HIVE_TO_BQ_INPUT_TABLE_PROP,
          HIVE_TO_BQ_INPUT_TABLE_DATABASE_PROP,
          HIVE_TO_BQ_WAREHOUSE_LOCATION_PROP);
      throw new IllegalArgumentException(
          "Required parameters for HiveToBigQuery not passed. "
              + "Set mandatory parameter for HiveToBigQuery template "
              + "in resources/conf/template.properties file.");
    }

    LOGGER.info(
        "Starting Hive to BigQuery spark job with following parameters:"
            + " 1. {}:{}"
            + " 2. {}:{}"
            + " 3. {}:{}"
            + " 4. {}:{}"
            + " 5. {}:{}",
        HIVE_TO_BQ_BIGQUERY_LOCATION,
        bqLocation,
        HIVE_TO_BQ_WAREHOUSE_LOCATION_PROP,
        warehouseLocation,
        HIVE_TO_BQ_INPUT_TABLE_PROP,
        hiveInputTable,
        HIVE_TO_BQ_INPUT_TABLE_DATABASE_PROP,
        hiveInputDb,
        HIVE_TO_BQ_APPEND_MODE,
        bqAppendMode);

    SparkSession spark = null;
    try {
      // Initialize Spark session
      spark =
          SparkSession.builder()
              .appName("Spark HiveToBigQuery Job")
              .config(HIVE_TO_BQ_WAREHOUSE_LOCATION_PROP, warehouseLocation)
              .enableHiveSupport()
              .getOrCreate();

      LOGGER.debug("added jars : {}", spark.sparkContext().addedJars().keys());

      // Read input data from the Hive table. The database and table names come straight from the
      // template properties and are concatenated into the query unquoted.
      Dataset<Row> inputData = spark.sql("select * from " + hiveInputDb + "." + hiveInputTable);

      // Write output to BigQuery.
      //
      // The warehouse location is used as the temporary GCS location for staging data before
      // writing to BQ. Job failures would require a manual cleanup of this data.
      // TODO -- Remove using warehouse location for staging data add new property
      inputData
          .write()
          .mode(bqAppendMode)
          .format("bigquery")
          .option("table", bqLocation)
          .option("temporaryGcsBucket", (warehouseLocation + "/temp/spark").replace("gs://", ""))
          .save();

      LOGGER.info("HiveToBigQuery job completed.");
    } catch (Throwable th) {
      LOGGER.error("Exception in HiveToBigQuery", th);
      // Rethrow so the caller sees the failure instead of a silent "success". This relies on
      // Java's precise rethrow: no call in the try block declares a checked exception, so no
      // throws clause is needed on this method.
      throw th;
    } finally {
      // Single shutdown point for both the success and the failure path.
      if (Objects.nonNull(spark)) {
        spark.stop();
      }
    }
  }
}