Skip to content

Commit

Permalink
fixes memleaks
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
  • Loading branch information
OlegDokuka committed Feb 23, 2024
1 parent 3285447 commit 748effe
Show file tree
Hide file tree
Showing 3 changed files with 364 additions and 46 deletions.
Expand Up @@ -3,6 +3,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.rsocket.core.StressSubscriber;
import io.rsocket.utils.FastLogger;

import java.util.Arrays;
import java.util.ConcurrentModificationException;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
Expand All @@ -14,14 +18,17 @@
import org.openjdk.jcstress.infra.results.L_Result;
import reactor.core.Fuseable;
import reactor.core.publisher.Hooks;
import reactor.util.Logger;

public abstract class UnboundedProcessorStressTest {

static {
Hooks.onErrorDropped(t -> {});
}

final UnboundedProcessor unboundedProcessor = new UnboundedProcessor();
final Logger logger = new FastLogger(getClass().getName());

final UnboundedProcessor unboundedProcessor = new UnboundedProcessor(logger);

@JCStressTest
@Outcome(
Expand Down Expand Up @@ -145,6 +152,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -270,6 +279,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -375,6 +386,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -476,6 +489,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -578,6 +593,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -701,6 +718,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -781,6 +800,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -837,9 +858,15 @@ public void arbiter(LLL_Result r) {
+ stressSubscriber.onErrorCalls * 2
+ stressSubscriber.droppedErrors.size() * 3;

if (stressSubscriber.concurrentOnNext || stressSubscriber.concurrentOnComplete) {
throw new ConcurrentModificationException("boo");
}

stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -892,6 +919,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1107,6 +1136,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1238,6 +1269,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1390,6 +1423,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1522,6 +1557,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1587,6 +1624,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1652,6 +1691,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand All @@ -1678,6 +1719,16 @@ public void subscribe2() {
@Arbiter
public void arbiter(L_Result r) {
r.r1 = stressSubscriber1.onErrorCalls + stressSubscriber2.onErrorCalls;

checkOutcomes(this, r.toString(), logger);
}
}

static void checkOutcomes(Object instance, String result, Logger logger) {
if (Arrays.stream(instance.getClass().getDeclaredAnnotationsByType(Outcome.class))
.flatMap(o -> Arrays.stream(o.id()))
.noneMatch(s -> s.equalsIgnoreCase(result))) {
throw new RuntimeException(result + " " + logger);
}
}
}
137 changes: 137 additions & 0 deletions rsocket-core/src/jcstress/java/io/rsocket/utils/FastLogger.java
@@ -0,0 +1,137 @@
package io.rsocket.utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import reactor.util.Logger;

/**
* Implementation of {@link Logger} which is based on the {@link ThreadLocal} based queue which
* collects all the events on the per-thread basis. </br> Such logger is designed to have all events
* stored during the stress-test run and then sorted and printed out once all the Threads completed
* execution (inside the {@link org.openjdk.jcstress.annotations.Arbiter} annotated method. </br>
* Note, this implementation only supports trace-level logs and ignores all others, it is intended
* to be used by {@link reactor.core.publisher.StateLogger}.
*/
public class FastLogger implements Logger {

final Map<Thread, List<String>> queues = new ConcurrentHashMap<>();

final ThreadLocal<List<String>> logsQueueLocal =
ThreadLocal.withInitial(
() -> {
final ArrayList<String> logs = new ArrayList<>(100);
queues.put(Thread.currentThread(), logs);
return logs;
});

private final String name;

public FastLogger(String name) {
this.name = name;
}

@Override
public String toString() {
return queues
.values()
.stream()
.flatMap(List::stream)
.sorted(
Comparator.comparingLong(
s -> {
Pattern pattern = Pattern.compile("\\[(.*?)]");
Matcher matcher = pattern.matcher(s);
matcher.find();
return Long.parseLong(matcher.group(1));
}))
.collect(Collectors.joining("\n"));
}

@Override
public String getName() {
return this.name;
}

@Override
public boolean isTraceEnabled() {
return true;
}

@Override
public void trace(String msg) {
logsQueueLocal.get().add(String.format("[%s] %s", System.nanoTime(), msg));
}

@Override
public void trace(String format, Object... arguments) {
trace(String.format(format, arguments));
}

@Override
public void trace(String msg, Throwable t) {
trace(String.format("%s, %s", msg, Arrays.toString(t.getStackTrace())));
}

@Override
public boolean isDebugEnabled() {
return false;
}

@Override
public void debug(String msg) {}

@Override
public void debug(String format, Object... arguments) {}

@Override
public void debug(String msg, Throwable t) {}

@Override
public boolean isInfoEnabled() {
return false;
}

@Override
public void info(String msg) {}

@Override
public void info(String format, Object... arguments) {}

@Override
public void info(String msg, Throwable t) {}

@Override
public boolean isWarnEnabled() {
return false;
}

@Override
public void warn(String msg) {}

@Override
public void warn(String format, Object... arguments) {}

@Override
public void warn(String msg, Throwable t) {}

@Override
public boolean isErrorEnabled() {
return false;
}

@Override
public void error(String msg) {}

@Override
public void error(String format, Object... arguments) {}

@Override
public void error(String msg, Throwable t) {}
}

0 comments on commit 748effe

Please sign in to comment.