Skip to content

Commit

Permalink
Merge pull request #19 from ibm-messaging/1.0.2
Browse files Browse the repository at this point in the history
Version 1.0.2
  • Loading branch information
AndrewJSchofield committed Jan 23, 2019
2 parents 0ca9325 + e796e00 commit 9cf3711
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 69 deletions.
15 changes: 4 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,6 @@ You will need to put the public part of the client's certificate in the queue ma
For troubleshooting, or to better understand the handshake performed by the IBM MQ Java client application in combination with your specific JSSE provider, you can enable debugging by setting `javax.net.debug=ssl` in the JVM environment.


## Performance and syncpoint limit
The connector uses a transacted JMS session to receive messages from MQ in syncpoint and periodically commits the in-flight transaction. This has the effect of batching messages together for improved efficiency. However, the frequency of committing transactions is controlled by the Kafka Connect framework rather than the connector. The connector is only able to receive up to the queue manager's maximum uncommitted message limit (typically 10000 messages) before committing.

By default, Kafka Connect only commits every 60 seconds (10 seconds for the standalone worker), meaning that each task is limited to a rate of about 166 messages per second. You can increase the frequency of committing by using the `offset.flush.interval.ms` configuration in the worker configuration file. For example, if you set `offset.flush.interval.ms=5000`, the connector commits every 5 seconds increasing the maximum rate per task to about 2000 messages per second.

If messages are being received faster than they can be committed, the connector prints a message `Uncommitted message limit reached` and sleeps for a short delay. You should use this as an indication to set the `offset.flush.interval.ms` to a lower value, or increase the number of tasks.


## Configuration
The configuration options for the Kafka Connect source connector for IBM MQ are as follows:

Expand All @@ -181,10 +173,11 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes |
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
| topic | The name of the target Kafka topic | string | | Topic name |

### Using a CCDT file
Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the Kafka Connect source connector configuration file. If using a CCDT file the `mq.connection.name.list` and `mq.channel.name` configuration options are not required.
Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the MQ source connector configuration file. If using a CCDT file the `mq.connection.name.list` and `mq.channel.name` configuration options are not required.

### Externalizing secrets
[KIP 297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations) introduced a mechanism to externalize secrets to be used as configuration for Kafka connectors.
Expand Down Expand Up @@ -215,7 +208,7 @@ mq.password=${file:mq-secret.properties:secret-key}

### Unable to connect to Kafka

You may receive an `org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed` error when trying to run the MQ Source Connector using SSL to connect to your Kafka cluster. In the case that the error is caused by the following exception: `Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching XXXXX found.`, Java may be replacing the IP address of your cluster with the corresponding hostname in your `/etc/hosts` file. For example, to push Docker images to a custom Docker repository, you may add an entry in this file which corresponds to the IP of your repository e.g. `123.456.78.90 mycluster.icp`. To fix this, you can comment out this line in your `/etc/hosts` file.
You may receive an `org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed` error when trying to run the MQ source connector using SSL to connect to your Kafka cluster. In the case that the error is caused by the following exception: `Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching XXXXX found.`, Java may be replacing the IP address of your cluster with the corresponding hostname in your `/etc/hosts` file. For example, to push Docker images to a custom Docker repository, you may add an entry in this file which corresponds to the IP of your repository e.g. `123.456.78.90 mycluster.icp`. To fix this, you can comment out this line in your `/etc/hosts` file.


## Support
Expand All @@ -227,7 +220,7 @@ For issues relating specifically to this connector, please use the [GitHub issue


## License
Copyright 2017, 2018 IBM Corporation
Copyright 2017, 2018, 2019 IBM Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>1.0.2-beta</version>
<version>1.0.2</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,17 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME = "The distinguished name pattern of the TLS (SSL) peer.";
public static final String CONFIG_DISPLAY_MQ_SSL_PEER_NAME = "SSL peer name";

public static final String CONFIG_NAME_MQ_BATCH_SIZE = "mq.batch.size";
public static final String CONFIG_DOCUMENTATION_MQ_BATCH_SIZE = "The maximum number of messages in a batch. A batch uses a single unit of work.";
public static final String CONFIG_DISPLAY_MQ_BATCH_SIZE = "Batch size";
public static final int CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT = 250;
public static final int CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM = 1;

public static final String CONFIG_NAME_TOPIC = "topic";
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";

public static String VERSION = "1.0.2-beta";
public static String VERSION = "1.0.2";

private Map<String, String> configProps;

Expand Down Expand Up @@ -229,6 +235,11 @@ public class MQSourceConnector extends SourceConnector {
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 13, Width.MEDIUM,
CONFIG_DISPLAY_MQ_SSL_PEER_NAME);

config.define(CONFIG_NAME_MQ_BATCH_SIZE, Type.INT, CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT,
ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), Importance.LOW,
CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 14, Width.MEDIUM,
CONFIG_DISPLAY_MQ_BATCH_SIZE);

config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
CONFIG_DISPLAY_TOPIC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
public class MQSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);

private static int BATCH_SIZE = 250; // The maximum number of records returned per call to poll()
// The maximum number of records returned per call to poll()
private int batchSize = MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
private CountDownLatch batchCompleteSignal = null; // Used to signal completion of a batch
private AtomicInteger pollCycle = new AtomicInteger(1); // Incremented each time poll() is called
private int lastCommitPollCycle = 0; // The value of pollCycle the last time commit() was called
private AtomicBoolean receivingMessages = new AtomicBoolean(); // Whether currently receiving messages
private AtomicBoolean stopNow = new AtomicBoolean(); // Whether stop has been requested

private JMSReader reader;
Expand Down Expand Up @@ -69,6 +71,11 @@ public MQSourceTask() {
log.debug("Task props entry {} : {}", entry.getKey(), value);
}

String strBatchSize = props.get(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE);
if (strBatchSize != null) {
batchSize = Integer.parseInt(strBatchSize);
}

// Construct a reader to interface with MQ
reader = new JMSReader();
reader.configure(props);
Expand Down Expand Up @@ -106,22 +113,40 @@ public MQSourceTask() {
int currentPollCycle = pollCycle.incrementAndGet();
log.debug("Starting poll cycle {}", currentPollCycle);

if (!stopNow.get()) {
log.info("Polling for records");
SourceRecord src;
do {
// For the first message in the batch, wait a while if no message
src = reader.receive(messageCount == 0);
if (src != null) {
msgs.add(src);
messageCount++;
}
} while ((src != null) && (messageCount < BATCH_SIZE) && !stopNow.get());
try {
receivingMessages.set(true);

if (!stopNow.get()) {
log.info("Polling for records");
SourceRecord src;
do {
// For the first message in the batch, wait a while if no message
src = reader.receive(messageCount == 0);
if (src != null) {
msgs.add(src);
messageCount++;
}
} while ((src != null) && (messageCount < batchSize) && !stopNow.get());
}
else {
log.info("Stopping polling for records");
}
}
finally {
receivingMessages.set(false);
}

synchronized(this) {
if (messageCount > 0) {
batchCompleteSignal = new CountDownLatch(messageCount);
if (!stopNow.get()) {
batchCompleteSignal = new CountDownLatch(messageCount);
}
else {
// Discard this batch - we've rolled back when the connection to MQ was closed in stop()
log.debug("Discarding a batch of {} records as task is stopping", messageCount);
msgs.clear();
batchCompleteSignal = null;
}
}
else {
batchCompleteSignal = null;
Expand Down Expand Up @@ -157,7 +182,6 @@ public void commit() throws InterruptedException {
// batch complete signal directly.
int currentPollCycle = pollCycle.get();
log.debug("Commit starting in poll cycle {}", currentPollCycle);
boolean willShutdown = false;

if (lastCommitPollCycle == currentPollCycle)
{
Expand All @@ -171,25 +195,10 @@ public void commit() throws InterruptedException {
batchCompleteSignal.countDown();
}
}
else if (stopNow.get()) {
log.debug("Shutting down with empty batch after delay");
willShutdown = true;
}
}
}
else {
lastCommitPollCycle = currentPollCycle;

synchronized (this) {
if ((batchCompleteSignal == null) && stopNow.get()) {
log.debug("Shutting down with empty batch");
willShutdown = true;
}
}
}

if (willShutdown) {
shutdown();
}

log.trace("[{}] Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
Expand All @@ -210,16 +219,20 @@ else if (stopNow.get()) {

stopNow.set(true);

boolean willShutdown = false;
boolean willClose = false;

synchronized(this) {
if (batchCompleteSignal == null) {
willShutdown = true;
if (receivingMessages.get()) {
log.debug("Will close connection");
willClose = true;
}
}

if (willShutdown) {
shutdown();
if (willClose) {
// Close the connection to MQ to clean up
if (reader != null) {
reader.close();
}
}

log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
Expand Down Expand Up @@ -247,20 +260,4 @@ else if (stopNow.get()) {

log.trace("[{}] Exit {}.commitRecord", Thread.currentThread().getId(), this.getClass().getName());
}

/**
* <p>
* Shuts down the task, releasing any resource held by the task.
* </p>
*/
private void shutdown() {
log.trace("[{}] Entry {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());

// Close the connection to MQ to clean up
if (reader != null) {
reader.close();
}

log.trace("[{}] Exit {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2018 IBM Corporation
* Copyright 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -115,6 +115,8 @@ SchemaAndValue getKey(JMSContext context, String topic, Message message) throws
keySchema = Schema.OPTIONAL_BYTES_SCHEMA;
key = message.getJMSCorrelationIDAsBytes();
break;
default:
break;
}

return new SchemaAndValue(keySchema, key);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017, 2018 IBM Corporation
* Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,6 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017, 2018 IBM Corporation
* Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,11 +24,9 @@
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down

0 comments on commit 9cf3711

Please sign in to comment.