Skip to content

Commit

Permalink
provides fixes to WrappedDirectBufferByteBuf and changes test to use IPC
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Jan 7, 2021
1 parent 77de699 commit 0753b6e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 23 deletions.
Expand Up @@ -231,10 +231,10 @@ public static AeronClientTransport createUdp(
final Supplier<IdleStrategy> idleStrategySupplier =
() ->
new BackoffIdleStrategy(
/* maxSpins */ 100, /* maxYields */
1000, /* minParkPeriodNs */
10000, /* maxParkPeriodNs */
100000);
/* maxSpins */ 100,
/* maxYields */ 1000,
/* minParkPeriodNs */ 10000,
/* maxParkPeriodNs */ 100000);
return new AeronClientTransport(
aeron,
new ChannelUriStringBuilder()
Expand All @@ -247,6 +247,26 @@ public static AeronClientTransport createUdp(
ByteBufAllocator.DEFAULT,
256,
256,
Duration.ofSeconds(50).toNanos());
Duration.ofSeconds(5).toNanos());
}

public static AeronClientTransport createIpc(Aeron aeron, EventLoopGroup resources) {
final Supplier<IdleStrategy> idleStrategySupplier =
() ->
new BackoffIdleStrategy(
/* maxSpins */ 100,
/* maxYields */ 1000,
/* minParkPeriodNs */ 10000,
/* maxParkPeriodNs */ 100000);
return new AeronClientTransport(
aeron,
new ChannelUriStringBuilder().media(CommonContext.IPC_MEDIA).build(),
Schedulers.boundedElastic(),
resources,
idleStrategySupplier.get(),
ByteBufAllocator.DEFAULT,
256,
256,
Duration.ofSeconds(5).toNanos());
}
}
Expand Up @@ -205,6 +205,21 @@ public static AeronServerTransport createUdp(
ByteBufAllocator.DEFAULT,
256,
256,
Duration.ofSeconds(50).toNanos());
Duration.ofSeconds(5).toNanos());
}

public static AeronServerTransport createIpc(Aeron aeron, EventLoopGroup resources) {
final Supplier<IdleStrategy> idleStrategySupplier =
() -> new BackoffIdleStrategy(100, 1000, 10000, 100000);
return new AeronServerTransport(
aeron,
new ChannelUriStringBuilder().media(CommonContext.IPC_MEDIA).build(),
Schedulers.boundedElastic(),
resources,
idleStrategySupplier.get(),
ByteBufAllocator.DEFAULT,
256,
256,
Duration.ofSeconds(5).toNanos());
}
}
Expand Up @@ -25,8 +25,6 @@
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import org.agrona.BitUtil;
import org.agrona.BufferUtil;
import org.agrona.DirectBuffer;

class WrappedDirectBufferByteBuf extends AbstractByteBuf {
Expand Down Expand Up @@ -245,21 +243,23 @@ public int nioBufferCount() {

@Override
public ByteBuffer nioBuffer(int index, int length) {
final ByteBuffer byteBuffer =
BufferUtil.allocateDirectAligned(length, BitUtil.CACHE_LINE_LENGTH);
directBuffer.getBytes(index, byteBuffer, length);
byteBuffer.flip();
return byteBuffer;
final ByteBuffer buffer = directBuffer.byteBuffer();
if (buffer != null) {
return buffer.duplicate().position(index).limit(index + length);
} else {
final byte[] bytes = directBuffer.byteArray();
return ByteBuffer.wrap(bytes, index, length);
}
}

@Override
public ByteBuffer internalNioBuffer(int index, int length) {
return directBuffer.byteBuffer();
return nioBuffer(index, length);
}

@Override
public ByteBuffer[] nioBuffers(int index, int length) {
return new ByteBuffer[] {directBuffer.byteBuffer().duplicate()};
return new ByteBuffer[] {nioBuffer(index, length)};
}

@Override
Expand Down
Expand Up @@ -28,9 +28,7 @@ final class AeronTransportTest implements TransportTest {

static final MediaDriver mediaDriver =
MediaDriver.launch(
new MediaDriver.Context()
.threadingMode(ThreadingMode.SHARED_NETWORK)
.dirDeleteOnStart(true));
new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true));

static final Aeron clientAeron = Aeron.connect();
static final Aeron serverAeron = Aeron.connect();
Expand Down Expand Up @@ -61,11 +59,8 @@ public AeronTransportPair(MediaDriver driver, Aeron clientAeron, Aeron serverAer
InetSocketAddress.createUnresolved(
"127.0.0.1", ThreadLocalRandom.current().nextInt(20000) + 5000),
(address, server, allocator) ->
AeronClientTransport.createUdp(
clientAeron, address.getHostName(), address.getPort(), eventLoopGroup),
(address, allocator) ->
AeronServerTransport.createUdp(
serverAeron, address.getHostName(), address.getPort(), eventLoopGroup),
AeronClientTransport.createIpc(clientAeron, eventLoopGroup),
(address, allocator) -> AeronServerTransport.createIpc(serverAeron, eventLoopGroup),
false,
false,
false);
Expand Down

0 comments on commit 0753b6e

Please sign in to comment.