Skip to content

Commit

Permalink
[AWS ECS container receiver]Add new metric for stopped container (#2383)
Browse files Browse the repository at this point in the history
* [AWS ECS container receiver]Add new metric for stopped container

* [AWS ECS container receiver]Add new metric for stopped container

* variable name chang

* add more test cases

* Add unit tests

* re-run unit test

* Solve the file conflict

* Update the layout
  • Loading branch information
JohnWu20 committed Mar 11, 2021
1 parent 792079f commit 0358399
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 126 deletions.
Expand Up @@ -43,15 +43,25 @@ func (acc *metricDataAccumulator) getMetricsData(containerStatsMap map[string]*C
stats, ok := containerStatsMap[containerMetadata.DockerID]

if ok && !isEmptyStats(stats) {

containerMetrics := convertContainerMetrics(stats, logger, containerMetadata)
acc.accumulate(convertToOTLPMetrics(ContainerPrefix, containerMetrics, containerResource, timestamp))
aggregateTaskMetrics(&taskMetrics, containerMetrics)
overrideWithTaskLevelLimit(&taskMetrics, metadata)
acc.accumulate(convertToOTLPMetrics(TaskPrefix, taskMetrics, taskResource, timestamp))
} else {
acc.accumulate(convertResourceToRMS(containerResource))

} else if containerMetadata.FinishedAt != "" && containerMetadata.StartedAt != "" {

duration, err := calculateDuration(containerMetadata.StartedAt, containerMetadata.FinishedAt)

if err != nil {
logger.Warn("Error time format error found for this container:" + containerMetadata.ContainerName)
}

acc.accumulate(convertStoppedContainerDataToOTMetrics(ContainerPrefix, containerResource, timestamp, duration))

}
}
overrideWithTaskLevelLimit(&taskMetrics, metadata)
acc.accumulate(convertToOTLPMetrics(TaskPrefix, taskMetrics, taskResource, timestamp))
}

func (acc *metricDataAccumulator) accumulate(rms pdata.ResourceMetricsSlice) {
Expand All @@ -64,14 +74,6 @@ func isEmptyStats(stats *ContainerStats) bool {
return stats == nil || stats.ID == ""
}

func convertResourceToRMS(containerResource pdata.Resource) pdata.ResourceMetricsSlice {
rms := pdata.NewResourceMetricsSlice()
rms.Resize(1)
rm := rms.At(0)
containerResource.CopyTo(rm.Resource())
return rms
}

func convertContainerMetrics(stats *ContainerStats, logger *zap.Logger, containerMetadata ContainerMetadata) ECSMetrics {
containerMetrics := getContainerMetrics(stats, logger)
if containerMetadata.Limits.Memory != nil {
Expand Down Expand Up @@ -109,3 +111,16 @@ func overrideWithTaskLevelLimit(taskMetrics *ECSMetrics, metadata TaskMetadata)
taskMetrics.CPUUtilized = ((taskMetrics.CPUUsageInVCPU / taskMetrics.CPUReserved) * 100)
}
}

func calculateDuration(startTime, endTime string) (float64, error) {
start, err := time.Parse(time.RFC3339Nano, startTime)
if err != nil {
return 0, err
}
end, err := time.Parse(time.RFC3339Nano, endTime)
if err != nil {
return 0, err
}
duration := end.Sub(start)
return duration.Seconds(), nil
}
Expand Up @@ -118,9 +118,20 @@ func TestGetMetricsDataMissingContainerStats(t *testing.T) {
require.Less(t, 0, len(acc.mds))
}

func TestGetMetricsDataNilContainerStats(t *testing.T) {
func TestGetMetricsDataForStoppedContainer(t *testing.T) {
cstats = map[string]*ContainerStats{"001": nil}
tm.Containers = []ContainerMetadata{
{ContainerName: "container-1", DockerID: "001", DockerName: "docker-container-1", CreatedAt: "2020-07-30T22:12:29.842610987Z", StartedAt: "2020-07-30T22:12:31.842610987Z", FinishedAt: "2020-07-31T22:10:29.842610987Z", KnownStatus: "STOPPED", Limits: Limit{CPU: &f, Memory: &v}},
}
acc.getMetricsData(cstats, tm, logger)
require.Less(t, 0, len(acc.mds))
}

func TestWrongFormatTimeDataForStoppedContainer(t *testing.T) {
cstats = map[string]*ContainerStats{"001": nil}
tm.Containers = []ContainerMetadata{
{ContainerName: "container-1", DockerID: "001", DockerName: "docker-container-1", CreatedAt: "2020-07-30T22:12:29.842610987Z", StartedAt: "2020-07-30T22:12:31.842610987Z", FinishedAt: "2020-07-31 22:10:29", KnownStatus: "STOPPED", Limits: Limit{CPU: &f, Memory: &v}},
}
acc.getMetricsData(cstats, tm, logger)
require.Less(t, 0, len(acc.mds))
}
Expand Down Expand Up @@ -222,3 +233,31 @@ func TestIsEmptyStats(t *testing.T) {
cstats = map[string]*ContainerStats{"001": {}}
require.EqualValues(t, true, isEmptyStats(cstats["001"]))
}

func TestCalculateDuration(t *testing.T) {

startTime := "2020-10-02T00:15:07.620912337Z"
endTime := "2020-10-03T15:14:06.620913372Z"
result, err := calculateDuration(startTime, endTime)
require.EqualValues(t, 140339.000001035, result)
require.NoError(t, err)

startTime = "2010-10-02T00:15:07.620912337Z"
endTime = "2020-10-03T15:14:06.620913372Z"
result, err = calculateDuration(startTime, endTime)
require.EqualValues(t, 3.15759539000001e+08, result)
require.NoError(t, err)

startTime = "2010-10-02 00:15:07"
endTime = "2020-10-03T15:14:06.620913372Z"
result, err = calculateDuration(startTime, endTime)
require.NotNil(t, err)
require.EqualValues(t, 0, result)

startTime = "2010-10-02T00:15:07.620912337Z"
endTime = "2020-10-03 15:14:06 +800"
result, err = calculateDuration(startTime, endTime)
require.NotNil(t, err)
require.EqualValues(t, 0, result)

}
Expand Up @@ -74,11 +74,14 @@ const (
AttributeStorageRead = "storage.read_bytes"
AttributeStorageWrite = "storage.write_bytes"

AttributeDuration = "duration"

UnitBytes = "Bytes"
UnitMegaBytes = "Megabytes"
UnitNanoSecond = "Nanoseconds"
UnitBytesPerSec = "Bytes/Second"
UnitCount = "Count"
UnitVCpu = "vCPU"
UnitPercent = "Percent"
UnitSecond = "Seconds"
)
Expand Up @@ -61,7 +61,22 @@ func convertToOTLPMetrics(prefix string, m ECSMetrics, r pdata.Resource, timesta
return rms
}

func convertStoppedContainerDataToOTMetrics(prefix string, containerResource pdata.Resource, timestamp pdata.Timestamp, duration float64) pdata.ResourceMetricsSlice {
rms := pdata.NewResourceMetricsSlice()
rms.Resize(1)
rm := rms.At(0)

containerResource.CopyTo(rm.Resource())

ilms := rm.InstrumentationLibraryMetrics()

ilms.Append(doubleGauge(prefix+AttributeDuration, UnitSecond, duration, timestamp))

return rms
}

func intGauge(metricName string, unit string, value int64, ts pdata.Timestamp) pdata.InstrumentationLibraryMetrics {

ilm := pdata.NewInstrumentationLibraryMetrics()

metric := initMetric(ilm, metricName, unit)
Expand Down
Expand Up @@ -59,3 +59,11 @@ func TestIntSum(t *testing.T) {
m := intSum("cpu_utilized", "Count", intValue, timestamp)
require.NotNil(t, m)
}

func TestConvertStoppedContainerDataToOTMetrics(t *testing.T) {
timestamp := pdata.TimestampFromTime(time.Now())
resource := pdata.NewResource()
duration := 1200000000.32132
rms := convertStoppedContainerDataToOTMetrics("container.", resource, timestamp, duration)
require.EqualValues(t, 1, rms.At(0).InstrumentationLibraryMetrics().Len())
}
Expand Up @@ -85,13 +85,15 @@
"com.amazonaws.ecs.task-definition-version": "1"
},
"DesiredStatus": "RUNNING",
"KnownStatus": "RUNNING",
"KnownStatus": "STOPPED",
"Limits": {
"CPU": 0,
"Memory": 128
},
"CreatedAt": "2020-07-30T22:12:29.842610987Z",
"StartedAt": "2020-07-30T22:12:30.95668701Z",
"FinishedAt": "2020-08-30T20:11:29.358701Z",
"ExitCode": 3,
"Type": "NORMAL",
"Networks": [
{
Expand Down
113 changes: 1 addition & 112 deletions receiver/awsecscontainermetricsreceiver/testdata/task_stats.json
Expand Up @@ -264,116 +264,5 @@
"tx_bytes_per_sec": 55
}
},
"fffb51bc2ca1f0205be9579b893372e728cd3bf6823c006f417323565b8cb7d1": {
"read": "2020-07-31T06:47:31.383361599Z",
"preread": "2020-07-31T06:47:30.3812936Z",
"pids_stats": {
"current": 3
},
"blkio_stats": {
"io_service_bytes_recursive": [],
"io_serviced_recursive": [],
"io_queue_recursive": [],
"io_service_time_recursive": [],
"io_wait_time_recursive": [],
"io_merged_recursive": [],
"io_time_recursive": [],
"sectors_recursive": []
},
"num_procs": 0,
"storage_stats": {},
"cpu_stats": {
"cpu_usage": {
"total_usage": 419400720,
"percpu_usage": [
198234436,
221166284
],
"usage_in_kernelmode": 40000000,
"usage_in_usermode": 360000000
},
"system_cpu_usage": 61968880000000,
"online_cpus": 2,
"throttling_data": {
"periods": 0,
"throttled_periods": 0,
"throttled_time": 0
}
},
"precpu_stats": {
"cpu_usage": {
"total_usage": 419400720,
"percpu_usage": [
198234436,
221166284
],
"usage_in_kernelmode": 40000000,
"usage_in_usermode": 360000000
},
"system_cpu_usage": 61966870000000,
"online_cpus": 2,
"throttling_data": {
"periods": 0,
"throttled_periods": 0,
"throttled_time": 0
}
},
"memory_stats": {
"usage": 3305472,
"max_usage": 6369280,
"stats": {
"active_anon": 1953792,
"active_file": 57344,
"cache": 73728,
"dirty": 0,
"hierarchical_memory_limit": 134217728,
"hierarchical_memsw_limit": 268435456,
"inactive_anon": 4096,
"inactive_file": 12288,
"mapped_file": 4096,
"pgfault": 6799,
"pgmajfault": 0,
"pgpgin": 4187,
"pgpgout": 3692,
"rss": 1953792,
"rss_huge": 0,
"total_active_anon": 1953792,
"total_active_file": 57344,
"total_cache": 73728,
"total_dirty": 0,
"total_inactive_anon": 4096,
"total_inactive_file": 12288,
"total_mapped_file": 4096,
"total_pgfault": 6799,
"total_pgmajfault": 0,
"total_pgpgin": 4187,
"total_pgpgout": 3692,
"total_rss": 1953792,
"total_rss_huge": 0,
"total_unevictable": 0,
"total_writeback": 0,
"unevictable": 0,
"writeback": 0
},
"limit": 134217728
},
"name": "/ecs-three-nginx-1-nginx200-9ef593decba69cf7b501",
"id": "fffb51bc2ca1f0205be9579b893372e728cd3bf6823c006f417323565b8cb7d1",
"networks": {
"eth0": {
"rx_bytes": 2156,
"rx_packets": 28,
"rx_errors": 0,
"rx_dropped": 0,
"tx_bytes": 0,
"tx_packets": 0,
"tx_errors": 0,
"tx_dropped": 0
}
},
"network_rate_stats": {
"rx_bytes_per_sec": 10,
"tx_bytes_per_sec": 55
}
}
"fffb51bc2ca1f0205be9579b893372e728cd3bf6823c006f417323565b8cb7d1": {}
}

0 comments on commit 0358399

Please sign in to comment.