-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
KafkaConfiguration.java
executable file
·1955 lines (1699 loc) · 82.5 KB
/
KafkaConfiguration.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.jsse.CipherSuitesParameters;
import org.apache.camel.support.jsse.KeyManagersParameters;
import org.apache.camel.support.jsse.KeyStoreParameters;
import org.apache.camel.support.jsse.SSLContextParameters;
import org.apache.camel.support.jsse.SecureSocketProtocolsParameters;
import org.apache.camel.support.jsse.TrustManagersParameters;
import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@UriParams
public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware {
// Common configuration properties
@UriPath(label = "common")
@Metadata(required = true)
private String topic;
@UriParam(label = "common")
private String brokers;
@UriParam(label = "common")
private String clientId;
@UriParam(label = "common",
description = "To use a custom HeaderFilterStrategy to filter header to and from Camel message.")
private HeaderFilterStrategy headerFilterStrategy = new KafkaHeaderFilterStrategy();
@UriParam(label = "consumer", defaultValue = "true")
private boolean preValidateHostAndPort = true;
@UriParam(label = "consumer")
private boolean topicIsPattern;
@UriParam(label = "consumer")
private String groupId;
@UriParam(label = "consumer")
private String groupInstanceId;
@UriParam(label = "consumer", defaultValue = "1")
private int consumersCount = 1;
@UriParam(label = "consumer", description = "To use a custom KafkaHeaderDeserializer to deserialize kafka headers values")
private KafkaHeaderDeserializer headerDeserializer = new DefaultKafkaHeaderDeserializer();
// interceptor.classes
@UriParam(label = "common,monitoring")
private String interceptorClasses;
// key.deserializer
@UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
private String keyDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
// value.deserializer
@UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
private String valueDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
// fetch.min.bytes
@UriParam(label = "consumer", defaultValue = "1")
private Integer fetchMinBytes = 1;
// fetch.max.bytes
@UriParam(label = "consumer", defaultValue = "52428800")
private Integer fetchMaxBytes = 50 * 1024 * 1024;
// heartbeat.interval.ms
@UriParam(label = "consumer", defaultValue = "3000")
private Integer heartbeatIntervalMs = 3000;
// max.partition.fetch.bytes
@UriParam(label = "consumer", defaultValue = "1048576")
private Integer maxPartitionFetchBytes = 1048576;
// session.timeout.ms
@UriParam(label = "consumer", defaultValue = "45000")
private Integer sessionTimeoutMs = 45000;
@UriParam(label = "consumer", defaultValue = "500")
private Integer maxPollRecords;
@UriParam(label = "consumer", defaultValue = "5000", javaType = "java.time.Duration")
private Long pollTimeoutMs = 5000L;
@UriParam(label = "consumer", javaType = "java.time.Duration")
private Integer maxPollIntervalMs;
// auto.offset.reset
@UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none")
private String autoOffsetReset = "latest";
// partition.assignment.strategy
@UriParam(label = "consumer", defaultValue = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR)
private String partitionAssignor = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR;
// request.timeout.ms
@UriParam(label = "consumer", defaultValue = "30000")
private Integer consumerRequestTimeoutMs = 30000;
// auto.commit.interval.ms
@UriParam(label = "consumer", defaultValue = "5000")
private Integer autoCommitIntervalMs = 5000;
// check.crcs
@UriParam(label = "consumer", defaultValue = "true")
private Boolean checkCrcs = true;
// fetch.max.wait.ms
@UriParam(label = "consumer", defaultValue = "500")
private Integer fetchWaitMaxMs = 500;
@UriParam(label = "consumer")
private SeekPolicy seekTo;
// Consumer configuration properties
@UriParam(label = "consumer", defaultValue = "true")
private boolean autoCommitEnable = true;
@UriParam(label = "consumer")
private boolean allowManualCommit;
@UriParam(label = "consumer")
private boolean breakOnFirstError;
@UriParam(label = "consumer")
private StateRepository<String, String> offsetRepository;
@UriParam(label = "consumer", defaultValue = "ERROR_HANDLER")
private PollOnError pollOnError = PollOnError.ERROR_HANDLER;
@UriParam(label = "consumer", defaultValue = "5000", javaType = "java.time.Duration")
private Long commitTimeoutMs = 5000L;
@UriParam(label = "consumer,advanced", defaultValue = "read_uncommitted", enums = "read_uncommitted,read_committed")
private String isolationLevel;
// Producer configuration properties
@UriParam(label = "producer")
private String partitioner;
@UriParam(label = "producer", defaultValue = "false")
private boolean partitionerIgnoreKeys;
@UriParam(label = "producer", defaultValue = "100")
private Integer retryBackoffMs = 100;
@UriParam(label = "producer")
private ExecutorService workerPool;
@UriParam(label = "producer", defaultValue = "10")
private Integer workerPoolCoreSize = 10;
@UriParam(label = "producer", defaultValue = "20")
private Integer workerPoolMaxSize = 20;
// Async producer config
@UriParam(label = "producer", defaultValue = "10000")
private Integer queueBufferingMaxMessages = 10000;
@UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_SERIALIZER)
private String valueSerializer = KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
@UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_SERIALIZER)
private String keySerializer = KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
@UriParam(label = "producer")
private String key;
@UriParam(label = "producer")
private Integer partitionKey;
@UriParam(label = "producer", enums = "all,-1,0,1", defaultValue = "all")
private String requestRequiredAcks = "all";
// buffer.memory
@UriParam(label = "producer", defaultValue = "33554432")
private Integer bufferMemorySize = 33554432;
// compression.type
@UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy,lz4,zstd")
private String compressionCodec = "none";
// retries
@UriParam(label = "producer")
private Integer retries;
// use individual headers if exchange.body contains Iterable or similar of Message or Exchange
@UriParam(label = "producer", defaultValue = "false")
private boolean batchWithIndividualHeaders;
// batch.size
@UriParam(label = "producer", defaultValue = "16384")
private Integer producerBatchSize = 16384;
// connections.max.idle.ms
@UriParam(label = "producer", defaultValue = "540000")
private Integer connectionMaxIdleMs = 540000;
// linger.ms
@UriParam(label = "producer", defaultValue = "0")
private Integer lingerMs = 0;
// max.block.ms
@UriParam(label = "producer", defaultValue = "60000")
private Integer maxBlockMs = 60000;
// max.request.size
@UriParam(label = "producer", defaultValue = "1048576")
private Integer maxRequestSize = 1048576;
// receive.buffer.bytes
@UriParam(label = "producer", defaultValue = "65536")
private Integer receiveBufferBytes = 65536;
// request.timeout.ms
@UriParam(label = "producer", defaultValue = "30000")
private Integer requestTimeoutMs = 30000;
// delivery.timeout.ms
@UriParam(label = "producer", defaultValue = "120000")
private Integer deliveryTimeoutMs = 120000;
// send.buffer.bytes
@UriParam(label = "producer", defaultValue = "131072")
private Integer sendBufferBytes = 131072;
@UriParam(label = "producer", defaultValue = "true")
private boolean recordMetadata = true;
// max.in.flight.requests.per.connection
@UriParam(label = "producer", defaultValue = "5")
private Integer maxInFlightRequest = 5;
// metadata.max.age.ms
@UriParam(label = "producer", defaultValue = "300000")
private Integer metadataMaxAgeMs = 300000;
// metric.reporters
@UriParam(label = "producer")
private String metricReporters;
// metrics.num.samples
@UriParam(label = "producer", defaultValue = "2")
private Integer noOfMetricsSample = 2;
// metrics.sample.window.ms
@UriParam(label = "producer", defaultValue = "30000")
private Integer metricsSampleWindowMs = 30000;
// reconnect.backoff.ms
@UriParam(label = "producer", defaultValue = "50")
private Integer reconnectBackoffMs = 50;
// enable.idempotence
// Initialized to true so the field matches the declared defaultValue; because the primitive
// boolean is always written to the producer properties (addPropertyIfNotEmpty treats
// Boolean.FALSE as non-empty), leaving it uninitialized would silently force idempotence off.
@UriParam(label = "producer", defaultValue = "true")
private boolean enableIdempotence = true;
@UriParam(label = "producer", description = "To use a custom KafkaHeaderSerializer to serialize kafka headers values")
private KafkaHeaderSerializer headerSerializer = new DefaultKafkaHeaderSerializer();
// reconnect.backoff.max.ms
@UriParam(label = "common", defaultValue = "1000")
private Integer reconnectBackoffMaxMs = 1000;
// SSL
@UriParam(label = "common,security")
private SSLContextParameters sslContextParameters;
// SSL
// ssl.key.password
@UriParam(label = "common,security", secret = true)
private String sslKeyPassword;
// ssl.keystore.location
@UriParam(label = "common,security")
private String sslKeystoreLocation;
// ssl.keystore.password
@UriParam(label = "common,security", secret = true)
private String sslKeystorePassword;
// ssl.truststore.location
@UriParam(label = "common,security")
private String sslTruststoreLocation;
// ssl.truststore.password
@UriParam(label = "common,security", secret = true)
private String sslTruststorePassword;
// SSL
// ssl.enabled.protocols
@UriParam(label = "common,security")
private String sslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS;
// ssl.keystore.type
@UriParam(label = "common,security", defaultValue = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE)
private String sslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE;
// ssl.protocol
@UriParam(label = "common,security")
private String sslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL;
// ssl.provider
@UriParam(label = "common,security")
private String sslProvider;
// ssl.truststore.type
@UriParam(label = "common,security", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
// SSL
// ssl.cipher.suites
@UriParam(label = "common,security")
private String sslCipherSuites;
// ssl.endpoint.identification.algorithm
@UriParam(label = "common,security", defaultValue = "https")
private String sslEndpointAlgorithm = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM;
// ssl.keymanager.algorithm
@UriParam(label = "common,security", defaultValue = "SunX509")
private String sslKeymanagerAlgorithm = "SunX509";
// ssl.trustmanager.algorithm
@UriParam(label = "common,security", defaultValue = "PKIX")
private String sslTrustmanagerAlgorithm = "PKIX";
// SASL & security protocol
// sasl.kerberos.service.name
@UriParam(label = "common,security")
private String saslKerberosServiceName;
// security.protocol
@UriParam(label = "common,security", defaultValue = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL)
private String securityProtocol = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
// SASL
// sasl.mechanism
@UriParam(label = "common,security", defaultValue = SaslConfigs.DEFAULT_SASL_MECHANISM)
private String saslMechanism = SaslConfigs.DEFAULT_SASL_MECHANISM;
// sasl.kerberos.kinit.cmd
@UriParam(label = "common,security", defaultValue = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD)
private String kerberosInitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD;
// sasl.kerberos.min.time.before.relogin
@UriParam(label = "common,security", defaultValue = "60000")
private Integer kerberosBeforeReloginMinTime = 60000;
// sasl.kerberos.ticket.renew.jitter
@UriParam(label = "common,security", defaultValue = "0.05")
private Double kerberosRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER;
// sasl.kerberos.ticket.renew.window.factor
@UriParam(label = "common,security", defaultValue = "0.8")
private Double kerberosRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
@UriParam(label = "common,security", defaultValue = "DEFAULT")
// sasl.kerberos.principal.to.local.rules
private String kerberosPrincipalToLocalRules;
@UriParam(label = "common,security", secret = true)
// sasl.jaas.config
private String saslJaasConfig;
// Schema registry only options
@UriParam(label = "schema")
private String schemaRegistryURL;
@UriParam(label = "schema,consumer")
private boolean specificAvroReader;
// Additional properties
@UriParam(label = "common", prefix = "additionalProperties.", multiValue = true)
private Map<String, Object> additionalProperties = new HashMap<>();
@UriParam(label = "common", defaultValue = "30000")
private int shutdownTimeout = 30000;
@UriParam(defaultValue = "false", label = "advanced",
description = "Sets whether synchronous processing should be strictly used")
private boolean synchronous;
@UriParam(label = "common,security")
private String kerberosConfigLocation;
@UriParam(label = "consumer", defaultValue = "false")
private boolean batching;
/**
 * Creates a new configuration with every option at the default declared on its field.
 */
public KafkaConfiguration() {
}
/**
 * Returns an independent copy of this configuration.
 *
 * The copy is a shallow clone except for the additional-properties map, which is duplicated
 * so that mutating the copy's map never leaks into this instance.
 *
 * @return a copy of this configuration
 * @throws RuntimeCamelException if cloning is unexpectedly not supported
 */
public KafkaConfiguration copy() {
    try {
        final KafkaConfiguration cloned = (KafkaConfiguration) clone();
        cloned.additionalProperties = new HashMap<>(this.additionalProperties);
        return cloned;
    } catch (CloneNotSupportedException cause) {
        // cannot happen in practice: this class implements Cloneable
        throw new RuntimeCamelException(cause);
    }
}
/**
 * Assembles the {@link Properties} used to create a Kafka producer from this configuration.
 *
 * Only non-empty option values are copied in, so anything left unset falls back to the Kafka
 * client's own defaults. SSL, SASL and free-form additional properties are applied afterwards.
 *
 * @return the producer properties
 */
public Properties createProducerProperties() {
    Properties props = new Properties();
    // serializers, acks and core batching/buffering options
    addPropertyIfNotEmpty(props, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializer());
    addPropertyIfNotEmpty(props, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getValueSerializer());
    addPropertyIfNotEmpty(props, ProducerConfig.ACKS_CONFIG, getRequestRequiredAcks());
    addPropertyIfNotEmpty(props, ProducerConfig.BUFFER_MEMORY_CONFIG, getBufferMemorySize());
    addPropertyIfNotEmpty(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec());
    addPropertyIfNotEmpty(props, ProducerConfig.RETRIES_CONFIG, getRetries());
    addPropertyIfNotEmpty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses());
    addPropertyIfNotEmpty(props, ProducerConfig.BATCH_SIZE_CONFIG, getProducerBatchSize());
    addPropertyIfNotEmpty(props, ProducerConfig.CLIENT_ID_CONFIG, getClientId());
    addPropertyIfNotEmpty(props, ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
    addPropertyIfNotEmpty(props, ProducerConfig.LINGER_MS_CONFIG, getLingerMs());
    addPropertyIfNotEmpty(props, ProducerConfig.MAX_BLOCK_MS_CONFIG, getMaxBlockMs());
    addPropertyIfNotEmpty(props, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getMaxRequestSize());
    addPropertyIfNotEmpty(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, getPartitioner());
    addPropertyIfNotEmpty(props, ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG, isPartitionerIgnoreKeys());
    addPropertyIfNotEmpty(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
    addPropertyIfNotEmpty(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, getRequestTimeoutMs());
    addPropertyIfNotEmpty(props, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, getDeliveryTimeoutMs());
    addPropertyIfNotEmpty(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
    addPropertyIfNotEmpty(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
    // metadata refresh, metrics, and reconnect/retry backoff tuning
    addPropertyIfNotEmpty(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
    addPropertyIfNotEmpty(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
    addPropertyIfNotEmpty(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
    addPropertyIfNotEmpty(props, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
    addPropertyIfNotEmpty(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
    addPropertyIfNotEmpty(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
    addPropertyIfNotEmpty(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isEnableIdempotence());
    addPropertyIfNotEmpty(props, ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
    // plain string key: the schema registry option is not part of ProducerConfig
    addPropertyIfNotEmpty(props, "schema.registry.url", getSchemaRegistryURL());
    // SSL: an explicit SSLContextParameters takes precedence over the individual ssl* options
    if (sslContextParameters != null) {
        applySslConfigurationFromContext(props, sslContextParameters);
    } else {
        applyProducerSslConfiguration(props);
    }
    addPropertyIfNotEmpty(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
    // SASL options are only relevant for SASL_PLAINTEXT / SASL_SSL
    if (isSasl(securityProtocol)) {
        applySaslConfiguration(props);
    }
    // additional properties go in last, so they can override anything configured above
    applyAdditionalProperties(props, getAdditionalProperties());
    return props;
}
/**
 * Copies the SASL/Kerberos related options into the given Kafka properties.
 * Shared by the producer and consumer property builders; only non-empty values are added.
 *
 * @param props the Kafka properties being assembled
 */
private void applySaslConfiguration(Properties props) {
    addPropertyIfNotEmpty(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
    addPropertyIfNotEmpty(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
    addPropertyIfNotEmpty(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
    addPropertyIfNotEmpty(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
    addPropertyIfNotEmpty(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
    addPropertyIfNotEmpty(props, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
            getKerberosPrincipalToLocalRules());
    addPropertyIfNotEmpty(props, SaslConfigs.SASL_MECHANISM, getSaslMechanism());
    addPropertyIfNotEmpty(props, SaslConfigs.SASL_JAAS_CONFIG, getSaslJaasConfig());
}
/**
 * Copies the individual ssl* options into the producer properties. Used only when no
 * {@link SSLContextParameters} is configured, and only when the security protocol is
 * SSL or SASL_SSL (otherwise the SSL options would be meaningless).
 *
 * @param props the Kafka properties being assembled
 */
private void applyProducerSslConfiguration(Properties props) {
    if (securityProtocol.equals(SecurityProtocol.SSL.name()) || securityProtocol.equals(SecurityProtocol.SASL_SSL.name())) {
        addPropertyIfNotEmpty(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
        // store types are upper-cased to match the values Kafka documents (JKS, PKCS12, ...)
        addUpperCasePropertyIfNotEmpty(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
        addUpperCasePropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
        String algo = getSslEndpointAlgorithm();
        // "none"/"false" are treated as sentinels meaning "disable hostname verification":
        // the property is simply left unset in that case
        if (algo != null && !algo.equals("none") && !algo.equals("false")) {
            addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, algo);
        }
        addPropertyIfNotEmpty(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
    }
}
/**
 * Assembles the {@link Properties} used to create a Kafka consumer from this configuration.
 *
 * Only non-empty option values are copied in, so anything left unset falls back to the Kafka
 * client's own defaults. SSL, SASL and free-form additional properties are applied afterwards.
 *
 * @return the consumer properties
 */
public Properties createConsumerProperties() {
    Properties props = new Properties();
    // deserializers, fetch sizing, heartbeat/session and polling options
    addPropertyIfNotEmpty(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer());
    addPropertyIfNotEmpty(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer());
    addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getFetchMinBytes());
    addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MAX_BYTES_CONFIG, getFetchMaxBytes());
    addPropertyIfNotEmpty(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
    addPropertyIfNotEmpty(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());
    addPropertyIfNotEmpty(props, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses());
    addPropertyIfNotEmpty(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
    addPropertyIfNotEmpty(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, getAutoCommitEnable());
    addPropertyIfNotEmpty(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
    addPropertyIfNotEmpty(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
    addPropertyIfNotEmpty(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitIntervalMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.CHECK_CRCS_CONFIG, getCheckCrcs());
    addPropertyIfNotEmpty(props, ConsumerConfig.CLIENT_ID_CONFIG, getClientId());
    addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchWaitMaxMs());
    // metadata refresh, metrics, and reconnect/retry backoff tuning
    addPropertyIfNotEmpty(props, ConsumerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
    addPropertyIfNotEmpty(props, ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
    addPropertyIfNotEmpty(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
    addPropertyIfNotEmpty(props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, getIsolationLevel());
    // plain string keys: schema registry options are not part of ConsumerConfig
    addPropertyIfNotEmpty(props, "schema.registry.url", getSchemaRegistryURL());
    addPropertyIfNotFalse(props, "specific.avro.reader", isSpecificAvroReader());
    // SSL: an explicit SSLContextParameters takes precedence over the individual ssl* options
    if (sslContextParameters != null) {
        applySslConfigurationFromContext(props, sslContextParameters);
    } else {
        applySslConsumerConfigurationFromOptions(props);
    }
    // Security protocol
    addPropertyIfNotEmpty(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
    // SASL options are only relevant for SASL_PLAINTEXT / SASL_SSL
    if (isSasl(securityProtocol)) {
        applySaslConfiguration(props);
    }
    // additional properties go in last, so they can override anything configured above
    applyAdditionalProperties(props, getAdditionalProperties());
    return props;
}
/**
 * Tells whether the given security protocol is one of the SASL based protocols.
 *
 * @param securityProtocol the security protocol name to inspect
 * @return true for SASL_PLAINTEXT or SASL_SSL, false otherwise
 */
private boolean isSasl(String securityProtocol) {
    final String saslPlaintext = SecurityProtocol.SASL_PLAINTEXT.name();
    final String saslSsl = SecurityProtocol.SASL_SSL.name();
    return securityProtocol.equals(saslPlaintext)
            || securityProtocol.equals(saslSsl);
}
/**
 * Copies the individual ssl* options into the consumer properties. Used only when no
 * {@link SSLContextParameters} is configured, and only when the security protocol is
 * SSL or SASL_SSL (otherwise the SSL options would be meaningless).
 *
 * @param props the Kafka properties being assembled
 */
private void applySslConsumerConfigurationFromOptions(Properties props) {
    if (securityProtocol.equals(SecurityProtocol.SSL.name()) || securityProtocol.equals(SecurityProtocol.SASL_SSL.name())) {
        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
        String algo = getSslEndpointAlgorithm();
        // "none"/"false" are treated as sentinels meaning "disable hostname verification":
        // the property is simply left unset in that case
        if (algo != null && !algo.equals("none") && !algo.equals("false")) {
            addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, algo);
        }
        addPropertyIfNotEmpty(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
        // store types are upper-cased to match the values Kafka documents (JKS, PKCS12, ...)
        addUpperCasePropertyIfNotEmpty(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
        addPropertyIfNotEmpty(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
        addUpperCasePropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
        // was ProducerConfig.SEND_BUFFER_CONFIG; use the consumer constant in the consumer
        // path (both resolve to "send.buffer.bytes", so behavior is unchanged)
        addPropertyIfNotEmpty(props, ConsumerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
    }
}
/**
 * Uses the standard camel {@link SSLContextParameters} object to fill the Kafka SSL properties.
 * Takes precedence over the individual ssl* URI options when configured.
 *
 * @param props Kafka properties
 * @param sslContextParameters SSL configuration
 */
private void applySslConfigurationFromContext(Properties props, SSLContextParameters sslContextParameters) {
    addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, sslContextParameters.getSecureSocketProtocol());
    addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, sslContextParameters.getProvider());
    CipherSuitesParameters cipherSuites = sslContextParameters.getCipherSuites();
    if (cipherSuites != null) {
        addCommaSeparatedList(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, cipherSuites.getCipherSuite());
    }
    SecureSocketProtocolsParameters secureSocketProtocols = sslContextParameters.getSecureSocketProtocols();
    if (secureSocketProtocols != null) {
        addCommaSeparatedList(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
                secureSocketProtocols.getSecureSocketProtocol());
    }
    // key managers: keystore used to prove our own identity
    KeyManagersParameters keyManagers = sslContextParameters.getKeyManagers();
    if (keyManagers != null) {
        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, keyManagers.getAlgorithm());
        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyManagers.getKeyPassword());
        KeyStoreParameters keyStore = keyManagers.getKeyStore();
        if (keyStore != null) {
            addUpperCasePropertyIfNotEmpty(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, keyStore.getType());
            addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getResource());
            addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStore.getPassword());
        }
    }
    // trust managers: truststore used to verify the broker's identity
    TrustManagersParameters trustManagers = sslContextParameters.getTrustManagers();
    if (trustManagers != null) {
        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, trustManagers.getAlgorithm());
        KeyStoreParameters keyStore = trustManagers.getKeyStore();
        if (keyStore != null) {
            // upper-cased for consistency with the keystore type above and the option-based
            // paths; KeyStore.getInstance treats the type case-insensitively either way
            addUpperCasePropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, keyStore.getType());
            addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, keyStore.getResource());
            addPropertyIfNotEmpty(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, keyStore.getPassword());
        }
    }
}
/**
 * Copies user-supplied additional properties into the Kafka client properties, skipping
 * entries whose value is null.
 *
 * @param props                target Kafka properties (mutated in place)
 * @param additionalProperties extra properties to apply; may be null or empty
 */
private void applyAdditionalProperties(final Properties props, final Map<String, Object> additionalProperties) {
    // Bug fix: guard on the map that is actually iterated (the parameter) rather than on
    // getAdditionalProperties(), which may refer to a different map than what was passed in.
    if (!ObjectHelper.isEmpty(additionalProperties)) {
        additionalProperties.forEach((property, value) -> {
            if (value != null) {
                // value should be as-is
                props.put(property, value);
            }
        });
    }
}
/**
 * Stores {@code key -> value} only when the flag is true; false flags are omitted so that
 * Kafka's own defaults apply.
 */
private static void addPropertyIfNotFalse(Properties props, String key, boolean value) {
    if (!value) {
        return;
    }
    // value should be as-is
    props.put(key, value);
}
/**
 * Stores {@code key -> value} only when the value is non-empty (per Camel's
 * {@code ObjectHelper} emptiness rules); empty values are skipped.
 */
private static <T> void addPropertyIfNotEmpty(Properties props, String key, T value) {
    if (!ObjectHelper.isEmpty(value)) {
        // value should be as-is
        props.put(key, value);
    }
}
/**
 * Stores the value under {@code key} converted to its upper-case string form (using
 * {@link Locale#ROOT} for locale-independent casing); empty values are skipped.
 */
private static <T> void addUpperCasePropertyIfNotEmpty(Properties props, String key, T value) {
    if (ObjectHelper.isNotEmpty(value)) {
        final String upperCased = String.valueOf(value).toUpperCase(Locale.ROOT);
        props.put(key, upperCased);
    }
}
/**
 * Stores {@code key -> value} only when the value is non-null; null values are skipped so
 * that Kafka's own defaults apply.
 */
private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
    if (value == null) {
        return;
    }
    // value should be as-is
    props.put(key, value);
}
/**
 * Stores the given values under {@code key} as a single comma-separated string, as required
 * by Kafka list-valued settings (e.g. {@code ssl.enabled.protocols}). No-op when the list is
 * null or empty.
 */
private static void addCommaSeparatedList(Properties props, String key, List<String> values) {
    if (values != null && !values.isEmpty()) {
        // String.join is the direct stdlib idiom for this; no stream pipeline needed
        props.put(key, String.join(",", values));
    }
}
public boolean isPreValidateHostAndPort() {
    return this.preValidateHostAndPort;
}

/**
 * Whether to eagerly validate, while this consumer is starting, that the broker host:port is
 * valid and can be DNS-resolved to a known host. A validation failure throws an exception,
 * making Camel fail fast.
 *
 * When disabled, validation is postponed until after the consumer has started, and Camel
 * keeps re-connecting in case of validation or DNS resolution errors.
 */
public void setPreValidateHostAndPort(boolean preValidateHostAndPort) {
    this.preValidateHostAndPort = preValidateHostAndPort;
}
public boolean isTopicIsPattern() {
    return this.topicIsPattern;
}

/**
 * Whether the topic is a pattern (regular expression), which allows subscribing to a dynamic
 * number of topics whose names match the pattern.
 */
public void setTopicIsPattern(boolean topicIsPattern) {
    this.topicIsPattern = topicIsPattern;
}
public String getGroupId() {
    return this.groupId;
}

/**
 * A string that uniquely identifies the group of consumer processes this consumer belongs to.
 * Multiple processes sharing the same group id indicate they are all part of the same
 * consumer group. This option is required for consumers.
 */
public void setGroupId(String groupId) {
    this.groupId = groupId;
}
public String getGroupInstanceId() {
    return this.groupInstanceId;
}

/**
 * A unique identifier of the consumer instance, provided by the end user. Only non-empty
 * strings are permitted. When set, the consumer is treated as a static member: at most one
 * instance with this id may be in the consumer group at any time. This can be combined with a
 * larger session timeout to avoid group rebalances caused by transient unavailability (e.g.,
 * process restarts). When unset, the consumer joins the group as a dynamic member, the
 * traditional behavior.
 */
public void setGroupInstanceId(String groupInstanceId) {
    this.groupInstanceId = groupInstanceId;
}
public String getPartitioner() {
    return this.partitioner;
}

/**
 * The partitioner class used to distribute messages amongst sub-topics. The default
 * partitioner is based on the hash of the key.
 */
public void setPartitioner(String partitioner) {
    this.partitioner = partitioner;
}
public boolean isPartitionerIgnoreKeys() {
    return this.partitionerIgnoreKeys;
}

/**
 * Whether message keys should be ignored when computing the partition. Only effective when
 * {@link #partitioner} is not set.
 */
public void setPartitionerIgnoreKeys(boolean partitionerIgnoreKeys) {
    this.partitionerIgnoreKeys = partitionerIgnoreKeys;
}
public String getTopic() {
    return this.topic;
}

/**
 * Name of the topic to use. On the consumer, multiple topics may be given separated by
 * commas. A producer can only send a message to a single topic.
 */
public void setTopic(String topic) {
    this.topic = topic;
}
public int getConsumersCount() {
    return this.consumersCount;
}

/**
 * The number of consumers that connect to the Kafka server. Each consumer runs on a separate
 * thread that retrieves and processes the incoming data.
 */
public void setConsumersCount(int consumersCount) {
    this.consumersCount = consumersCount;
}
public String getClientId() {
    return this.clientId;
}

/**
 * The client id is a user-specified string sent with each request to help trace calls. It
 * should logically identify the application making the request.
 */
public void setClientId(String clientId) {
    this.clientId = clientId;
}
public boolean isAutoCommitEnable() {
    // Auto-commit is implicitly disabled when a local offset repository is configured,
    // since offsets are then tracked by the repository instead of being committed by Kafka.
    return offsetRepository == null && autoCommitEnable;
}

// NOTE(review): intentionally different from isAutoCommitEnable() — this variant ignores the
// offsetRepository and instead forces auto-commit off when batch consuming (batching) is
// enabled. Confirm which callers rely on which variant before consolidating the two.
public boolean getAutoCommitEnable() {
    if (!batching) {
        return autoCommitEnable;
    }
    return false;
}

/**
 * If true, periodically commit the offset of messages already fetched by the consumer. This
 * committed offset will be used when the process fails, as the position from which the new
 * consumer will begin.
 */
public void setAutoCommitEnable(boolean autoCommitEnable) {
    this.autoCommitEnable = autoCommitEnable;
}
public boolean isAllowManualCommit() {
    return this.allowManualCommit;
}

/**
 * Whether manual commits via {@link KafkaManualCommit} are allowed.
 * <p/>
 * When enabled, an instance of {@link KafkaManualCommit} is stored on the {@link Exchange}
 * message header, giving end users access to this API to perform manual offset commits via
 * the Kafka consumer.
 */
public void setAllowManualCommit(boolean allowManualCommit) {
    this.allowManualCommit = allowManualCommit;
}
public int getShutdownTimeout() {
    return this.shutdownTimeout;
}

/**
 * Timeout in milliseconds to wait gracefully for the consumer or producer to shut down and
 * terminate its worker threads.
 */
public void setShutdownTimeout(int shutdownTimeout) {
    this.shutdownTimeout = shutdownTimeout;
}
public StateRepository<String, String> getOffsetRepository() {
    return this.offsetRepository;
}

/**
 * The offset repository to use to locally store the offset of each partition of the topic.
 * Configuring one disables auto-commit.
 */
public void setOffsetRepository(StateRepository<String, String> offsetRepository) {
    this.offsetRepository = offsetRepository;
}
public Integer getAutoCommitIntervalMs() {
    return this.autoCommitIntervalMs;
}

/**
 * The frequency in ms that the consumer offsets are auto-committed to Kafka. Maps to the
 * consumer's auto.commit.interval.ms setting. (The previous mention of ZooKeeper was
 * outdated: modern consumers commit offsets to the broker, not to ZooKeeper.)
 */
public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
    this.autoCommitIntervalMs = autoCommitIntervalMs;
}
public Integer getFetchMinBytes() {
    return this.fetchMinBytes;
}

/**
 * The minimum amount of data the server should return for a fetch request. If insufficient
 * data is available, the request waits for that much data to accumulate before answering.
 */
public void setFetchMinBytes(Integer fetchMinBytes) {
    this.fetchMinBytes = fetchMinBytes;
}
public Integer getFetchMaxBytes() {
    return this.fetchMaxBytes;
}

/**
 * The maximum amount of data the server should return for a fetch request. This is not an
 * absolute maximum: if the first message in the first non-empty partition of the fetch is
 * larger than this value, the message is still returned to ensure that the consumer can make
 * progress. The maximum message size accepted by the broker is defined via message.max.bytes
 * (broker config) or max.message.bytes (topic config). Note that the consumer performs
 * multiple fetches in parallel.
 */
public void setFetchMaxBytes(Integer fetchMaxBytes) {
    this.fetchMaxBytes = fetchMaxBytes;
}
public Integer getFetchWaitMaxMs() {
    return this.fetchWaitMaxMs;
}

/**
 * The maximum amount of time the server blocks before answering the fetch request when there
 * isn't sufficient data to immediately satisfy fetch.min.bytes.
 */
public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
    this.fetchWaitMaxMs = fetchWaitMaxMs;
}
public String getAutoOffsetReset() {
    return this.autoOffsetReset;
}

/**
 * What to do when there is no initial committed offset in Kafka, or the current offset is out
 * of range: earliest: automatically reset the offset to the earliest offset; latest:
 * automatically reset the offset to the latest offset; fail: throw an exception to the
 * consumer. (The previous mention of ZooKeeper was outdated: auto.offset.reset is a
 * broker-side concept for modern consumers.)
 */
public void setAutoOffsetReset(String autoOffsetReset) {
    this.autoOffsetReset = autoOffsetReset;
}
public boolean isBreakOnFirstError() {
    return this.breakOnFirstError;
}

/**
 * Controls what happens when a consumer is processing an exchange and it fails. When
 * <tt>false</tt>, the consumer continues to the next message and processes it. When
 * <tt>true</tt>, the consumer breaks out.
 *
 * Using the default NoopCommitManager causes the consumer to not commit the offset, so the
 * message is re-attempted. The consumer should use the KafkaManualCommit to determine the
 * best way to handle the message.
 *
 * Using either the SyncCommitManager or the AsyncCommitManager, the consumer seeks back to
 * the offset of the message that caused the failure and re-attempts to process it. However,
 * this can lead to endless processing of the same message if it is bound to fail every time,
 * e.g., a poison message. Therefore, it is recommended to deal with that, for example, by
 * using Camel's error handler.
 */
public void setBreakOnFirstError(boolean breakOnFirstError) {
    this.breakOnFirstError = breakOnFirstError;
}
public String getBrokers() {
    return this.brokers;
}

/**
 * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be
 * a subset of brokers or a VIP pointing to a subset of brokers.
 * <p/>
 * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
 */
public void setBrokers(String brokers) {
    this.brokers = brokers;
}
public String getSchemaRegistryURL() {
    return this.schemaRegistryURL;
}

/**
 * URL of the schema registry servers to use. The format is host1:port1,host2:port2. This is
 * known as schema.registry.url in Schema Registry documentation. This option is only
 * available externally (not standard Apache Kafka).
 */
public void setSchemaRegistryURL(String schemaRegistryURL) {
    this.schemaRegistryURL = schemaRegistryURL;
}
public boolean isSpecificAvroReader() {
    return this.specificAvroReader;
}

/**
 * Enables the use of a specific Avro reader for use with Schema Registry Avro deserializer
 * implementations. This option is only available externally (not standard Apache Kafka).
 */
public void setSpecificAvroReader(boolean specificAvroReader) {
    this.specificAvroReader = specificAvroReader;
}
public String getCompressionCodec() {
    return this.compressionCodec;
}

/**
 * Specifies the compression codec for all data generated by this producer. Valid values are
 * "none", "gzip", "snappy", "lz4" and "zstd".
 */
public void setCompressionCodec(String compressionCodec) {
    this.compressionCodec = compressionCodec;
}
public Integer getRetryBackoffMs() {
    return this.retryBackoffMs;
}

/**
 * Before each retry, the producer refreshes the metadata of relevant topics to see whether a
 * new leader has been elected. Since leader election takes a bit of time, this property
 * specifies how long the producer waits before refreshing the metadata.
 */
public void setRetryBackoffMs(Integer retryBackoffMs) {
    this.retryBackoffMs = retryBackoffMs;
}
public Integer getSendBufferBytes() {
    return this.sendBufferBytes;
}

/**
 * Socket write buffer size.
 */
public void setSendBufferBytes(Integer sendBufferBytes) {
    this.sendBufferBytes = sendBufferBytes;
}
public Integer getRequestTimeoutMs() {
    return this.requestTimeoutMs;
}

/**
 * The amount of time the broker waits trying to meet the request.required.acks requirement
 * before sending back an error to the client.
 */
public void setRequestTimeoutMs(Integer requestTimeoutMs) {
    this.requestTimeoutMs = requestTimeoutMs;
}
public Integer getDeliveryTimeoutMs() {
    return this.deliveryTimeoutMs;
}

/**
 * An upper bound on the time to report success or failure after a call to send() returns.
 * This limits the total time a record may be delayed prior to sending, the time to await
 * acknowledgement from the broker (if expected), and the time allowed for retriable send
 * failures.
 */
public void setDeliveryTimeoutMs(Integer deliveryTimeoutMs) {
    this.deliveryTimeoutMs = deliveryTimeoutMs;
}
public Integer getQueueBufferingMaxMessages() {
    return this.queueBufferingMaxMessages;
}

/**
 * The maximum number of unsent messages that can be queued up in the producer when using
 * async mode, before either the producer must be blocked or data must be dropped.
 */
public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
    this.queueBufferingMaxMessages = queueBufferingMaxMessages;
}
public String getValueSerializer() {
return valueSerializer;