Skip to content

Commit

Permalink
fix deprecations
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Apr 23, 2024
1 parent 07cd889 commit 3ba8cb7
Show file tree
Hide file tree
Showing 25 changed files with 107 additions and 86 deletions.
3 changes: 2 additions & 1 deletion dependency-versions.gradle
Expand Up @@ -55,6 +55,7 @@ dependencyManagement {
dependency('io.opentelemetry:opentelemetry-extension-trace-propagators:1.2.0')
dependency('io.opentelemetry:opentelemetry-proto:1.7.1-alpha')
dependency('io.opentelemetry:opentelemetry-sdk:1.36.0')
dependency('io.opentelemetry.semconv:opentelemetry-semconv-incubating:1.24.0-alpha')
dependency('io.opentelemetry:opentelemetry-sdk-trace:1.36.0')
dependency('io.opentelemetry:opentelemetry-sdk-metrics:1.36.0')
dependency('io.opentelemetry:opentelemetry-sdk-testing:1.36.0')
Expand Down Expand Up @@ -90,7 +91,7 @@ dependencyManagement {
entry 'bcpkix-jdk15on'
entry 'bcprov-jdk15on'
}
dependencySet(group: 'org.eclipse.jetty', version: '12.0.7') {
dependencySet(group: 'org.eclipse.jetty', version: '11.0.20') {
entry 'jetty-server'
entry 'jetty-servlet'
entry 'jetty-servlets'
Expand Down
Expand Up @@ -6,12 +6,11 @@ import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.net.NetServer
import io.vertx.core.net.NetSocket
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.coAwait
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.coroutines.await
import org.slf4j.LoggerFactory
import kotlin.coroutines.CoroutineContext

Expand All @@ -34,7 +33,7 @@ class TcpDownstream(
val server = vertx.createNetServer()
server.connectHandler {
handleSocket(it)
}.listen(port, host).await()
}.listen(port, host).coAwait()
tcpServer = server
logger.info("Started downstream proxy server on $host:$port")
}
Expand Down
Expand Up @@ -6,6 +6,7 @@ import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.net.NetClient
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.coAwait
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
Expand Down Expand Up @@ -33,7 +34,7 @@ class TcpUpstream(
}

override suspend fun handleRequest(message: Bytes): Bytes {
val socket = tcpclient!!.connect(port, host).await()
val socket = tcpclient!!.connect(port, host).coAwait()

val result = AsyncResult.incomplete<Bytes>()
socket.handler {
Expand All @@ -44,7 +45,7 @@ class TcpUpstream(
}.exceptionHandler {
result.completeExceptionally(it)
}
socket.write(Buffer.buffer(message.toArrayUnsafe())).await()
socket.write(Buffer.buffer(message.toArrayUnsafe())).coAwait()

return result.await()
}
Expand Down
Expand Up @@ -9,6 +9,7 @@ import io.vertx.core.buffer.Buffer
import io.vertx.core.datagram.DatagramPacket
import io.vertx.core.net.SocketAddress
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.coAwait
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineExceptionHandler
Expand Down Expand Up @@ -335,7 +336,7 @@ internal class CoroutineDiscoveryService constructor(
}

fun start() = launch {
server.handler { receiveDatagram(it) }.listen(bindAddress.port(), bindAddress.host()).await()
server.handler { receiveDatagram(it) }.listen(bindAddress.port(), bindAddress.host()).coAwait()
val endpoint = Endpoint(
advertiseAddress ?: (server.localAddress()).host(),
advertiseUdpPort ?: server.localAddress().port(),
Expand Down Expand Up @@ -428,7 +429,7 @@ internal class CoroutineDiscoveryService constructor(
override suspend fun shutdown() {
if (shutdown.compareAndSet(false, true)) {
logger.info("{}: shutdown", serviceDescriptor)
server.close().await()
server.close().coAwait()
for (pending in awaitingPongs.values) {
pending.complete(null)
}
Expand Down Expand Up @@ -916,6 +917,6 @@ internal class CoroutineDiscoveryService constructor(
}

private suspend fun sendPacket(address: SocketAddress, packet: Packet) {
server.send(Buffer.buffer(packet.encode().toArrayUnsafe()), address.port(), address.host()).await()
server.send(Buffer.buffer(packet.encode().toArrayUnsafe()), address.port(), address.host()).coAwait()
}
}
5 changes: 3 additions & 2 deletions devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt
Expand Up @@ -6,6 +6,8 @@ import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.bytes.Bytes32
import org.apache.tuweni.crypto.SECP256K1

private const val MAX_VALUE: Byte = 0x7f

/**
* DevP2P discovery packet types
* @param typeId the byte representing the type
Expand Down Expand Up @@ -86,7 +88,6 @@ internal enum class PacketType(
}, ;

companion object {
private const val MAX_VALUE: Byte = 0x7f
private val INDEX = arrayOfNulls<PacketType?>(MAX_VALUE.toInt())

init {
Expand All @@ -100,7 +101,7 @@ internal enum class PacketType(
}

init {
require(typeId <= PacketType.MAX_VALUE) { "Packet typeId must be in range [0x00, 0x80)" }
require(typeId <= MAX_VALUE) { "Packet typeId must be in range [0x00, 0x80)" }
}

abstract fun decode(
Expand Down
Expand Up @@ -7,6 +7,7 @@ import io.vertx.core.buffer.Buffer
import io.vertx.core.datagram.DatagramPacket
import io.vertx.core.net.SocketAddress
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.coAwait
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -176,38 +177,38 @@ internal class DefaultDiscoveryV5Service(
private lateinit var receiveJob: Job

override suspend fun start(): AsyncCompletion {
server.handler(this::receiveDatagram).listen(bindAddress.port, bindAddress.hostString).await()
server.handler(this::receiveDatagram).listen(bindAddress.port, bindAddress.hostString).coAwait()
return bootstrap()
}

override suspend fun terminate() {
if (started.compareAndSet(true, false)) {
receiveJob.cancel()
server.close().await()
server.close().coAwait()
}
}

override fun enr(): EthereumNodeRecord = selfEnr

override suspend fun addPeer(enr: EthereumNodeRecord, address: SocketAddress): AsyncCompletion {
val session = sessions[address]
if (session == null) {
return if (session == null) {
logger.trace("Creating new session for peer {}", enr)
val handshakeSession = handshakes.computeIfAbsent(address) { addr -> createHandshake(addr, enr.publicKey(), enr) }
return asyncCompletion {
asyncCompletion {
logger.trace("Handshake connection start {}", enr)
handshakeSession.connect().await()
logger.trace("Handshake connection done {}", enr)
}
} else {
logger.trace("Session found for peer {}", enr)
return AsyncCompletion.completed()
AsyncCompletion.completed()
}
}

private fun send(addr: SocketAddress, message: Bytes) {
launch {
server.send(Buffer.buffer(message.toArrayUnsafe()), addr.port(), addr.host()).await()
server.send(Buffer.buffer(message.toArrayUnsafe()), addr.port(), addr.host()).coAwait()
}
}

Expand All @@ -227,7 +228,7 @@ internal class DefaultDiscoveryV5Service(
var session = sessions.get(packet.sender())
val size = Math.min(Packet.MAX_SIZE, packet.data().length())
val buffer = ByteBuffer.allocate(size)
packet.data().byteBuf.readBytes(buffer)
buffer.put(packet.data().bytes)
buffer.flip()
val message = Bytes.wrapByteBuffer(buffer)
if (message.slice(0, 32) == whoAreYouHeader && session != null) {
Expand Down
Expand Up @@ -6,6 +6,7 @@ import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.net.SocketAddress
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.coAwait
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.apache.tuweni.bytes.Bytes
Expand Down Expand Up @@ -61,7 +62,7 @@ internal class DiscoveryServiceTest {
val reference = AsyncResult.incomplete<Buffer>()
val client = vertx.createDatagramSocket().handler { res ->
reference.complete(res.data())
}.listen(0, "localhost").await()
}.listen(0, "localhost").coAwait()
val clientEndpoint = Endpoint("192.168.1.1", 5678, 7654)
val ping = PingPacket.create(
clientKeyPair,
Expand All @@ -70,10 +71,10 @@ internal class DiscoveryServiceTest {
Endpoint(address),
null,
)
client.send(Buffer.buffer(ping.encode().toArrayUnsafe()), address.port(), address.host()).await()
client.send(Buffer.buffer(ping.encode().toArrayUnsafe()), address.port(), address.host()).coAwait()
val datagram = reference.await()
val buffer = ByteBuffer.allocate(datagram.length())
datagram.byteBuf.readBytes(buffer)
buffer.put(datagram.bytes)
val pong = Packet.decodeFrom(buffer) as PongPacket
assertEquals(discoveryService.nodeId, pong.nodeId)
assertEquals(ping.hash, pong.pingHash)
Expand All @@ -92,7 +93,7 @@ internal class DiscoveryServiceTest {
reference.set(AsyncResult.incomplete())
val bootstrapClient = vertx.createDatagramSocket().handler { res ->
reference.get().complete(res.data())
}.listen(0, "127.0.0.1").await()
}.listen(0, "127.0.0.1").coAwait()

val serviceKeyPair = SECP256K1.KeyPair.random()
val peerRepository = EphemeralPeerRepository()
Expand All @@ -113,7 +114,7 @@ internal class DiscoveryServiceTest {

val datagram = reference.get().await()
val buffer = ByteBuffer.allocate(datagram.length())
datagram.byteBuf.readBytes(buffer)
buffer.put(datagram.bytes)
val ping = Packet.decodeFrom(buffer) as PingPacket
assertEquals(discoveryService.nodeId, ping.nodeId)
assertEquals(
Expand All @@ -132,7 +133,7 @@ internal class DiscoveryServiceTest {
)
reference.set(AsyncResult.incomplete())
val address = SocketAddress.inetSocketAddress(discoveryService.localPort, "127.0.0.1")
bootstrapClient.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).await()
bootstrapClient.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).coAwait()

val findNodesDatagram = reference.get().await()

Expand Down Expand Up @@ -164,7 +165,7 @@ internal class DiscoveryServiceTest {
val reference = AsyncResult.incomplete<Buffer>()
val bootstrapClient = vertx.createDatagramSocket().handler { res ->
reference.complete(res.data())
}.listen(0, "localhost").await()
}.listen(0, "localhost").coAwait()

val serviceKeyPair = SECP256K1.KeyPair.random()
val peerRepository = EphemeralPeerRepository()
Expand All @@ -184,7 +185,7 @@ internal class DiscoveryServiceTest {
)
val datagram = reference.await()
val buffer = ByteBuffer.allocate(datagram.length())
datagram.byteBuf.readBytes(buffer)
buffer.put(datagram.bytes)
val ping = Packet.decodeFrom(buffer) as PingPacket
assertEquals(discoveryService.nodeId, ping.nodeId)
assertEquals(
Expand All @@ -202,7 +203,7 @@ internal class DiscoveryServiceTest {
null,
)
val address = SocketAddress.inetSocketAddress(discoveryService.localPort, "127.0.0.1")
bootstrapClient.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).await()
bootstrapClient.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).coAwait()

delay(1000)
val bootstrapPeer =
Expand All @@ -225,7 +226,7 @@ internal class DiscoveryServiceTest {
val reference = AsyncResult.incomplete<Buffer>()
val bootstrapClient = vertx.createDatagramSocket().handler { res ->
reference.complete(res.data())
}.listen(0, "localhost").await()
}.listen(0, "localhost").coAwait()

val discoveryService = DiscoveryService.open(
vertx,
Expand All @@ -244,7 +245,7 @@ internal class DiscoveryServiceTest {

val datagram = reference.await()
val buffer = ByteBuffer.allocate(datagram.length())
datagram.byteBuf.readBytes(buffer)
buffer.put(datagram.bytes)
val ping = Packet.decodeFrom(buffer) as PingPacket
assertEquals(discoveryService.nodeId, ping.nodeId)
assertEquals(
Expand All @@ -264,7 +265,7 @@ internal class DiscoveryServiceTest {
reference.set(AsyncResult.incomplete())
val bootstrapClient = vertx.createDatagramSocket().handler { res ->
reference.get().complete(res.data())
}.listen(0, "localhost").await()
}.listen(0, "localhost").coAwait()

val discoveryService = DiscoveryService.open(
vertx,
Expand All @@ -280,7 +281,7 @@ internal class DiscoveryServiceTest {
val datagram1 = reference.get().await()
reference.set(AsyncResult.incomplete())
val buffer1 = ByteBuffer.allocate(datagram1.length())
datagram1.byteBuf.readBytes(buffer1)
buffer1.put(datagram1.bytes)
val ping1 = Packet.decodeFrom(buffer1) as PingPacket
assertEquals(discoveryService.nodeId, ping1.nodeId)
assertEquals(
Expand All @@ -290,7 +291,7 @@ internal class DiscoveryServiceTest {
val datagram2 = reference.get().await()
reference.set(AsyncResult.incomplete())
val buffer2 = ByteBuffer.allocate(datagram2.length())
datagram2.byteBuf.readBytes(buffer2)
buffer2.put(datagram2.bytes)
val ping2 = Packet.decodeFrom(buffer2) as PingPacket
assertEquals(discoveryService.nodeId, ping2.nodeId)
assertEquals(
Expand All @@ -300,7 +301,7 @@ internal class DiscoveryServiceTest {
val datagram3 = reference.get().await()
reference.set(AsyncResult.incomplete())
val buffer3 = ByteBuffer.allocate(datagram3.length())
datagram3.byteBuf.readBytes(buffer3)
buffer3.put(datagram3.bytes)
val ping3 = Packet.decodeFrom(buffer3) as PingPacket
assertEquals(discoveryService.nodeId, ping3.nodeId)
assertEquals(
Expand Down Expand Up @@ -328,18 +329,18 @@ internal class DiscoveryServiceTest {
reference.set(AsyncResult.incomplete())
val client = vertx.createDatagramSocket().handler { res ->
reference.get().complete(res.data())
}.listen(0, "localhost").await()
}.listen(0, "localhost").coAwait()
val findNodes =
FindNodePacket.create(
clientKeyPair,
System.currentTimeMillis(),
SECP256K1.KeyPair.random().publicKey(),
)
client.send(Buffer.buffer(findNodes.encode().toArrayUnsafe()), address.port(), address.host()).await()
client.send(Buffer.buffer(findNodes.encode().toArrayUnsafe()), address.port(), address.host()).coAwait()

val datagram = reference.get().await()
val buffer = ByteBuffer.allocate(datagram.length())
datagram.byteBuf.readBytes(buffer)
buffer.put(datagram.bytes)
val ping = Packet.decodeFrom(buffer) as PingPacket
assertEquals(discoveryService.nodeId, ping.nodeId)

Expand All @@ -355,11 +356,11 @@ internal class DiscoveryServiceTest {
)

reference.set(AsyncResult.incomplete())
client.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).await()
client.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).coAwait()

val datagram2 = reference.get().await()
val buffer2 = ByteBuffer.allocate(datagram2.length())
datagram2.byteBuf.readBytes(buffer2)
buffer2.put(datagram2.bytes)
val neighbors = Packet.decodeFrom(buffer2) as NeighborsPacket
assertEquals(discoveryService.nodeId, neighbors.nodeId)

Expand Down
Expand Up @@ -5,6 +5,7 @@ package org.apache.tuweni.devp2p.v5
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.coAwait
import kotlinx.coroutines.runBlocking
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncResult
Expand Down Expand Up @@ -52,7 +53,7 @@ class DefaultDiscoveryV5ServiceTest {
val reference = AsyncResult.incomplete<Buffer>()
val client = vertx.createDatagramSocket().handler { res ->
reference.complete(res.data())
}.listen(19001, "localhost").await()
}.listen(19001, "localhost").coAwait()
val discoveryV5Service: DiscoveryV5Service =
DiscoveryService.open(
vertx,
Expand All @@ -64,7 +65,7 @@ class DefaultDiscoveryV5ServiceTest {

val datagram = reference.await()
val buffer = ByteBuffer.allocate(datagram.length())
datagram.byteBuf.readBytes(buffer)
buffer.put(datagram.bytes)
buffer.flip()
val receivedBytes = Bytes.wrapByteBuffer(buffer)
val content = receivedBytes.slice(45)
Expand Down

0 comments on commit 3ba8cb7

Please sign in to comment.