-
Notifications
You must be signed in to change notification settings - Fork 224
/
KamonLogger.scala
147 lines (122 loc) · 5.46 KB
/
KamonLogger.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package filodb.coordinator
import scala.concurrent.Await
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.metric.{MeasurementUnit, MetricSnapshot, PeriodSnapshot}
import kamon.metric.MeasurementUnit.{information, time}
import kamon.metric.MeasurementUnit.Dimension.{Information, Time}
import kamon.module.{MetricReporter, Module, ModuleFactory, SpanReporter}
import kamon.tag.TagSet
import kamon.trace.Span
import kamon.util.Clock
class KamonMetricsLogReporter extends MetricReporter with StrictLogging {
logger.info("Started the KamonMetricsLog reporter")
override def stop(): Unit = {}
override def reconfigure(config: Config): Unit = {}
override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
logMetricCounters(snapshot.counters, "counter")
logMetricGauge(snapshot.gauges, "gauge")
logMetricDistribution(snapshot.histograms, "histograms")
logMetricDistribution(snapshot.timers, "timers")
logMetricDistribution(snapshot.rangeSamplers, "rangeSamplers")
}
private def logMetricCounters(metrics: Seq[MetricSnapshot.Values[Long]], metricType: String): Unit = {
for { c <- metrics } {
val name = normalizeMetricName(c.name, c.settings.unit)
c.instruments.foreach(instrument => {
val value = scale(instrument.value, c.settings.unit)
logger.debug(s"KAMON ${metricType} name=${name} ${formatTags(instrument.tags)} value=$value")
})
}
}
private def logMetricGauge(metrics: Seq[MetricSnapshot.Values[Double]], metricType: String): Unit = {
for { c <- metrics } {
val name = normalizeMetricName(c.name, c.settings.unit)
c.instruments.foreach(instrument => {
val value = scale(instrument.value, c.settings.unit)
logger.debug(s"KAMON ${metricType} name=${name} ${formatTags(instrument.tags)} value=$value")
})
}
}
private def logMetricDistribution(metrics: Seq[MetricSnapshot.Distributions], metricType: String): Unit = {
for { m <- metrics } {
val name = normalizeMetricName(m.name, m.settings.unit)
m.instruments.foreach(instrument => {
val h = instrument.value
def percentile(percentile: Double) = scale(h.percentile(25.0D).value, m.settings.unit)
if(h.count > 0) {
logger.debug(s"KAMON ${metricType} name=$name ${formatTags(instrument.tags)} " +
s"n=${h.count} sum=${h.sum} min=${scale(h.min, m.settings.unit)} " +
s"p50=${percentile(50.0D)} p90=${percentile(90.0D)} " +
s"p95=${percentile(95.0D)} p99=${percentile(99.0D)} " +
s"p999=${percentile(99.9D)} max=${scale(h.max, m.settings.unit)}")
}
})
}
}
private def formatTags(tags: TagSet) = tags.iterator(tagPair => tagPair.toString)
.map{case t => s"${t.key}=${t.value}"}.mkString(" ")
private def normalizeLabelName(label: String): String =
label.map(charOrUnderscore)
private def charOrUnderscore(char: Char): Char =
if (char.isLetterOrDigit || char == '_') char else '_'
private def normalizeMetricName(metricName: String, unit: MeasurementUnit): String = {
val normalizedMetricName = metricName.map(charOrUnderscore)
unit.dimension match {
case Time => normalizedMetricName + "_seconds"
case Information => normalizedMetricName + "_bytes"
case _ => normalizedMetricName
}
}
private def scale(value: Double, unit: MeasurementUnit): Double = unit.dimension match {
case Time if unit.magnitude != time.seconds.magnitude =>
MeasurementUnit.convert(value, unit, time.seconds)
case Information if unit.magnitude != information.bytes.magnitude =>
MeasurementUnit.convert(value, unit, information.bytes)
case _ => value
}
}
class KamonSpanLogReporter extends SpanReporter with StrictLogging {
logger.info("Started the KamonSpanLog reporter")
override def reportSpans(spans: Seq[Span.Finished]): Unit = {
spans.groupBy(_.operationName).foreach { case (name, spans) =>
val durations = spans.map { s => Math.floorDiv(Clock.nanosBetween(s.from, s.to), 1000) }
logger.debug(s"KAMON-TRACE name $name min=${durations.min} max=${durations.max} " +
s"avg=${durations.sum.toFloat / durations.size}")
}
}
override def stop(): Unit =
logger.info("Stopped the Zipkin reporter")
override def reconfigure(config: Config): Unit = {}
}
object KamonLogger {
class MetricsLogFactory extends ModuleFactory {
override def create(settings: ModuleFactory.Settings): Module =
new KamonMetricsLogReporter
}
class SpanLogFactory extends ModuleFactory {
override def create(settings: ModuleFactory.Settings): Module =
new KamonSpanLogReporter
}
}
object KamonShutdownHook extends StrictLogging {
import scala.concurrent.duration._
private val shutdownHookAdded = new java.util.concurrent.atomic.AtomicBoolean(false)
def registerShutdownHook(): Unit = {
if (shutdownHookAdded.compareAndSet(false, true)) {
logger.info(s"Registering Kamon Shutdown Hook...")
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
logger.info(s"Stopping Kamon modules - this will ensure that last few metrics are drained")
try {
Await.result(Kamon.stopModules(), 5.minutes)
logger.info(s"Finished stopping Kamon modules")
} catch { case e: Exception =>
logger.error(s"Exception when stopping Kamon Modules", e)
}
}
})
}
}
}