/
SourceConfigDefinition.java
150 lines (142 loc) · 5.45 KB
/
SourceConfigDefinition.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/*
* Copyright (c) 2018, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*
*/
package com.salesforce.mirus.config;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.connect.runtime.ConnectorConfig;
/**
 * Properties listed here can be applied to the MirusSourceConnector configuration object, which is
 * submitted as a JSON object to the REST Config API. These include details of the cluster the
 * {@link com.salesforce.mirus.MirusSourceConnector} will mirror data from, along with some
 * information on the destination cluster for topic metadata validation.
 *
 * <p>Each constant carries the five values passed to {@link ConfigDef#define} for that property:
 * key name, value type, default value, importance and documentation string. Use {@link
 * #configDef()} to obtain a {@link ConfigDef} containing every property declared here.
 */
public enum SourceConfigDefinition {
  TOPICS_WHITELIST(
      "topics.whitelist",
      ConfigDef.Type.LIST,
      Collections.emptyList(),
      ConfigDef.Importance.HIGH,
      "Whitelisted topic names will be mirrored. Comma-separated list."),
  TOPICS_REGEX(
      "topics.regex",
      ConfigDef.Type.STRING,
      "",
      ConfigDef.Importance.HIGH,
      "Java regex whitelist. Matching topics names will be mirrored"),
  TOPICS_REGEX_LIST(
      "topics.regex.list",
      ConfigDef.Type.LIST,
      Collections.emptyList(),
      ConfigDef.Importance.HIGH,
      "Comma separated Java regex list. Matching topic names will be mirrored. Regex strings must not contain commas."),
  MONITOR_POLL_WAIT_MS(
      "monitor.poll.wait.ms",
      ConfigDef.Type.LONG,
      TimeUnit.SECONDS.toMillis(60),
      ConfigDef.Importance.HIGH,
      "Wait time between attempts to poll for source configuration changes (milliseconds)"),
  POLL_TIMEOUT_MS(
      "poll.timeout.ms",
      ConfigDef.Type.LONG,
      TimeUnit.SECONDS.toMillis(1),
      ConfigDef.Importance.HIGH,
      "Timeout on Kafka consumer poll call (milliseconds)"),
  DESTINATION_TOPIC_NAME_PREFIX(
      "destination.topic.name.prefix",
      ConfigDef.Type.STRING,
      "",
      ConfigDef.Importance.MEDIUM,
      "Prefix all destination topics names with this string"),
  DESTINATION_TOPIC_NAME_SUFFIX(
      "destination.topic.name.suffix",
      ConfigDef.Type.STRING,
      "",
      ConfigDef.Importance.MEDIUM,
      "Suffix all destination topics names with this string"),
  ENABLE_PARTITION_MATCHING(
      "enable.partition.matching",
      ConfigDef.Type.BOOLEAN,
      false,
      ConfigDef.Importance.MEDIUM,
      "Ensures records are written to the destination partition with the same identifier as the source partition"),
  ENABLE_DESTINATION_TOPIC_CHECKING(
      "enable.destination.topic.checking",
      ConfigDef.Type.BOOLEAN,
      true,
      ConfigDef.Importance.LOW,
      // Each continuation fragment starts with a space so the joined sentence reads correctly.
      "Enables destination topic checking to ensure the topic exists in the destination cluster."
          + " Supports the RegexRouter SMT but not other Router transformations or other topic-rerouting"
          + " transformations. Disable to use other Kafka Connect Transformations to reroute messages"
          + " to different topics."),
  SOURCE_KEY_CONVERTER(
      "source.key.converter",
      ConfigDef.Type.CLASS,
      "org.apache.kafka.connect.converters.ByteArrayConverter",
      ConfigDef.Importance.MEDIUM,
      "Converter class to apply to source record keys"),
  SOURCE_VALUE_CONVERTER(
      "source.value.converter",
      ConfigDef.Type.CLASS,
      "org.apache.kafka.connect.converters.ByteArrayConverter",
      ConfigDef.Importance.MEDIUM,
      "Converter class to apply to source record values"),
  SOURCE_HEADER_CONVERTER(
      "source.header.converter",
      ConfigDef.Type.CLASS,
      "org.apache.kafka.connect.converters.ByteArrayConverter",
      ConfigDef.Importance.MEDIUM,
      "Converter class to apply to source record headers"),
  COMMIT_FAILURE_RESTART_MS(
      "commit.failure.restart.ms",
      ConfigDef.Type.LONG,
      TimeUnit.MINUTES.toMillis(5),
      ConfigDef.Importance.MEDIUM,
      "Fail task if no successful commit is seen for this time. Tasks automatically restart by default"),
  @Deprecated
  DESTINATION_BOOTSTRAP_SERVERS(
      "destination.bootstrap.servers",
      ConfigDef.Type.STRING,
      "",
      ConfigDef.Importance.HIGH,
      "Comma-separated list of destination bootstrap server endpoints in standard format. This is used by the Kafka Monitor for run-time topic validation");

  /** Property name as it appears in the connector configuration. */
  final String key;

  /** Value type used when the property is registered with a {@link ConfigDef}. */
  final ConfigDef.Type type;

  /** Default value registered for the property. */
  final Object defaultValue;

  /** Importance level registered for the property. */
  final ConfigDef.Importance importance;

  /** Documentation string registered for the property. */
  final String doc;

  /**
   * Stores the values later handed to {@link ConfigDef#define} for this property.
   *
   * @param key property name
   * @param type value type of the property
   * @param defaultValue default value of the property
   * @param importance importance level of the property
   * @param doc documentation string for the property
   */
  SourceConfigDefinition(
      String key,
      ConfigDef.Type type,
      Object defaultValue,
      ConfigDef.Importance importance,
      String doc) {
    this.key = key;
    this.type = type;
    this.defaultValue = defaultValue;
    this.importance = importance;
    this.doc = doc;
  }

  /**
   * Builds a new {@link ConfigDef} containing every property declared by this enum, plus the
   * connector name key and the keys in the "Transforms" group taken from {@link
   * ConnectorConfig#configDef()}.
   *
   * @return a freshly created {@link ConfigDef}; a new instance is built on every call
   */
  public static ConfigDef configDef() {
    ConfigDef configDef = new ConfigDef();
    for (SourceConfigDefinition f : SourceConfigDefinition.values()) {
      configDef = configDef.define(f.key, f.type, f.defaultValue, f.importance, f.doc);
    }
    // Share name and transforms config definitions from ConnectorConfig
    for (ConfigKey key : ConnectorConfig.configDef().configKeys().values()) {
      if ("Transforms".equals(key.group) || ConnectorConfig.NAME_CONFIG.equals(key.name)) {
        configDef.define(key);
      }
    }
    return configDef;
  }

  /**
   * Returns the property name for this definition.
   *
   * @return the configuration key string
   */
  public String getKey() {
    return key;
  }
}