Skip to content

Commit

Permalink
Handle ClassCastException and update version
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Schofield authored and Andrew Schofield committed Oct 24, 2019
1 parent 4680e2d commit 0684079
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
COPY --chown=esuser:esgroup target/kafka-connect-mq-source-1.1.0-jar-with-dependencies.jar /opt/kafka/libs/
COPY --chown=esuser:esgroup target/kafka-connect-mq-source-1.1.1-jar-with-dependencies.jar /opt/kafka/libs/

WORKDIR /opt/kafka

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ You need an instance of Kafka Connect running in distributed mode. The Kafka dis

To start the MQ connector, you can use `config/mq-source.json` in this repository after replacing all placeholders and use a command like this:

```shell
``` shell
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
--data "@./config/mq-source.json"
```
Expand All @@ -103,7 +103,7 @@ The deployment assumes the existence of a Secret called `connect-distributed-con
Create Secret for Kafka Connect configuration:
1. `cp kafka/config/connect-distributed.properties connect-distributed.properties.orig`
1. `sed '/^#/d;/^[[:space:]]*$/d' < connect-distributed.properties.orig > connect-distributed.properties`
1. `kubectl -n <namespace> create secret connect-distributed-config --from-file=connect-distributed.properties`
1. `kubectl -n <namespace> create secret generic connect-distributed-config --from-file=connect-distributed.properties`

Create ConfigMap for Kafka Connect Log4j configuration:
1. `cp kafka/config/connect-log4j.properties connect-log4j.properties.orig`
Expand Down
47 changes: 23 additions & 24 deletions UsingMQwithKafkaConnect.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,61 +33,60 @@ It is assumed that you have installed MQ, you're logged in as a user authorized
crtmqm -p 1414 MYQM
```

1. Start the queue manager:
2. Start the queue manager:
``` shell
strmqm MYQM
```

1. Start the `runmqsc` tool to configure the queue manager:
3. Start the `runmqsc` tool to configure the queue manager:
``` shell
runmqsc MYQM
```

1. In `runmqsc`, create a server-connection channel:
4. In `runmqsc`, create a server-connection channel:
```
DEFINE CHANNEL(MYSVRCONN) CHLTYPE(SVRCONN)
```

1. Set the channel authentication rules to accept connections requiring userid and password:
5. Set the channel authentication rules to accept connections requiring userid and password:
```
SET CHLAUTH(MYSVRCONN) TYPE(BLOCKUSER) USERLIST('nobody')
SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
SET CHLAUTH(MYSVRCONN) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
```

1. Set the identity of the client connections based on the supplied context, the user ID:
6. Set the identity of the client connections based on the supplied context, the user ID:
```
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
```

1. Refresh the connection authentication information:
7. Refresh the connection authentication information:
```
REFRESH SECURITY TYPE(CONNAUTH)
```

1. Create a queue for the connector to use:
8. Create a queue for the connector to use:
```
DEFINE QLOCAL(MYQSOURCE)
```

1. Authorize `alice` to connect to and inquire the queue manager:
9. Authorize `alice` to connect to and inquire the queue manager:
```
SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('alice') AUTHADD(CONNECT,INQ)
```

1. Finally authorize `alice` to use the queue:
10. Finally authorize `alice` to use the queue:
```
SET AUTHREC PROFILE(MYQSOURCE) OBJTYPE(QUEUE) PRINCIPAL('alice') AUTHADD(ALLMQI)
```

1. End `runmqsc`:
11. End `runmqsc`:
```
END
```

The queue manager is now ready to accept connection from Kafka Connect connectors.


### Set up Apache Kafka
These instructions assume you have Apache Kafka downloaded and running locally. See the [Apache Kafka quickstart guide](https://kafka.apache.org/quickstart) for more details.

Expand All @@ -96,11 +95,11 @@ These instructions assume you have Apache Kafka downloaded and running locally.
bin/zookeeper-server-start.sh config/zookeeper.properties
```

1. In another terminal, start a Kafka server:
2. In another terminal, start a Kafka server:
``` shell
bin/kafka-server-start.sh config/server.properties
```
1. Create a topic called `TSOURCE` for the connector to send events to:
3. Create a topic called `TSOURCE` for the connector to send events to:
``` shell
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic TSOURCE --partitions 1 --replication-factor 1
```
Expand Down Expand Up @@ -133,18 +132,18 @@ Configure and run the connector:
cp config/mq-source.properties ~
```

1. Edit the following properties in the `~/mq-source.properties` file to match the configuration so far:
```
mq.queue.manager=MYQM
mq.connection.name.list=localhost(1414)
mq.channel.name=MYSVRCONN
mq.queue=MYQSOURCE
mq.user.name=alice
mq.password=passw0rd
topic=TSOURCE
```
2. Edit the following properties in the `~/mq-source.properties` file to match the configuration so far:
```
topic=TSOURCE
mq.queue.manager=MYQM
mq.connection.name.list=localhost(1414)
mq.channel.name=MYSVRCONN
mq.queue=MYQSOURCE
mq.user.name=alice
mq.password=passw0rd
```

1. Change directory to the Kafka root directory. Start the connector worker replacing `<connector-root-directory>` and `<version>` with your directory and the connector version:
3. Change directory to the Kafka root directory. Start the connector worker replacing `<connector-root-directory>` and `<version>` with your directory and the connector version:
``` shell
CLASSPATH=<connector-root-directory>/target/kafka-connect-mq-source-<version>-jar-with-dependencies.jar bin/connect-standalone.sh config/connect-standalone.properties ~/mq-source.properties
```
Expand Down
5 changes: 3 additions & 2 deletions config/mq-source.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{
"name": "mq-source",
"config":{
"config":
{
"connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
"tasks.max": "1",
"topic": "<TOPIC>",

"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

"mq.queue.manager": "<QUEUE_MANAGER>",
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>1.1.0</version>
<version>1.1.1</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand All @@ -45,13 +45,13 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.0.0</version>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>2.0.0</version>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE
builder = c.newInstance();
builder.configure(props);
}
catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NullPointerException exc) {
catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException | NullPointerException exc) {
log.error("Could not instantiate message builder {}", builderClass);
throw new ConnectException("Could not instantiate message builder", exc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";

public static String VERSION = "1.1.0";
public static String VERSION = "1.1.1";

private Map<String, String> configProps;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017 IBM Corporation
* Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2017 IBM Corporation
# Copyright 2017, 2019, 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 0684079

Please sign in to comment.