Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/jdk upgrade 8 to 11 latest #52

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ out

# Version
/VERSION
*.class
/bin
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ Maven:
<dependency>
<groupId>com.opendxl</groupId>
<artifactId>dxldatabusclient</artifactId>
<version>2.4.10</version>
<version>2.5.0</version>
</dependency>
```
or Gradle:
```groovy
compile 'com.opendxl:dxldatabusclient:2.4.10'
compile 'com.opendxl:dxldatabusclient:2.5.0'
```

## Bugs and Feedback
Expand Down
40 changes: 28 additions & 12 deletions broker/src/broker/ClusterHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
package broker;

import kafka.admin.TopicCommand;
import kafka.admin.TopicCommand.TopicService;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.utils.SystemTime;
import scala.runtime.AbstractFunction0;

import java.io.File;
import java.nio.file.Files;
Expand All @@ -33,7 +33,8 @@ public class ClusterHelper {
private static int zookeeperPort = 2181;
private static Zookeeper zkNode;
private static List<KafkaBroker> brokers = new ArrayList<>();
private static final String ZKHOST = "localhost";
private static final String KAFKAHOST = "localhost";
private static int kafkaPort = 2181;
private static final int SESSION_TIMEOUT_MS = 30000;
private static final int CONNECTION_TIMEOUT_MS = 30000;
private static final int MAX_IN_FLIGHT_REQUESTS = 1000;
Expand All @@ -51,21 +52,38 @@ public void addNewKafkaTopic(final String topicName, final int replicationFactor
int partitions) throws Exception {
String[] arguments = {
"--create",
"--zookeeper", ZKHOST.concat(":").concat(String.valueOf(zookeeperPort)),
"--bootstrap-server", KAFKAHOST.concat(":").concat(String.valueOf(kafkaPort)),
"--replication-factor", String.valueOf(replicationFactor),
"--partitions", String.valueOf(partitions),
"--topic", topicName
};

TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(arguments);
try (KafkaZkClient zkUtils = getZkClient(opts)) {
new TopicCommand.ZookeeperTopicService(zkUtils).createTopic(opts);
TopicService topicService=null;
try {
final AdminClient adminClient = createAdminClient();
topicService = new TopicCommand.TopicService(adminClient);
topicService.createTopic(opts);
} catch (Exception e) {
// In case of exceptions, abort topic creation.
throw new Exception("Error creating a new Kafka topic");
}finally{
if (topicService!=null) {
topicService.close();
}
}
}

/**
 * Creates a Kafka {@link AdminClient} pointed at the first broker registered
 * in the in-memory cluster.
 * <p>
 * The bootstrap address is read from the first broker's {@code host.name} and
 * {@code port} properties. NOTE(review): {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}
 * shares the literal value {@code "bootstrap.servers"} with
 * {@code AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG}, so using the producer
 * constant here is functionally equivalent.
 *
 * @return a new {@link AdminClient} connected to the first in-memory broker
 */
public AdminClient createAdminClient() {
    // Resolve "host:port" for the first broker of the cluster.
    final Properties firstBrokerConfig = brokers.get(0).getBrokerConfig();
    final String bootstrapAddress = firstBrokerConfig.getProperty("host.name")
            .concat(":")
            .concat(firstBrokerConfig.getProperty("port"));
    final Map<String, Object> adminClientProps = new HashMap<>();
    adminClientProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return AdminClient.create(adminClientProps);
}

public ClusterHelper addBroker(final int port) {
checkCluster();
Properties config = getConfig(port);
Expand Down Expand Up @@ -166,19 +184,17 @@ private Collection<Node> describe() {
}

private KafkaZkClient getZkClient(TopicCommand.TopicCommandOptions opts) {
final String connectString = opts.zkConnect().getOrElse(new AbstractFunction0<String>() {
@Override
public String apply() {
return "";
} });
final String connectString = "";

return KafkaZkClient.apply(connectString,
JaasUtils.isZkSecurityEnabled(),
JaasUtils.isZkSaslEnabled(),
SESSION_TIMEOUT_MS,
CONNECTION_TIMEOUT_MS,
MAX_IN_FLIGHT_REQUESTS,
new SystemTime(),
"", null,
METRIC_GROUP,
METRIC_TYPE, null);
METRIC_TYPE, false);
}

}
37 changes: 23 additions & 14 deletions broker/src/broker/KafkaBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
package broker;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.server.KafkaServer;
import scala.Option;

import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

Expand All @@ -17,8 +20,7 @@
public class KafkaBroker {

private Properties brokerConfig;
private Zookeeper zookeeper;
private KafkaServerStartable broker;
private KafkaServer kafkaServer;
private static final Logger LOG = LoggerFactory.getLogger(KafkaBroker.class);

public KafkaBroker(final Properties brokerConfig) {
Expand All @@ -35,12 +37,19 @@ public void run() {
}
}));

try {
final KafkaConfig kafkaConfig = new KafkaConfig(brokerConfig);
kafkaServer = new KafkaServer(kafkaConfig, new SystemTime(),
Option.apply(this.getClass().getName()), true);
kafkaServer.startup();

LOG.info("Kafka broker started: " + brokerConfig.getProperty("host.name")
.concat(":")
.concat(brokerConfig.getProperty("port")));

broker = new KafkaServerStartable(new KafkaConfig(brokerConfig));
broker.startup();
LOG.info("Kafka broker started: " + brokerConfig.getProperty("host.name")
.concat(":")
.concat(brokerConfig.getProperty("port")));
} catch (Exception e) {
System.out.println(e.getMessage());
}
}

private Runnable getDeleteLogDirectoryAction() {
Expand All @@ -63,13 +72,13 @@ public void run() {
}

public synchronized void shutdown() {
if (broker != null) {
broker.shutdown();
broker.awaitShutdown();
if(kafkaServer != null){
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
LOG.info("Kafka broker stopped: " + brokerConfig.getProperty("host.name")
.concat(":")
.concat(brokerConfig.getProperty("port")));
broker = null;
.concat(":")
.concat(brokerConfig.getProperty("port")));
kafkaServer = null;
}
}

Expand Down
39 changes: 22 additions & 17 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ plugins {

group 'com.opendxl'

sourceCompatibility = 1.8
sourceCompatibility = 11

apply plugin: 'base'
apply plugin: 'java'
Expand Down Expand Up @@ -54,44 +54,48 @@ jacocoTestReport {
configurations {
kafkaInMemory
sampleJars.extendsFrom testImplementation
// Added below to work around VS Code complaints - this may only be needed locally.
// Comment it out before generating final artifacts
implementation.extendsFrom kafkaInMemory
}

dependencies {
implementation ('org.apache.kafka:kafka-clients:2.3.1') {
implementation ('org.apache.kafka:kafka-clients:3.1.2') {
exclude group: 'org.scala-lang', module: 'scala-reflect'
exclude group: 'org.lz4', module: 'lz4-java'
}
implementation ('org.apache.kafka:kafka-streams:2.3.1') {
implementation ('org.apache.kafka:kafka-streams:3.1.2') {
exclude group: 'org.scala-lang', module: 'scala-reflect'
}
implementation 'org.scala-lang:scala-reflect:2.12.11'
implementation 'org.lz4:lz4-java:1.7.1'
implementation 'org.scala-lang:scala-reflect:2.13.8'
implementation 'org.lz4:lz4-java:1.8.0'
implementation 'org.apache.avro:avro:1.11.0'
implementation 'com.google.code.gson:gson:2.8.9'
implementation 'com.google.code.gson:gson:2.9.0'
implementation 'org.apache.commons:commons-configuration2:2.8.0'
implementation 'commons-lang:commons-lang:2.6'
implementation 'org.slf4j:slf4j-api:1.7.30'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'net.sf.jopt-simple:jopt-simple:5.0.4'
testImplementation('org.apache.kafka:kafka_2.12:2.3.1') {
testImplementation('org.apache.kafka:kafka_2.13:3.1.2') {
exclude group: 'org.scala-lang', module: 'scala-reflect'
}
testImplementation 'org.scala-lang:scala-reflect:2.12.11'
testImplementation 'org.scala-lang:scala-reflect:2.13.8'
testImplementation 'org.apache.zookeeper:zookeeper:3.5.7'
testImplementation('io.netty:netty-all:4.1.43.Final') {
testImplementation('io.netty:netty-all:4.1.91.Final') {
force = true
}
testImplementation 'commons-io:commons-io:2.7'
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation 'junit:junit:4.12'
testImplementation 'com.github.stefanbirkner:system-rules:1.19.0'
testImplementation 'com.e-movimento.tinytools:privilegedaccessor:1.2.2'

kafkaInMemory ('org.apache.kafka:kafka_2.12:2.3.1') {
kafkaInMemory ('org.apache.kafka:kafka_2.13:3.1.2') {
exclude group: 'org.scala-lang', module: 'scala-reflect'
exclude group: 'org.lz4', module: 'lz4-java'
exclude group: 'io.netty'
}
kafkaInMemory 'org.scala-lang:scala-reflect:2.12.11'
kafkaInMemory 'org.lz4:lz4-java:1.7.1'
kafkaInMemory 'commons-io:commons-io:2.7'
kafkaInMemory 'org.scala-lang:scala-reflect:2.13.8'
kafkaInMemory 'org.lz4:lz4-java:1.8.0'
kafkaInMemory 'commons-io:commons-io:2.11.0'

// This following section mitigates OWASP vulnerabilities report.
// It enforces to use specific transitive dependency versions
Expand All @@ -108,10 +112,11 @@ dependencies {
// http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2019-16942
// http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2019-16943
// CVE-2020-25649
compile ('com.fasterxml.jackson.core:jackson-databind:2.13.2.2') {
compile ('com.fasterxml.jackson.core:jackson-databind:2.14.2') {
force = true
}
kafkaInMemory ('com.fasterxml.jackson.core:jackson-databind:2.13.2.2') {
compile 'com.fasterxml.jackson.module:jackson-module-scala_3:2.14.2'
kafkaInMemory ('com.fasterxml.jackson.core:jackson-databind:2.14.2') {
force = true
}

Expand Down
9 changes: 4 additions & 5 deletions docs/CLI-Example.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ library with no arguments displays help information:

::

$ java -jar dxldatabusclient-2.4.10.jar
$ java -jar dxldatabusclient-2.5.0.jar

ERROR: There are not options
Option (* = required) Description
Expand Down Expand Up @@ -49,8 +49,7 @@ few CLI operations. Operations arguments are placed after
``--operation`` option. For instance:

::

$ java -jar dxldatabusclient-2.4.10.jar --operation <OPERATION_ARGUMENT> ...
$ java -jar dxldatabusclient-2.5.0.jar --operation <OPERATION_ARGUMENT> ...

Operation Arguments
^^^^^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -123,7 +122,7 @@ example

::

$ java -jar dxldatabusclient-2.4.10.jar \
$ java -jar dxldatabusclient-2.5.0.jar \
--operation produce \
--to-topic <TOPIC_NAME> \
--brokers <0.0.0.0>:<PORT> \
Expand Down Expand Up @@ -230,7 +229,7 @@ example

::

java -jar dxldatabusclient-2.4.10.jar \
java -jar dxldatabusclient-2.5.0.jar \
--operation consume \
--from-topic <TOPIC_1,TOPIC_2,...,TOPIC_N> \
--brokers <BROKER_1_IP:BROKER_1_PORT,BROKER_2_PORT:BROKER_2_PORT,...> \
Expand Down
4 changes: 2 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ Maven:
<dependency>
<groupId>com.opendxl</groupId>
<artifactId>dxldatabusclient</artifactId>
<version>2.4.10</version>
<version>2.5.0</version>
</dependency>

or Gradle:

.. code:: groovy

compile 'com.opendxl:dxldatabusclient:2.4.10'
compile 'com.opendxl:dxldatabusclient:2.5.0'

API Documentation
-----------------
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.4.10
version=2.5.0
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6-all.zip
11 changes: 6 additions & 5 deletions sample/src/sample/BasicStreamingExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,13 @@ private Runnable stateQuery() {
return () -> {

while(true) {
ReadOnlyKeyValueStore<String, DatabusMessage> keyValueStore =
this.stream.store("keyvaluestore", QueryableStoreTypes.keyValueStore());
StoreQueryParameters<ReadOnlyKeyValueStore<String, DatabusMessage>> storeQryParam = StoreQueryParameters
.fromNameAndType("keyvaluestore", QueryableStoreTypes.keyValueStore());
ReadOnlyKeyValueStore<String, DatabusMessage> keyValueStore = this.stream.store(storeQryParam);

KeyValueIterator<String, DatabusMessage> iter = keyValueStore.all();
while (iter.hasNext()) {
KeyValue<String, DatabusMessage> entry = iter.next();
KeyValueIterator<String, DatabusMessage> keyValueStoreIter = keyValueStore.all();
while(keyValueStoreIter.hasNext()){
KeyValue<String, DatabusMessage> entry = keyValueStoreIter.next();
LOG.info(entry.key + entry.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public DatabusProducerRecordAdapter(final Serializer<P> messageSerializer) {
sourceProducerRecord.getRoutingData().getTenantGroup());
final List<Header> kafkaHeaders = produceKafkaHeaders
? generateKafkaHeaders(databusMessage.getHeaders()) : null;
System.out.println("produceKafkaHeaders: " + produceKafkaHeaders);
System.out.println("kafkaHeaders: " + kafkaHeaders);
final org.apache.kafka.clients.producer.ProducerRecord<String, DatabusMessage> targetProducerRecord =
new org.apache.kafka.clients.producer.ProducerRecord<>(targetTopic,
sourceProducerRecord.getRoutingData().getPartition(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.opendxl.databus.common.internal.util;

import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.TimeUnit;

public final class TimeUnitUtil {
private TimeUnitUtil() {
//not called
}
public static TemporalUnit convert(final TimeUnit timeUnit) {
switch (timeUnit) {
case NANOSECONDS:
return ChronoUnit.NANOS;
case MICROSECONDS:
return ChronoUnit.MICROS;
case MILLISECONDS:
return ChronoUnit.MILLIS;
case SECONDS:
return ChronoUnit.SECONDS;
case MINUTES:
return ChronoUnit.MINUTES;
case HOURS:
return ChronoUnit.HOURS;
case DAYS:
return ChronoUnit.DAYS;
default:
return ChronoUnit.SECONDS;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Collections;
Expand Down