
Better instrumenting (#440)
* Switch the etcd Node Directory to use leases instead of a polling approach to expiration (a minimal sketch of the lease pattern follows the change summary below).

* Log client connections to and disconnections from the server, plus debug information about the current set of connections.

Co-authored-by: Brett Morien <bmorien@ea.com>
brettmorien and Brett Morien committed Aug 20, 2020
1 parent f02df9d commit a4173df
Showing 7 changed files with 87 additions and 43 deletions.
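
For context on the first bullet: node entries are now written with an etcd lease whose TTL is derived from the node's own lease expiration, so etcd deletes stale keys itself and the periodic cleanup pass in `tick()` (removed below) is no longer needed. Here is a minimal sketch of that pattern using the same jetcd calls the diff relies on; the key, value, and TTL are illustrative placeholders, not values from this commit:

```kotlin
import io.etcd.jetcd.ByteSequence
import io.etcd.jetcd.Client
import io.etcd.jetcd.options.PutOption
import kotlinx.coroutines.future.await

// Sketch: write a key that etcd deletes on its own once the lease TTL elapses.
suspend fun putWithLease(client: Client, key: String, value: String, ttlSeconds: Long) {
    val lease = client.leaseClient.grant(ttlSeconds).await()          // ask etcd for a lease
    val option = PutOption.newBuilder().withLeaseId(lease.id).build() // attach it to the put
    client.kvClient.put(
        ByteSequence.from(key.toByteArray()),
        ByteSequence.from(value.toByteArray()),
        option
    ).await()
    // Once the lease expires without renewal, etcd removes the key automatically,
    // so no polling cleanup over expired entries is required.
}
```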
16 changes: 16 additions & 0 deletions .idea/codeStyles/Project.xml

Some generated files are not rendered by default.

@@ -11,19 +11,27 @@ import io.etcd.jetcd.Client
import io.etcd.jetcd.op.Op
import io.etcd.jetcd.options.DeleteOption
import io.etcd.jetcd.options.GetOption
import io.etcd.jetcd.options.PutOption
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.future.await
import kotlinx.coroutines.withTimeout
import mu.KotlinLogging
import orbit.server.mesh.NodeDirectory
import orbit.shared.mesh.NodeId
import orbit.shared.mesh.NodeInfo
import orbit.shared.proto.Node
import orbit.shared.proto.toAddressableLeaseProto
import orbit.shared.proto.toNodeInfo
import orbit.shared.proto.toNodeInfoProto
import orbit.util.di.ExternallyConfigured
import orbit.util.time.Clock
import orbit.util.time.Timestamp
import orbit.util.time.stopwatch
import orbit.util.time.toInstant
import java.nio.charset.Charset
import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random

@@ -40,42 +48,73 @@ class EtcdNodeDirectory(config: EtcdNodeDirectoryConfig, private val clock: Clock

private val logger = KotlinLogging.logger { }

private val client = Client.builder().endpoints(config.url).build().kvClient
private val lastCleanup = AtomicLong(clock.currentTime)
private val cleanupIntervalMs =
Random.nextLong(config.cleanupFrequencyRange.first.toMillis(), config.cleanupFrequencyRange.second.toMillis())
private val client = Client.builder().endpoints(config.url).build()
private val kvClient = client.kvClient
private val leaseClient = client.leaseClient

private val lastHealthCheckTime = AtomicLong(0)
private val lastHealthCheck = AtomicBoolean(false)

override suspend fun isHealthy(): Boolean {
if (lastHealthCheckTime.get() + 5000 > clock.currentTime) {
return lastHealthCheck.get()
}
try {
lastHealthCheckTime.set(clock.currentTime)
withTimeout(3000) {
getLease(Timestamp.now())
}
lastHealthCheck.set(true)
return true
} catch (e: TimeoutCancellationException) {
lastHealthCheck.set(false)
return false
} catch (e: ExecutionException) {
lastHealthCheck.set(false)
return false
}
}

suspend fun getLease(time: Timestamp): PutOption {
val lease = leaseClient.grant(clock.until(time).seconds).await()
return PutOption.newBuilder().withLeaseId(lease.id).build()
}

override suspend fun get(key: NodeId): NodeInfo? {
val response = client.get(toByteKey(key)).await()
val response = kvClient.get(toByteKey(key)).await()
return response.kvs.firstOrNull()?.value?.let {
Node.NodeInfoProto.parseFrom(it.bytes).toNodeInfo()
}
}

override suspend fun remove(key: NodeId): Boolean {
client.delete(toByteKey(key)).await()
kvClient.delete(toByteKey(key)).await()
return true
}

override suspend fun compareAndSet(key: NodeId, initialValue: NodeInfo?, newValue: NodeInfo?): Boolean {
val byteKey = toByteKey(key)
val oldValue = client.get(byteKey).await().kvs.firstOrNull()?.value?.bytes?.let {
val oldValue = kvClient.get(byteKey).await().kvs.firstOrNull()?.value?.bytes?.let {
Node.NodeInfoProto.parseFrom(it).toNodeInfo()
}

if (initialValue == oldValue) {
if (newValue != null) {
client.put(byteKey, ByteSequence.from(newValue.toNodeInfoProto().toByteArray())).await()
kvClient.put(
byteKey,
ByteSequence.from(newValue.toNodeInfoProto().toByteArray()),
getLease(newValue.lease.expiresAt)
).await()
} else {
client.delete(byteKey).await()
kvClient.delete(byteKey).await()
}
return true
}
return false
}

override suspend fun count() =
client.get(
kvClient.get(
allKey, GetOption.newBuilder()
.withSortField(GetOption.SortTarget.KEY)
.withSortOrder(GetOption.SortOrder.DESCEND)
@@ -93,7 +132,7 @@ class EtcdNodeDirectory(config: EtcdNodeDirectoryConfig, private val clock: Clock
.withRange(allKey)
.build()

val response = client.get(allKey, option).await()
val response = kvClient.get(allKey, option).await()

return response.kvs.map { kv ->
Pair(
@@ -103,33 +142,6 @@ class EtcdNodeDirectory(config: EtcdNodeDirectoryConfig, private val clock: Clock
}
}

override suspend fun tick() {
if (lastCleanup.get() + cleanupIntervalMs < clock.currentTime) {
val (time, cleanupResult) = stopwatch(clock) {
lastCleanup.set(clock.currentTime)

val (expiredLeases, validLeases) =
entries().partition { (_, node) -> clock.inPast(node.lease.expiresAt) }

if (expiredLeases.any()) {
val txn = client.txn()
txn.Then(*expiredLeases.map { (id) ->
Op.delete(toByteKey(id), DeleteOption.DEFAULT)
}.toTypedArray()).commit().await()
}

object {
val expired = expiredLeases.count()
val valid = validLeases.count()
}
}

logger.info {
"Node Directory cleanup took ${time}ms. Removed ${cleanupResult.expired} entries, ${cleanupResult.valid} remain valid."
}
}
}

private fun toByteKey(nodeId: NodeId): ByteSequence {
return ByteSequence.from("$keyPrefix/${nodeId.namespace}/${nodeId.key}".toByteArray())
}
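
The new isHealthy() above doubles as the directory's health probe: it reuses the previous answer for five seconds and treats a lease grant that does not complete within three seconds (or fails) as unhealthy. Stripped of the etcd specifics, the throttling pattern looks roughly like this generic sketch (class, property, and parameter names are invented for illustration):

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

// Generic sketch of a throttled health check: cache the last answer for a short window
// and fail fast if the underlying probe hangs. `probe` stands in for the etcd lease grant.
class ThrottledHealthCheck(
    private val probe: suspend () -> Unit,
    private val cacheMs: Long = 5_000,
    private val timeoutMs: Long = 3_000
) {
    private val lastCheckTime = AtomicLong(0)
    private val lastResult = AtomicBoolean(false)

    suspend fun isHealthy(): Boolean {
        val now = System.currentTimeMillis()
        if (lastCheckTime.get() + cacheMs > now) return lastResult.get() // reuse the cached answer
        lastCheckTime.set(now)
        return try {
            withTimeout(timeoutMs) { probe() } // a hanging probe counts as unhealthy
            lastResult.set(true)
            true
        } catch (e: TimeoutCancellationException) {
            // The real implementation also treats client execution failures the same way.
            lastResult.set(false)
            false
        }
    }
}
```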
@@ -6,11 +6,12 @@

package orbit.server.mesh

import orbit.server.service.HealthCheck
import orbit.shared.mesh.NodeId
import orbit.shared.mesh.NodeInfo
import orbit.util.concurrent.AsyncMap

interface NodeDirectory : AsyncMap<NodeId, NodeInfo> {
interface NodeDirectory : AsyncMap<NodeId, NodeInfo>, HealthCheck {
suspend fun entries(): Iterable<Pair<NodeId, NodeInfo>>
suspend fun tick() {}
}
@@ -33,6 +33,10 @@ class LocalNodeDirectory(private val clock: Clock) :
}
}

override suspend fun isHealthy(): Boolean {
return true
}

override suspend fun tick() {
// Cull expired
globalMap.values.filter { clock.inPast(it.lease.expiresAt) }.also { toDelete ->
@@ -10,6 +10,7 @@ import io.micrometer.core.instrument.Metrics
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import mu.KotlinLogging
import orbit.server.auth.AuthSystem
import orbit.server.concurrent.RuntimeScopes
import orbit.server.mesh.ClusterManager
@@ -35,6 +36,7 @@ class ConnectionManager(
private val authSystem: AuthSystem,
container: ComponentContainer
) {
private val logger = KotlinLogging.logger { }
private val connectedClients = ConcurrentHashMap<NodeId, ClientConnection>()

init {
@@ -46,8 +48,6 @@

fun getClient(nodeId: NodeId) = connectedClients[nodeId]

val clients get() = connectedClients.values.map { c -> c.nodeId }.toList()

fun onNewClient(
nodeId: NodeId,
incomingChannel: ReceiveChannel<Messages.MessageProto>,
@@ -79,6 +79,9 @@
// Update the visible nodes
updateDirectoryClients()

logger.info { "Client ${nodeId} connected to Mesh Node ${localNodeInfo.info.id}" }
logger.debug { "${localNodeInfo.info.id} -> ${connectedClients.map { c -> c.key }}"}

// Consume messages, suspends here until connection drops
clientConnection.consumeMessages()
} catch (t: Throwable) {
@@ -96,6 +99,9 @@

// Remove client
connectedClients.remove(nodeId)

logger.info { "Client ${nodeId} disconnected from Mesh Node ${localNodeInfo.info.id}" }
logger.debug { "${localNodeInfo.info.id} -> ${connectedClients.map { c -> c.key }}"}
}
}
}
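
The connection logging added above uses kotlin-logging's lambda form, so the debug dump of every connected client is only built when the DEBUG level is enabled, while the INFO lines record each client's lifecycle against the local mesh node. A small illustrative sketch of that cost model (the function and parameter names here are invented):

```kotlin
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}

fun logConnections(localNodeId: String, connectedClients: Collection<String>) {
    // Both message lambdas are evaluated lazily: the join over the full client list
    // only runs when DEBUG logging is actually enabled.
    logger.info { "Client connected to Mesh Node $localNodeId" }
    logger.debug { "$localNodeId -> ${connectedClients.joinToString()}" }
}
```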
@@ -48,13 +48,15 @@ class RemoteMeshNodeManager(
meshNodes.forEach { node ->
logger.info("Connecting to peer ${node.id.key} @${node.url}...")
this.connections[node.id] = RemoteMeshNodeConnection(localNode, node)
logger.debug { "${localNode.info.id} -> ${connections.map { c -> c.key }}"}
}

connections.values.forEach { node ->
if (allNodes.none { it.id == node.id }) {
logger.info("Removing peer ${node.id.key}...")
connections[node.id]!!.disconnect()
connections.remove(node.id)
logger.debug { "${localNode.info.id} -> ${connections.map { c -> c.key }}"}
}
}

@@ -9,16 +9,19 @@ package orbit.server.service
import orbit.server.OrbitServer
import orbit.server.mesh.AddressableDirectory
import orbit.server.mesh.LocalNodeInfo
import orbit.server.mesh.NodeDirectory

class HealthCheckList(
private val server: OrbitServer,
private val localNodeInfo: LocalNodeInfo,
private val addressableDirectory: AddressableDirectory
private val addressableDirectory: AddressableDirectory,
private val nodeDirectory: NodeDirectory
) {
val checks = listOf(
this.server,
this.localNodeInfo,
this.addressableDirectory
this.addressableDirectory,
this.nodeDirectory
)

fun getChecks(): Iterable<HealthCheck> {
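
With NodeDirectory now extending HealthCheck, the etcd directory's throttled probe joins the server, local node info, and addressable directory in the health check list. A consumer of HealthCheckList might aggregate the checks like this (a hypothetical sketch; the actual health endpoint is not part of this diff):

```kotlin
import orbit.server.service.HealthCheckList

// Hypothetical aggregation: the system is healthy only if every registered check is.
suspend fun isSystemHealthy(healthChecks: HealthCheckList): Boolean =
    healthChecks.getChecks().all { check -> check.isHealthy() }
```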
