Skip to content

Commit

Permalink
Merge #1106 into 1.2.0-M1
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>

# Conflicts:
#	build.gradle
#	gradle.properties
  • Loading branch information
OlegDokuka committed Feb 27, 2024
2 parents 6d07389 + f591f9d commit 6e59179
Show file tree
Hide file tree
Showing 6 changed files with 381 additions and 63 deletions.
28 changes: 14 additions & 14 deletions build.gradle
Expand Up @@ -16,11 +16,11 @@

plugins {
id 'com.github.sherter.google-java-format' version '0.9' apply false
id 'me.champeau.jmh' version '0.6.7' apply false
id 'io.spring.dependency-management' version '1.0.15.RELEASE' apply false
id 'me.champeau.jmh' version '0.7.1' apply false
id 'io.spring.dependency-management' version '1.1.0' apply false
id 'io.morethan.jmhreport' version '0.9.0' apply false
id 'io.github.reyerizo.gradle.jcstress' version '0.8.13' apply false
id 'com.github.vlsi.gradle-extensions' version '1.76' apply false
id 'io.github.reyerizo.gradle.jcstress' version '0.8.15' apply false
id 'com.github.vlsi.gradle-extensions' version '1.89' apply false
}

boolean isCiServer = ["CI", "CONTINUOUS_INTEGRATION", "TRAVIS", "CIRCLECI", "bamboo_planKey", "GITHUB_ACTION"].with {
Expand All @@ -34,20 +34,20 @@ subprojects {
apply plugin: 'com.github.vlsi.gradle-extensions'

ext['reactor-bom.version'] = '2022.0.7-SNAPSHOT'
ext['logback.version'] = '1.2.10'
ext['netty-bom.version'] = '4.1.90.Final'
ext['netty-boringssl.version'] = '2.0.59.Final'
ext['logback.version'] = '1.3.14'
ext['netty-bom.version'] = '4.1.106.Final'
ext['netty-boringssl.version'] = '2.0.62.Final'
ext['hdrhistogram.version'] = '2.1.12'
ext['mockito.version'] = '4.4.0'
ext['mockito.version'] = '4.11.0'
ext['slf4j.version'] = '1.7.36'
ext['jmh.version'] = '1.35'
ext['junit.version'] = '5.8.1'
ext['micrometer.version'] = '1.10.6'
ext['micrometer-tracing.version'] = '1.0.4'
ext['assertj.version'] = '3.22.0'
ext['jmh.version'] = '1.36'
ext['junit.version'] = '5.9.3'
ext['micrometer.version'] = '1.11.0'
ext['micrometer-tracing.version'] = '1.1.1'
ext['assertj.version'] = '3.24.2'
ext['netflix.limits.version'] = '0.3.6'
ext['bouncycastle-bcpkix.version'] = '1.70'
ext['awaitility.version'] = '4.1.1'
ext['awaitility.version'] = '4.2.0'

group = "io.rsocket"

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Expand Up @@ -12,4 +12,4 @@
# limitations under the License.
#
version=1.2.0
perfBaselineVersion=1.1.3
perfBaselineVersion=1.1.4
5 changes: 3 additions & 2 deletions rsocket-core/build.gradle
Expand Up @@ -41,13 +41,14 @@ dependencies {
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

jcstressImplementation(project(":rsocket-test"))
jcstressImplementation 'org.slf4j:slf4j-api'
jcstressImplementation "ch.qos.logback:logback-classic"
jcstressImplementation 'io.projectreactor:reactor-test'
}

jcstress {
mode = 'quick' //quick, default, tough
jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.15"
mode = 'sanity' //sanity, quick, default, tough
jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.16"
}

jar {
Expand Down
Expand Up @@ -3,6 +3,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.rsocket.core.StressSubscriber;
import io.rsocket.utils.FastLogger;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
Expand All @@ -14,14 +17,17 @@
import org.openjdk.jcstress.infra.results.L_Result;
import reactor.core.Fuseable;
import reactor.core.publisher.Hooks;
import reactor.util.Logger;

public abstract class UnboundedProcessorStressTest {

static {
Hooks.onErrorDropped(t -> {});
}

final UnboundedProcessor unboundedProcessor = new UnboundedProcessor();
final Logger logger = new FastLogger(getClass().getName());

final UnboundedProcessor unboundedProcessor = new UnboundedProcessor(logger);

@JCStressTest
@Outcome(
Expand Down Expand Up @@ -145,6 +151,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -270,6 +278,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -375,6 +385,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -476,6 +488,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -578,6 +592,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -701,6 +717,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -781,6 +799,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -837,9 +857,15 @@ public void arbiter(LLL_Result r) {
+ stressSubscriber.onErrorCalls * 2
+ stressSubscriber.droppedErrors.size() * 3;

if (stressSubscriber.concurrentOnNext || stressSubscriber.concurrentOnComplete) {
throw new ConcurrentModificationException("boo");
}

stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -892,6 +918,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1107,6 +1135,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1238,6 +1268,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1390,6 +1422,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1522,6 +1556,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1587,6 +1623,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1652,6 +1690,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand All @@ -1678,6 +1718,16 @@ public void subscribe2() {
@Arbiter
public void arbiter(L_Result r) {
r.r1 = stressSubscriber1.onErrorCalls + stressSubscriber2.onErrorCalls;

checkOutcomes(this, r.toString(), logger);
}
}

static void checkOutcomes(Object instance, String result, Logger logger) {
if (Arrays.stream(instance.getClass().getDeclaredAnnotationsByType(Outcome.class))
.flatMap(o -> Arrays.stream(o.id()))
.noneMatch(s -> s.equalsIgnoreCase(result))) {
throw new RuntimeException(result + " " + logger);
}
}
}
137 changes: 137 additions & 0 deletions rsocket-core/src/jcstress/java/io/rsocket/utils/FastLogger.java
@@ -0,0 +1,137 @@
package io.rsocket.utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import reactor.util.Logger;

/**
* Implementation of {@link Logger} which is based on the {@link ThreadLocal} based queue which
* collects all the events on the per-thread basis. </br> Such logger is designed to have all events
* stored during the stress-test run and then sorted and printed out once all the Threads completed
* execution (inside the {@link org.openjdk.jcstress.annotations.Arbiter} annotated method. </br>
* Note, this implementation only supports trace-level logs and ignores all others, it is intended
* to be used by {@link reactor.core.publisher.StateLogger}.
*/
public class FastLogger implements Logger {

final Map<Thread, List<String>> queues = new ConcurrentHashMap<>();

final ThreadLocal<List<String>> logsQueueLocal =
ThreadLocal.withInitial(
() -> {
final ArrayList<String> logs = new ArrayList<>(100);
queues.put(Thread.currentThread(), logs);
return logs;
});

private final String name;

public FastLogger(String name) {
this.name = name;
}

@Override
public String toString() {
return queues
.values()
.stream()
.flatMap(List::stream)
.sorted(
Comparator.comparingLong(
s -> {
Pattern pattern = Pattern.compile("\\[(.*?)]");
Matcher matcher = pattern.matcher(s);
matcher.find();
return Long.parseLong(matcher.group(1));
}))
.collect(Collectors.joining("\n"));
}

@Override
public String getName() {
return this.name;
}

@Override
public boolean isTraceEnabled() {
return true;
}

@Override
public void trace(String msg) {
logsQueueLocal.get().add(String.format("[%s] %s", System.nanoTime(), msg));
}

@Override
public void trace(String format, Object... arguments) {
trace(String.format(format, arguments));
}

@Override
public void trace(String msg, Throwable t) {
trace(String.format("%s, %s", msg, Arrays.toString(t.getStackTrace())));
}

@Override
public boolean isDebugEnabled() {
return false;
}

@Override
public void debug(String msg) {}

@Override
public void debug(String format, Object... arguments) {}

@Override
public void debug(String msg, Throwable t) {}

@Override
public boolean isInfoEnabled() {
return false;
}

@Override
public void info(String msg) {}

@Override
public void info(String format, Object... arguments) {}

@Override
public void info(String msg, Throwable t) {}

@Override
public boolean isWarnEnabled() {
return false;
}

@Override
public void warn(String msg) {}

@Override
public void warn(String format, Object... arguments) {}

@Override
public void warn(String msg, Throwable t) {}

@Override
public boolean isErrorEnabled() {
return false;
}

@Override
public void error(String msg) {}

@Override
public void error(String format, Object... arguments) {}

@Override
public void error(String msg, Throwable t) {}
}

0 comments on commit 6e59179

Please sign in to comment.