Skip to content

Commit

Permalink
reworks TransportTest abstraction
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
  • Loading branch information
OlegDokuka committed Jun 9, 2021
1 parent 6047692 commit 6aa3d8a
Show file tree
Hide file tree
Showing 16 changed files with 593 additions and 557 deletions.
495 changes: 266 additions & 229 deletions rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Large diffs are not rendered by default.

Expand Up @@ -24,7 +24,7 @@
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

final class AeronTransportTest implements TransportTest {
final class AeronTransportTest extends TransportTest<InetSocketAddress, AeronServer> {

static final MediaDriver mediaDriver =
MediaDriver.launch(
Expand All @@ -34,17 +34,9 @@ final class AeronTransportTest implements TransportTest {
static final Aeron serverAeron = Aeron.connect();
static final EventLoopGroup eventLoopGroup = EventLoopGroup.create(4);

final AeronTransportPair transportPair =
new AeronTransportPair(mediaDriver, clientAeron, serverAeron);

@Override
public Duration getTimeout() {
return Duration.ofMinutes(2);
}

@Override
public TransportPair getTransportPair() {
return transportPair;
protected TransportPair<InetSocketAddress, AeronServer> createTransportPair() {
return new AeronTransportPair(mediaDriver, clientAeron, serverAeron);
}

static class AeronTransportPair extends TransportPair<InetSocketAddress, AeronServer> {
Expand All @@ -57,13 +49,14 @@ public AeronTransportPair(MediaDriver driver, Aeron clientAeron, Aeron serverAer
super(
() ->
InetSocketAddress.createUnresolved(
"127.0.0.1", ThreadLocalRandom.current().nextInt(20000) + 5000),
"0.0.0.0", ThreadLocalRandom.current().nextInt(20000) + 5000),
(address, server, allocator) ->
AeronClientTransport.createIpc(clientAeron, eventLoopGroup),
(address, allocator) -> AeronServerTransport.createIpc(serverAeron, eventLoopGroup),
false,
false,
false);
false,
Duration.ofMinutes(2));
this.mediaDriver = driver;
this.clientAeron = clientAeron;
this.serverAeron = serverAeron;
Expand Down
@@ -0,0 +1,74 @@
/*
* Copyright 2015-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.transport.aeron;

import io.aeron.Aeron;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.rsocket.test.TransportTest;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Disabled;

@Disabled
final class AeronWithFragmentationTransportTest
extends TransportTest<InetSocketAddress, AeronServer> {

static final MediaDriver mediaDriver =
MediaDriver.launch(
new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true));

static final Aeron clientAeron = Aeron.connect();
static final Aeron serverAeron = Aeron.connect();
static final EventLoopGroup eventLoopGroup = EventLoopGroup.create(4);

@Override
protected TransportPair<InetSocketAddress, AeronServer> createTransportPair() {
return new AeronTransportPair(mediaDriver, clientAeron, serverAeron);
}

static class AeronTransportPair extends TransportPair<InetSocketAddress, AeronServer> {

final MediaDriver mediaDriver;
final Aeron clientAeron;
final Aeron serverAeron;

public AeronTransportPair(MediaDriver driver, Aeron clientAeron, Aeron serverAeron) {
super(
() ->
InetSocketAddress.createUnresolved(
"0.0.0.0", ThreadLocalRandom.current().nextInt(20000) + 5000),
(address, server, allocator) ->
AeronClientTransport.createIpc(clientAeron, eventLoopGroup),
(address, allocator) -> AeronServerTransport.createIpc(serverAeron, eventLoopGroup),
true,
false,
false,
Duration.ofMinutes(2));
this.mediaDriver = driver;
this.clientAeron = clientAeron;
this.serverAeron = serverAeron;
}

@Override
public void dispose() {
super.dispose();
// CloseHelper.quietCloseAll(clientAeron, serverAeron);
}
}
}
Expand Up @@ -16,27 +16,21 @@

package io.rsocket.transport.local;

import io.rsocket.Closeable;
import io.rsocket.test.TransportTest;
import java.time.Duration;
import java.util.UUID;

final class LocalResumableTransportTest implements TransportTest {

private final TransportPair transportPair =
new TransportPair<>(
() -> "test-" + UUID.randomUUID(),
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
(address, allocator) -> LocalServerTransport.create(address),
false,
true);

@Override
public Duration getTimeout() {
return Duration.ofSeconds(10);
}
final class LocalResumableTransportTest extends TransportTest<String, Closeable> {

@Override
public TransportPair getTransportPair() {
return transportPair;
protected TransportPair createTransportPair() {
return new TransportPair<>(
() -> "test-" + UUID.randomUUID(),
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
(address, allocator) -> LocalServerTransport.create(address),
false,
true,
Duration.ofSeconds(10));
}
}
Expand Up @@ -16,27 +16,21 @@

package io.rsocket.transport.local;

import io.rsocket.Closeable;
import io.rsocket.test.TransportTest;
import java.time.Duration;
import java.util.UUID;

final class LocalResumableWithFragmentationTransportTest implements TransportTest {

private final TransportPair transportPair =
new TransportPair<>(
() -> "test-" + UUID.randomUUID(),
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
(address, allocator) -> LocalServerTransport.create(address),
true,
true);

@Override
public Duration getTimeout() {
return Duration.ofSeconds(10);
}
final class LocalResumableWithFragmentationTransportTest extends TransportTest<String, Closeable> {

@Override
public TransportPair getTransportPair() {
return transportPair;
protected TransportPair<String, Closeable> createTransportPair() {
return new TransportPair<>(
() -> "test-" + UUID.randomUUID(),
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
(address, allocator) -> LocalServerTransport.create(address),
true,
true,
Duration.ofSeconds(10));
}
}
Expand Up @@ -16,25 +16,19 @@

package io.rsocket.transport.local;

import io.rsocket.Closeable;
import io.rsocket.test.TransportTest;
import java.time.Duration;
import java.util.UUID;

final class LocalTransportTest implements TransportTest {

private final TransportPair transportPair =
new TransportPair<>(
() -> "test-" + UUID.randomUUID(),
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
(address, allocator) -> LocalServerTransport.create(address));

@Override
public Duration getTimeout() {
return Duration.ofSeconds(10);
}
final class LocalTransportTest extends TransportTest<String, Closeable> {

@Override
public TransportPair getTransportPair() {
return transportPair;
protected TransportPair<String, Closeable> createTransportPair() {
return new TransportPair<>(
() -> "test-" + UUID.randomUUID(),
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
(address, allocator) -> LocalServerTransport.create(address),
Duration.ofSeconds(10));
}
}
Expand Up @@ -16,26 +16,20 @@

package io.rsocket.transport.local;

import io.rsocket.Closeable;
import io.rsocket.test.TransportTest;
import java.time.Duration;
import java.util.UUID;

final class LocalTransportWithFragmentationTest implements TransportTest {

private final TransportPair transportPair =
new TransportPair<>(
() -> "test-" + UUID.randomUUID(),
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
(address, allocator) -> LocalServerTransport.create(address),
true);

@Override
public Duration getTimeout() {
return Duration.ofSeconds(10);
}
final class LocalTransportWithFragmentationTest extends TransportTest<String, Closeable> {

@Override
public TransportPair getTransportPair() {
return transportPair;
protected TransportPair<String, Closeable> createTransportPair() {
return new TransportPair<>(
() -> "test-" + UUID.randomUUID(),
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
(address, allocator) -> LocalServerTransport.create(address),
true,
Duration.ofSeconds(10));
}
}
Expand Up @@ -19,36 +19,31 @@
import io.netty.channel.ChannelOption;
import io.rsocket.test.TransportTest;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import java.net.InetSocketAddress;
import java.time.Duration;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;

final class TcpFragmentationTransportTest implements TransportTest {

private final TransportPair transportPair =
new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
(address, server, allocator) ->
TcpClientTransport.create(
TcpClient.create()
.remoteAddress(server::address)
.option(ChannelOption.ALLOCATOR, allocator)),
(address, allocator) ->
TcpServerTransport.create(
TcpServer.create()
.bindAddress(() -> address)
.option(ChannelOption.ALLOCATOR, allocator)),
true);

@Override
public Duration getTimeout() {
return Duration.ofMinutes(2);
}
final class TcpFragmentationTransportTest
extends TransportTest<InetSocketAddress, CloseableChannel> {

@Override
public TransportPair getTransportPair() {
return transportPair;
protected TransportPair<InetSocketAddress, CloseableChannel> createTransportPair() {
return new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
(address, server, allocator) ->
TcpClientTransport.create(
TcpClient.create()
.remoteAddress(server::address)
.option(ChannelOption.ALLOCATOR, allocator)),
(address, allocator) ->
TcpServerTransport.create(
TcpServer.create()
.bindAddress(() -> address)
.option(ChannelOption.ALLOCATOR, allocator)),
true,
Duration.ofMinutes(2));
}
}
Expand Up @@ -19,37 +19,31 @@
import io.netty.channel.ChannelOption;
import io.rsocket.test.TransportTest;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import java.net.InetSocketAddress;
import java.time.Duration;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;

final class TcpResumableTransportTest implements TransportTest {

private final TransportPair transportPair =
new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
(address, server, allocator) ->
TcpClientTransport.create(
TcpClient.create()
.remoteAddress(server::address)
.option(ChannelOption.ALLOCATOR, allocator)),
(address, allocator) ->
TcpServerTransport.create(
TcpServer.create()
.bindAddress(() -> address)
.option(ChannelOption.ALLOCATOR, allocator)),
false,
true);

@Override
public Duration getTimeout() {
return Duration.ofMinutes(3);
}
final class TcpResumableTransportTest extends TransportTest<InetSocketAddress, CloseableChannel> {

@Override
public TransportPair getTransportPair() {
return transportPair;
protected TransportPair<InetSocketAddress, CloseableChannel> createTransportPair() {
return new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
(address, server, allocator) ->
TcpClientTransport.create(
TcpClient.create()
.remoteAddress(server::address)
.option(ChannelOption.ALLOCATOR, allocator)),
(address, allocator) ->
TcpServerTransport.create(
TcpServer.create()
.bindAddress(() -> address)
.option(ChannelOption.ALLOCATOR, allocator)),
false,
true,
Duration.ofMinutes(2));
}
}

0 comments on commit 6aa3d8a

Please sign in to comment.