Skip to content

Commit

Permalink
#39 Add ConnectorConfiguration.getWorkerThreadsCount(), refactor buil…
Browse files Browse the repository at this point in the history
…der methods.
  • Loading branch information
kpavlov committed Mar 2, 2018
1 parent cbdb241 commit f1daf76
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 39 deletions.
Expand Up @@ -133,7 +133,8 @@ protected EventLoopGroup getBossEventLoopGroup() {
}

protected EventLoopGroup createWorkerEventLoopGroup() {
return new NioEventLoopGroup();
logger.debug("Creating worker EventLoopGroup with thread pool of {} threads", configuration.getWorkerThreadsCount());
return new NioEventLoopGroup(configuration.getWorkerThreadsCount());
}

protected EventLoopGroup getWorkerEventLoopGroup() {
Expand Down
Expand Up @@ -3,31 +3,32 @@
import com.github.kpavlov.jreactive8583.netty.pipeline.CompositeIsoMessageHandler;
import com.github.kpavlov.jreactive8583.netty.pipeline.EchoMessageListener;
import com.github.kpavlov.jreactive8583.netty.pipeline.IsoMessageLoggingHandler;
import io.netty.channel.EventLoopGroup;

public abstract class ConnectorConfiguration {

/**
* Default read/write idle timeout in seconds (ping interval) = 30 sec.
*
* @see #setIdleTimeout(int)
* @see #getIdleTimeout()
*/
public static final int DEFAULT_IDLE_TIMEOUT_SECONDS = 30;
static final int DEFAULT_IDLE_TIMEOUT_SECONDS = 30;

/**
* Default {@link #maxFrameLength} (max message length) = 8192
*
* @see #setMaxFrameLength(int)
* @see #getMaxFrameLength()
*/
public static final int DEFAULT_MAX_FRAME_LENGTH = 8192;
static final int DEFAULT_MAX_FRAME_LENGTH = 8192;
private final boolean addEchoMessageListener;
private int maxFrameLength = DEFAULT_MAX_FRAME_LENGTH;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT_SECONDS;
private boolean replyOnError = false;
private boolean addLoggingHandler = true;
private boolean logSensitiveData = true;
private boolean logFieldDescription = true;
private final int workerThreadsCount;
private boolean replyOnError;
private boolean addLoggingHandler;
private boolean logSensitiveData;
private int[] sensitiveDataFields;

private boolean logFieldDescription;

protected ConnectorConfiguration(Builder builder) {
addLoggingHandler = builder.addLoggingHandler;
Expand All @@ -37,7 +38,8 @@ protected ConnectorConfiguration(Builder builder) {
maxFrameLength = builder.maxFrameLength;
replyOnError = builder.replyOnError;
sensitiveDataFields = builder.sensitiveDataFields;
this.addEchoMessageListener = builder.addEchoMessageListener;
addEchoMessageListener = builder.addEchoMessageListener;
workerThreadsCount = builder.workerThreadsCount;
}

/**
Expand Down Expand Up @@ -86,8 +88,8 @@ public void setMaxFrameLength(int maxFrameLength) {
}

/**
* @deprecated Use {@link Builder}
* @param addLoggingHandler should logging handler be added to pipeline
* @deprecated Use {@link Builder}
*/
@Deprecated
public void setAddLoggingHandler(boolean addLoggingHandler) {
Expand Down Expand Up @@ -148,8 +150,8 @@ public boolean logFieldDescription() {
}

/**
* @deprecated Use {@link Builder}
* @param logFieldDescription Should field descriptions be printed in log. Useful for when testing system integration.
* @deprecated Use {@link Builder}
*/
@Deprecated
public void setLogFieldDescription(boolean logFieldDescription) {
Expand Down Expand Up @@ -177,60 +179,143 @@ public void setSensitiveDataFields(int[] sensitiveDataFields) {
this.sensitiveDataFields = sensitiveDataFields;
}

/**
* Returns number of threads in worker {@link EventLoopGroup}.
*
* @implNote Default value is <code>Runtime.getRuntime().availableProcessors() * 16</code>
*/
public int getWorkerThreadsCount() {
return workerThreadsCount;
}

@SuppressWarnings({"unchecked", "unused"})
protected abstract static class Builder<B extends Builder> {
private int maxFrameLength = DEFAULT_MAX_FRAME_LENGTH;

private int idleTimeout = DEFAULT_IDLE_TIMEOUT_SECONDS;
private boolean replyOnError = false;

private boolean addLoggingHandler = true;
private boolean logSensitiveData = true;
private boolean addEchoMessageListener = false;
private boolean logFieldDescription = true;
private boolean logSensitiveData = true;
private boolean replyOnError = false;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT_SECONDS;
private int maxFrameLength = DEFAULT_MAX_FRAME_LENGTH;
private int workerThreadsCount = Runtime.getRuntime().availableProcessors() * 16;
private int[] sensitiveDataFields;
private boolean addEchoMessageListener;

public B addEchoMessageListener() {
this.addEchoMessageListener = true;
return (B) this;
}

/**
* @deprecated Use {@link #addEchoMessageListener()} instead
*/
@Deprecated
public B withEchoMessageListener(boolean shouldAddEchoMessageListener) {
this.addEchoMessageListener = shouldAddEchoMessageListener;
return (B) this;
}

public B withMaxFrameLength(int maxFrameLength) {
this.maxFrameLength = maxFrameLength;
public B maxFrameLength(int length) {
this.maxFrameLength = length;
return (B) this;
}

public B withIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
/**
* @deprecated Use {@link #maxFrameLength(int)} instead
*/
@Deprecated
public B withMaxFrameLength(int length) {
return maxFrameLength(length);
}

public B idleTimeout(int timeout) {
this.idleTimeout = timeout;
return (B) this;
}

public B withReplyOnError(boolean replyOnError) {
this.replyOnError = replyOnError;
/**
* Use {@link #idleTimeout(int)} instead
*/
@Deprecated
public B withIdleTimeout(int timeout) {
return idleTimeout(timeout);
}

public B replyOnError(boolean doReply) {
this.replyOnError = doReply;
return (B) this;
}

/**
* @deprecated Use {@link #replyOnError(boolean)} instead
*/
@Deprecated
public B withReplyOnError(boolean doReply) {
return replyOnError(doReply);
}

public B addLoggingHandler() {
this.addLoggingHandler = true;
return (B) this;
}

/**
* @deprecated Use {@link #addLoggingHandler()} instead
*/
public B withAddLoggingHandler(boolean addLoggingHandler) {
this.addLoggingHandler = addLoggingHandler;
return (B) this;
}

/**
* Should log sensitive data (unmasked) or not.
* <p>
* Don't use on production!
*/
public B logSensitiveData(boolean logSensitiveData) {
this.logSensitiveData = logSensitiveData;
return (B) this;
}

/**
* @deprecated Use {@link #logSensitiveData(boolean)} instead
*/
public B withLogSensitiveData(boolean logSensitiveData) {
this.logSensitiveData = logSensitiveData;
return (B) this;
}

public B describeFieldsInLog() {
this.logFieldDescription = true;
return (B) this;
}

/**
* @param logFieldDescription
* @return
* @deprecated Use {@link #describeFieldsInLog()}
*/
@Deprecated
public B withLogFieldDescription(boolean logFieldDescription) {
this.logFieldDescription = logFieldDescription;
return (B) this;
}

public B withSensitiveDataFields(int... sensitiveDataFields) {
public B sensitiveDataFields(int... sensitiveDataFields) {
this.sensitiveDataFields = sensitiveDataFields;
return (B) this;
}

/**
* @deprecated Use {@link #sensitiveDataFields(int...)} instead
*/
@Deprecated
public B withSensitiveDataFields(int... sensitiveDataFields) {
return sensitiveDataFields(sensitiveDataFields);
}

public B workerThreadsCount(int numberOfThreads) {
this.workerThreadsCount = numberOfThreads;
return (B) this;
}
}
}
Expand Up @@ -53,11 +53,19 @@ public void setReconnectInterval(int reconnectInterval) {
public static class Builder extends ConnectorConfiguration.Builder<Builder> {
private int reconnectInterval = DEFAULT_RECONNECT_INTERVAL;

public Builder withReconnectInterval(int reconnectInterval) {
public Builder reconnectInterval(int reconnectInterval) {
this.reconnectInterval = reconnectInterval;
return this;
}

/**
* @deprecated Use {@link #reconnectInterval(int)} instead
*/
@Deprecated
public Builder withReconnectInterval(int reconnectInterval) {
return reconnectInterval(reconnectInterval);
}

public ClientConfiguration build() {
return new ClientConfiguration(this);
}
Expand Down
Expand Up @@ -10,9 +10,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

public class ClientServerIT extends AbstractIT {

Expand All @@ -28,9 +26,12 @@ public boolean applies(IsoMessage isoMessage) {

@Override
public boolean onMessage(ChannelHandlerContext ctx, IsoMessage isoMessage) {
final Integer stan = Integer.valueOf(isoMessage.getObjectValue(11));
receivedMessages.put(stan, isoMessage);
return true;
if (isoMessage.hasField(11)) {
final Integer stan = Integer.valueOf(isoMessage.getObjectValue(11));
receivedMessages.put(stan, isoMessage);
return true;
}
return false;
}
});
server.addMessageListener(new IsoMessageListener<IsoMessage>() {
Expand Down Expand Up @@ -66,8 +67,8 @@ public void shouldSendAsyncCaptureRequest() {
TestUtil.waitFor("capture request received", () -> receivedMessages.containsKey(stan));

IsoMessage capturedRequest = receivedMessages.remove(stan);
assertThat("fin request", capturedRequest, notNullValue());
assertThat("fin request", capturedRequest.debugString(), equalTo(finMessage.debugString()));
assertThat(capturedRequest).as("fin request").isNotNull();
assertThat(capturedRequest.debugString()).as("fin request string").isEqualTo(finMessage.debugString());
}


Expand Down
Expand Up @@ -32,8 +32,9 @@ public Iso8583Client<IsoMessage> iso8583Client() throws IOException {
SocketAddress socketAddress = new InetSocketAddress(host, port);

final ClientConfiguration configuration = ClientConfiguration.newBuilder()
.withIdleTimeout(idleTimeout)
.withLogSensitiveData(false)
.idleTimeout(idleTimeout)
.logSensitiveData(false)
.workerThreadsCount(4)
.build();

return new Iso8583Client<>(socketAddress, configuration, clientMessageFactory());
Expand Down
Expand Up @@ -21,7 +21,8 @@ public class Iso8583ServerConfig {
@Bean
public Iso8583Server<IsoMessage> iso8583Server() throws IOException {
final ServerConfiguration configuration = ServerConfiguration.newBuilder()
.withLogSensitiveData(false)
.logSensitiveData(false)
.workerThreadsCount(4)
.build();

return new Iso8583Server<>(port, configuration, serverMessageFactory());
Expand All @@ -34,4 +35,6 @@ private MessageFactory<IsoMessage> serverMessageFactory() throws IOException {
messageFactory.setAssignDate(true);
return messageFactory;
}


}

0 comments on commit f1daf76

Please sign in to comment.