Skip to content

Commit

Permalink
Merge pull request #114 from jhughes24816/fix-multiple-jms-connections
Browse files Browse the repository at this point in the history
Fix multiple jms connections
  • Loading branch information
jhughes24816 committed May 26, 2023
2 parents ebc736a + d3f4838 commit 724eabc
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 15 deletions.
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/BUG-REPORT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ body:
label: Version
description: What version of our software are you running?
options:
- 1.3.2 (Default)
- older (<1.3.2)
- 1.3.3 (Default)
- older (<1.3.3)
validations:
required: true
- type: textarea
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
COPY --chown=esuser:esgroup target/kafka-connect-mq-source-1.3.2-jar-with-dependencies.jar /opt/kafka/libs/
COPY --chown=esuser:esgroup target/kafka-connect-mq-source-1.3.3-jar-with-dependencies.jar /opt/kafka/libs/

WORKDIR /opt/kafka

Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connector
This repository includes an example Dockerfile to run Kafka Connect in distributed mode. It also adds in the MQ source connector as an available connector plugin. It uses the default `connect-distributed.properties` and `connect-log4j.properties` files.

1. `mvn clean package`
1. `docker build -t kafkaconnect-with-mq-source:1.3.2 .`
1. `docker run -p 8083:8083 kafkaconnect-with-mq-source:1.3.2`
1. `docker build -t kafkaconnect-with-mq-source:1.3.3 .`
1. `docker run -p 8083:8083 kafkaconnect-with-mq-source:1.3.3`

**NOTE:** To provide custom properties files create a folder called `config` containing the `connect-distributed.properties` and `connect-log4j.properties` files and use a Docker volume to make them available when running the container like this:

``` shell
docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-source:1.3.2
docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-source:1.3.3
```

To start the MQ connector, you can use `config/mq-source.json` in this repository after replacing all placeholders and use a command like this:
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>1.3.2</version>
<version>1.3.3</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down Expand Up @@ -63,7 +63,7 @@
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.1.5.0</version>
<version>9.3.0.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class JMSReader {
private AtomicBoolean closeNow = new AtomicBoolean(); // Whether close has been requested
private long reconnectDelayMillis = reconnectDelayMillisMin; // Delay between repeated reconnect attempts

private static long receiveTimeout = 30000L;
private static long receiveTimeout = 2000L;
private static long reconnectDelayMillisMin = 64L;
private static long reconnectDelayMillisMax = 8192L;

Expand Down Expand Up @@ -258,10 +258,8 @@ public SourceRecord receive(final boolean wait) {
SourceRecord sr = null;
try {
if (wait) {
while (m == null && !closeNow.get()) {
log.debug("Waiting {} ms for message", receiveTimeout);
m = jmsCons.receive(receiveTimeout);
}
log.debug("Waiting {} ms for message", receiveTimeout);
m = jmsCons.receive(receiveTimeout);

if (m == null) {
log.debug("No message received");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";

public static String version = "1.3.2";
public static String version = "1.3.3";

private Map<String, String> configProps;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public List<SourceRecord> poll() throws InterruptedException {

try {
if (!stopNow.get()) {
log.info("Polling for records");
log.debug("Polling for records");
SourceRecord src;
do {
// For the first message in the batch, wait a while if no message
Expand Down

0 comments on commit 724eabc

Please sign in to comment.