Skip to content

Commit

Permalink
#39 LockDetectionIT
Browse files Browse the repository at this point in the history
  • Loading branch information
kpavlov committed Oct 21, 2018
1 parent 45508ba commit f6379e8
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 28 deletions.
@@ -0,0 +1,186 @@
package com.github.kpavlov.jreactive8583.example;


import com.github.kpavlov.jreactive8583.IsoMessageListener;
import com.github.kpavlov.jreactive8583.client.Iso8583Client;
import com.github.kpavlov.jreactive8583.server.Iso8583Server;
import com.solab.iso8583.IsoMessage;
import com.solab.iso8583.IsoType;
import io.netty.channel.ChannelHandlerContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.slf4j.LoggerFactory.getLogger;

@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = TestConfig.class)
public class LockDetectionIT {

private static final int NUM_CLIENTS = 20;
private static final int NUM_MESSAGES = 100;
private static final Logger logger = getLogger(LockDetectionIT.class);
private static CountDownLatch latch = new CountDownLatch(NUM_CLIENTS * NUM_MESSAGES);
private static ThreadMXBean threadMXBean;
private static Thread monitoringThread;
private static AtomicInteger monitorDeadlockedCount = new AtomicInteger();
private static AtomicInteger deadlockedCount = new AtomicInteger();
@Autowired
private Iso8583Server<IsoMessage> server;
@Autowired
private ApplicationContext applicationContext;
private Iso8583Client[] clients = new Iso8583Client[NUM_CLIENTS];

@BeforeAll
public static void beforeAll() {
threadMXBean = ManagementFactory.getThreadMXBean();
assertThat(threadMXBean.isThreadContentionMonitoringSupported());
threadMXBean.setThreadContentionMonitoringEnabled(true);

monitoringThread = new Thread(() -> {
while (latch.getCount() > 0) {
detectThreadLocks();
}
});
}

private static void detectThreadLocks() {
try {
final long[] monitorDeadlockedThreads = threadMXBean.findMonitorDeadlockedThreads();
final long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();

if (monitorDeadlockedThreads != null) {
monitorDeadlockedCount.addAndGet(monitorDeadlockedThreads.length);
for (long threadId : monitorDeadlockedThreads) {
logger.warn("MonitorDeadlocked: {}", threadMXBean.getThreadInfo(threadId));
}
}

if (deadlockedThreads != null) {
deadlockedCount.addAndGet(deadlockedThreads.length);
for (long threadId : deadlockedThreads) {
logger.warn("Deadlocked: {}", threadMXBean.getThreadInfo(threadId));
}
}

Thread.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
}

@BeforeEach
public void before() throws Exception {

configureServer(server);
server.start();

TestUtil.waitFor("server started", server::isStarted);

for (int i = 0; i < NUM_CLIENTS; i++) {
final Iso8583Client client = applicationContext.getBean(Iso8583Client.class);
configureClient(client);
clients[i] = client;
}
monitoringThread.start();
}

@AfterEach
public void shutdownServer() {
server.shutdown();
}

@Test
public void shouldProcessRequestsFromMultipleClientsWithoutDeadlocks() throws Exception {

for (Iso8583Client client : clients) {
new Thread(() -> {
try {

client.connect();
TestUtil.waitFor("client connected", client::isConnected);

for (int i = 0; i < NUM_MESSAGES; i++) {
final IsoMessage isoMessage = createRequest(client);
client.sendAsync(isoMessage);
}

} catch (InterruptedException e) {
//ok
}
}).start();
}

latch.await();
monitoringThread.join();

assertThat(monitorDeadlockedCount.get()).as("Monitor Deadlock Count").isEqualTo(0);
assertThat(deadlockedCount.get()).as("Deadlock Count").isEqualTo(0);
}

private void configureClient(Iso8583Client<IsoMessage> client) {
client.addMessageListener(new IsoMessageListener<IsoMessage>() {
@Override
public boolean applies(IsoMessage isoMessage) {
return isoMessage.getType() == 0x210;
}

@Override
public boolean onMessage(ChannelHandlerContext ctx, IsoMessage isoMessage) {
latch.countDown();
final long count = latch.getCount();
if (count % 100 == 0 || (count < 10 && count % 10 == 0)) {
logger.info("Responses left to process {}", count);
}
return false;
}
});
client.init();
}

private void configureServer(Iso8583Server<IsoMessage> server) {

server.addMessageListener(new IsoMessageListener<IsoMessage>() {

@Override
public boolean applies(IsoMessage isoMessage) {
return isoMessage.getType() == 0x200;
}

@Override
public boolean onMessage(ChannelHandlerContext ctx, IsoMessage isoMessage) {
// logger.info("{} Handling message {}", ctx.channel().id(), isoMessage);
final IsoMessage response = server.getIsoMessageFactory().createResponse(isoMessage);
response.setField(39, IsoType.ALPHA.value("00", 2));
response.setField(60, IsoType.LLLVAR.value("XXX", 3));
ctx.writeAndFlush(response);
try {
Thread.sleep(5);// to make it slow
} catch (InterruptedException e) {
//
}
return false;
}
});
server.init();
}

private IsoMessage createRequest(Iso8583Client client) {
final IsoMessage finMessage = client.getIsoMessageFactory().newMessage(0x0200);
finMessage.setField(60, IsoType.LLLVAR.value("foo", 3));
return finMessage;
}
}

This file was deleted.

Expand Up @@ -9,12 +9,15 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;

import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_PROTOTYPE;

@Configuration
public class Iso8583ClientConfig {

Expand All @@ -28,13 +31,15 @@ public class Iso8583ClientConfig {
private int idleTimeout;

@Bean
@Scope(SCOPE_PROTOTYPE)
public Iso8583Client<IsoMessage> iso8583Client() throws IOException {
SocketAddress socketAddress = new InetSocketAddress(host, port);

final ClientConfiguration configuration = ClientConfiguration.newBuilder()
// .addLoggingHandler()
.idleTimeout(idleTimeout)
.logSensitiveData(false)
.workerThreadsCount(4)
// .workerThreadsCount(4)
.build();

return new Iso8583Client<>(socketAddress, configuration, clientMessageFactory());
Expand Down
Expand Up @@ -21,8 +21,9 @@ public class Iso8583ServerConfig {
@Bean
public Iso8583Server<IsoMessage> iso8583Server() throws IOException {
final ServerConfiguration configuration = ServerConfiguration.newBuilder()
.logSensitiveData(false)
.workerThreadsCount(4)
// .addLoggingHandler()
// .logSensitiveData(false)
.workerThreadsCount(1)
.build();

return new Iso8583Server<>(port, configuration, serverMessageFactory());
Expand Down
20 changes: 2 additions & 18 deletions src/test/resources/simplelogger.properties
@@ -1,22 +1,6 @@
#
# Copyright 2013 The FIX.io Project
#
# The FIX.io Project licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

#org.slf4j.simpleLogger.defaultLogLevel=debug
org.slf4j.simpleLogger.showDateTime = true
org.slf4j.simpleLogger.showShortLogName = true
org.slf4j.simpleLogger.showThreadName=true

org.slf4j.simpleLogger.log.org.jreactive.iso8583 = debug
org.slf4j.simpleLogger.log.com.github.kpavlov.jreactive8583=debug
org.slf4j.simpleLogger.log.com.github.kpavlov.jreactive8583.netty.pipeline.CompositeIsoMessageHandler=info

0 comments on commit f6379e8

Please sign in to comment.