Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge from master into opendistro-1.10 (#194)
Browse files Browse the repository at this point in the history
* Update PerformanceAnalyzerIT to work with opendistro-security

* Fix PerformanceAnalyzerIT causing build failure (#185)

* Fix PerformanceAnalyzerIT causing build failure

- Fix checkBit() logic in PerformanceAnalyzerClusterSettingHandler
- Remove unused imports from PerformanceAnalyzerIT
- Update PA artifacts to 1.10

* Update the Cache Hit, Miss and Eviction metrics to be emitted as delta increase/decrease (#174)

Update the Cache Hit, Miss and Eviction metrics to be emitted as delta increase/decrease.

Emitting the cache hit, miss and eviction metrics as deltas rather than absolute values. ES reports these metrics from the beginning of the process / after an invalidateAll() event / after a cache clear. This change calculates the delta in the metric counts between each sampling to correctly reflect the increase/decrease in the metric count over the evaluation interval (5 seconds for us). This is done for FieldData, Shard Request and Node Query Cache.

Co-authored-by: Jindal <aditjind@3c22fbd31611.ant.amazon.com>
Co-authored-by: Aditya Jindal <aditjind@amazon.com>

Co-authored-by: Sid Narayan <sidnaray@amazon.com>
Co-authored-by: Aditya Jindal <adityajindal1194@gmail.com>
Co-authored-by: Jindal <aditjind@3c22fbd31611.ant.amazon.com>
Co-authored-by: Aditya Jindal <aditjind@amazon.com>
  • Loading branch information
5 people committed Aug 27, 2020
1 parent 7849650 commit 978118e
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 62 deletions.
34 changes: 33 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,15 @@ String rcaDir
String rcaArtifactsDir

/**
 * Returns true only when the given JVM system property is present and set
 * (case-insensitively) to "true", e.g. -Dtests.enableIT=true.
 *
 * Boolean.parseBoolean covers both the null check and the case-insensitive
 * "true" comparison that the two conflicting return lines attempted.
 */
static def propEnabled(property) {
    return Boolean.parseBoolean(System.getProperty(property))
}

// The following Gradle tasks are used to create a PA/RCA enabled Elasticsearch cluster
// Pass the -Dtests.enableIT property to Gradle to run ITs
/**
* cloneGitRepo clones the performance-analyzer-rca repo if the -Dtests.enableIT=true flag is passed
* to the Gradle JVM
*/
task cloneGitRepo(type: GitClone) {
rcaDir = Paths.get(getProject().getBuildDir().toString(), "performance-analyzer-rca").toString()
def destination = file(rcaDir)
Expand Down Expand Up @@ -273,6 +277,10 @@ bundlePlugin {
}
}

/**
 * setupEsCluster spins up a local 2 node ES cluster using the enableRca task in the performance-analyzer-rca
* repo. The performance-analyzer-rca repo is cloned as part of the cloneGitRepo task.
*/
task setupEsCluster() {
dependsOn(cloneGitRepo)
onlyIf = {
Expand All @@ -287,6 +295,30 @@ task setupEsCluster() {
}
}

/**
* integTestRunner is a task provided by the ES test framework, which allows us to spin up clients
* and test API calls against a local or remote Elasticsearch cluster.
*
* The simplest way to run this task in a way that "just works" is to invoke
* ./gradlew integTest -Dtests.enableIT=true -Dtests.useDockerCluster=true
* which will spin up a local 2 node ES cluster on your machine, then execute the test suite against it
*
* A full list of options is provided below. Check our gradle.properties file for the defaults for
* each of these properties.
*
* -Dtests.rest.cluster the Elasticsearch REST endpoint that test clients should hit
* -Dtests.cluster the Elasticsearch <a href="https://discuss.elastic.co/t/transport-client-vs-rest-client/13936">transport</a>
* endpoint that test clients should hit
* -Dtests.enableIT a flag to enable integration testing, by default this is false
* -Dtests.useDockerCluster if true, spin up a local 2 node cluster before executing tests
* NOTE: if you specify this, don't specify -Dtests.rest.cluster or -Dtests.cluster
* -Dtests.pa.port the port number of the PerformanceAnalyzer REST endpoint
* -Dtests.https either true or false, if true, then instantiate REST and transport clients using
* the https:// protocol and basic authentication via the -Dtests.user and -Dtests.password properties
* -Dtests.user the username of the admin user, this is used in conjunction with -Dtests.https and
* -Dtests.password to authenticate requests in the opendistro-security context
* -Dtests.password the password of the admin user specified by -Dtests.user
*/
integTestRunner {
onlyIf = {
propEnabled("tests.enableIT")
Expand Down
22 changes: 19 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,24 @@
systemProp.tests.rest.cluster=localhost:9200
# The Elasticsearch cluster node communication endpoint
systemProp.tests.cluster=localhost:9300

# The Elasticsearch cluster name for integ tests
systemProp.tests.clustername=IntegTestCluster
# Whether or not to spin up a new Elasticsearch cluster for integration testing
# Comment this out if you don't want a cluster spun up
systemProp.tests.useDockerCluster=

# Set this to true if you want a cluster spun up for integration testing
systemProp.tests.useDockerCluster=false

# Set this to true if you want to enable integration testing
systemProp.tests.enableIT=false

# The port number for the PerformanceAnalyzer WebService
systemProp.tests.pa.port=9600

# Whether or not to use https for REST and transport clients
systemProp.tests.https=false

# The username of the admin user (or any user able to auth requests against opendistro-security)
# NOTE: this only does something if tests.https is set to true
systemProp.tests.user=admin
# The password of the user specified above
systemProp.tests.password=admin
2 changes: 1 addition & 1 deletion licenses/performanceanalyzer-rca-1.10.jar.sha1
Original file line number Diff line number Diff line change
@@ -1 +1 @@
9c4d17774058b44a6f2ee6b9752c4f4254414784
ec2401715f3267a7ce97a971ae0270bfad6d0985
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.NodeIndicesStats;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
Expand All @@ -42,23 +43,44 @@
* on the performance of the node.
*/

/**
* currentShards: Contains the Mapping of the Shard ID to the Shard for the shards currently present on
* the cluster in this run of the collector.
* currentPerShardStats: Contains the mapping of the Shard Stats and the shards present in this run
* of the collector.
* prevPerShardStats: Contains the mapping of the Shard Stats and the shards present in the previous
* run of the collector.
* The diff is calculated between (currentPerShardStats and prevPerShardStats) for each shard in the
* currentShards and for shards not present in the prevPerShardStat absolute value of the
* currentPerShardStats is updated.
*/


@SuppressWarnings("unchecked")
public class NodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor {
public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.get(
NodeStatsAllShardsMetricsCollector.class).samplingInterval;
private static final int KEYS_PATH_LENGTH = 2;
private static final Logger LOG = LogManager.getLogger(NodeStatsAllShardsMetricsCollector.class);
private HashMap<String, IndexShard> currentShards;
private HashMap<ShardId, IndexShard> currentShards;
private HashMap<ShardId, ShardStats> currentPerShardStats;
private HashMap<ShardId, ShardStats> prevPerShardStats;
private final PerformanceAnalyzerController controller;


/**
 * Creates the collector with empty shard/stat maps; per-shard stats are then
 * carried across successive collector runs so deltas can be computed.
 *
 * @param controller the PerformanceAnalyzerController, stored for later use by the collector
 */
public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController controller) {
    super(SAMPLING_TIME_INTERVAL, "NodeStatsMetrics");
    this.controller = controller;
    this.currentShards = new HashMap<>();
    this.currentPerShardStats = new HashMap<>();
    this.prevPerShardStats = new HashMap<>();
}

/**
 * Refreshes the shard map for this collector run. Stats gathered in the
 * previous run are rolled over into prevPerShardStats first, so the caller
 * can diff the new per-shard stats against them.
 */
private void populateCurrentShards() {
// Skip the rollover on the very first run, when nothing has been collected yet.
if (!currentShards.isEmpty()) {
prevPerShardStats.putAll(currentPerShardStats);
currentPerShardStats.clear();
}
// Drop the stale shard map and re-read the shards currently on this node.
currentShards.clear();
currentShards = Utils.getShards();
}
Expand Down Expand Up @@ -100,25 +122,28 @@ public void collectMetrics(long startTime) {

try {
populateCurrentShards();
// Metrics populated for all shards in every collection.
for (HashMap.Entry currentShard : currentShards.entrySet() ){
IndexShard currentIndexShard = (IndexShard)currentShard.getValue();
IndexShardStats currentIndexShardStats = Utils.indexShardStats(indicesService,
currentIndexShard, new CommonStatsFlags(CommonStatsFlags.Flag.QueryCache,
CommonStatsFlags.Flag.FieldData,
CommonStatsFlags.Flag.RequestCache));
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
StringBuilder value = new StringBuilder();

value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());
// Populate the result with cache specific metrics only.
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize());
saveMetricValues(value.toString(), startTime, currentIndexShardStats.getShardId().getIndexName(),
String.valueOf(currentIndexShardStats.getShardId().id()));
populatePerShardStats(indicesService);

for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = (ShardId) currentShard.getKey();
ShardStats currentShardStats = (ShardStats) currentShard.getValue();
if (prevPerShardStats.size() == 0) {
// Populating value for the first run.
populateMetricValue(currentShardStats, startTime, shardId.getIndexName(), shardId.id());
continue;
}
ShardStats prevShardStats = prevPerShardStats.get(shardId);
if (prevShardStats == null) {
// Populate value for shards which are new and were not present in the previous run.
populateMetricValue(currentShardStats, startTime, shardId.getIndexName(), shardId.id());
continue;
}
NodeStatsMetricsAllShardsPerCollectionStatus prevValue = new
NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats);
NodeStatsMetricsAllShardsPerCollectionStatus currValue = new
NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats);
populateDiffMetricValue(prevValue, currValue, startTime, shardId.getIndexName(), shardId.id());
}

} catch (Exception ex) {
LOG.debug("Exception in Collecting NodesStats Metrics: {} for startTime {} with ExceptionCode: {}",
() -> ex.toString(), () -> startTime, () -> StatExceptionCode.NODESTATS_COLLECTION_ERROR.toString());
Expand All @@ -133,6 +158,55 @@ Field getNodeIndicesStatsByShardField() throws Exception {
return field;
}

/**
 * Populates currentPerShardStats with cache-related stats (query cache,
 * field data, request cache) for every shard in currentShards.
 *
 * @param indicesService the node's IndicesService, used to read per-shard stats
 */
public void populatePerShardStats(IndicesService indicesService) {
    // Typed entries remove the raw-type cast; one stats entry is stored per shard.
    for (HashMap.Entry<ShardId, IndexShard> currentShard : currentShards.entrySet()) {
        IndexShard currentIndexShard = currentShard.getValue();
        IndexShardStats currentIndexShardStats = Utils.indexShardStats(indicesService,
                currentIndexShard, new CommonStatsFlags(CommonStatsFlags.Flag.QueryCache,
                        CommonStatsFlags.Flag.FieldData,
                        CommonStatsFlags.Flag.RequestCache));
        for (ShardStats shardStats : currentIndexShardStats.getShards()) {
            currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
        }
    }
}

/**
 * Emits the absolute (non-delta) metric values for a shard. Used on the first
 * collector run and for shards that were not present in the previous run.
 *
 * @param shardStats the per-shard stats to serialize
 * @param startTime collection start time used when persisting the metric
 * @param indexName the name of the index the shard belongs to
 * @param shardId the numeric shard id
 */
public void populateMetricValue(ShardStats shardStats, long startTime, String indexName, int shardId) {
    StringBuilder value = new StringBuilder();
    value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());
    // Populate the result with cache specific metrics only.
    value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
            .append(new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize());
    saveMetricValues(value.toString(), startTime, indexName,
            String.valueOf(shardId));
}

/**
 * Emits the delta (current run minus previous run) for each cache metric of a
 * shard. Each delta is floored at 0 with Math.max so that counter resets
 * (e.g. after a cache clear — see commit description) never emit negative values.
 *
 * @param prevValue metrics captured in the previous collector run
 * @param currValue metrics captured in this collector run
 * @param startTime collection start time used when persisting the metric
 * @param indexName the name of the index the shard belongs to
 * @param shardId the numeric shard id
 */
public void populateDiffMetricValue(NodeStatsMetricsAllShardsPerCollectionStatus prevValue,
                                    NodeStatsMetricsAllShardsPerCollectionStatus currValue,
                                    long startTime, String indexName, int shardId) {
    NodeStatsMetricsAllShardsPerCollectionStatus nodeStatsMetrics = new NodeStatsMetricsAllShardsPerCollectionStatus(
            Math.max(currValue.queryCacheHitCount - prevValue.queryCacheHitCount, 0),
            Math.max(currValue.queryCacheMissCount - prevValue.queryCacheMissCount, 0),
            Math.max(currValue.queryCacheInBytes - prevValue.queryCacheInBytes, 0),
            Math.max(currValue.fieldDataEvictions - prevValue.fieldDataEvictions, 0),
            Math.max(currValue.fieldDataInBytes - prevValue.fieldDataInBytes, 0),
            Math.max(currValue.requestCacheHitCount - prevValue.requestCacheHitCount, 0),
            Math.max(currValue.requestCacheMissCount - prevValue.requestCacheMissCount, 0),
            Math.max(currValue.requestCacheEvictions - prevValue.requestCacheEvictions, 0),
            Math.max(currValue.requestCacheInBytes - prevValue.requestCacheInBytes, 0));

    StringBuilder value = new StringBuilder();
    value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds())
            .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
            .append(nodeStatsMetrics.serialize());
    saveMetricValues(value.toString(), startTime, indexName,
            String.valueOf(shardId));
}

public class NodeStatsMetricsAllShardsPerCollectionStatus extends MetricStatus {

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.NodeIndicesStats;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
Expand All @@ -51,8 +52,8 @@ public class NodeStatsFixedShardsMetricsCollector extends PerformanceAnalyzerMet
NodeStatsAllShardsMetricsCollector.class).samplingInterval;
private static final int KEYS_PATH_LENGTH = 2;
private static final Logger LOG = LogManager.getLogger(NodeStatsFixedShardsMetricsCollector.class);
private HashMap<String, IndexShard> currentShards;
private Iterator<HashMap.Entry<String, IndexShard>> currentShardsIter;
private HashMap<ShardId, IndexShard> currentShards;
private Iterator<HashMap.Entry<ShardId, IndexShard>> currentShardsIter;
private final PerformanceAnalyzerController controller;

public NodeStatsFixedShardsMetricsCollector(final PerformanceAnalyzerController controller) {
Expand Down Expand Up @@ -105,7 +106,7 @@ private void populateCurrentShards() {
} };

private long getIndexBufferBytes(ShardStats shardStats) {
IndexShard shard = currentShards.get(Utils.getUniqueShardIdKey(shardStats.getShardRouting().shardId()));
IndexShard shard = currentShards.get(shardStats.getShardRouting().shardId());

if (shard == null) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ public class PerformanceAnalyzerClusterSettingHandler implements ClusterSettingL
private static final int BIT_ONE = 1;
private static final int CLUSTER_SETTING_DISABLED_VALUE = 0;
private static final int ENABLED_VALUE = 1;
private static final int MAX_ALLOWED_BIT_POS = Math.min(PerformanceAnalyzerFeatureBits.values().length, Integer.SIZE - 1);
private static final int RCA_ENABLED_BIT_POS = PerformanceAnalyzerFeatureBits.RCA_BIT.ordinal();
private static final int PA_ENABLED_BIT_POS = PerformanceAnalyzerFeatureBits.PA_BIT.ordinal();
private static final int LOGGING_ENABLED_BIT_POS = PerformanceAnalyzerFeatureBits.LOGGING_BIT.ordinal();
private static final int MAX_ALLOWED_BIT_POS = Math.min(PerformanceAnalyzerFeatureBits.values().length, Integer.SIZE - 1);

private final PerformanceAnalyzerController controller;
private final ClusterSettingsManager clusterSettingsManager;
Expand Down Expand Up @@ -219,7 +219,7 @@ private int resetBit(int number, int bitPosition) {
* @param bitPosition The position of the bit in the clusterSettingValue
* @return true if the bit is set, false otherwise.
*/
private boolean checkBit(int clusterSettingValue, int bitPosition) {
return ((bitPosition < MAX_ALLOWED_BIT_POS) & (clusterSettingValue & (1 << bitPosition)) == ENABLED_VALUE);
public static boolean checkBit(int clusterSettingValue, int bitPosition) {
return ((bitPosition < MAX_ALLOWED_BIT_POS) && (clusterSettingValue & (1 << bitPosition)) > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,19 @@ public static IndexShardStats indexShardStats(final IndicesService indicesServic
});
}

public static HashMap<String, IndexShard> getShards() {
HashMap<String, IndexShard> shards = new HashMap<>();
public static HashMap<ShardId, IndexShard> getShards() {
HashMap<ShardId, IndexShard> shards = new HashMap<>();
Iterator<IndexService> indexServices = ESResources.INSTANCE.getIndicesService().iterator();
while (indexServices.hasNext()) {
Iterator<IndexShard> indexShards = indexServices.next().iterator();
while (indexShards.hasNext()) {
IndexShard shard = indexShards.next();
shards.put(getUniqueShardIdKey(shard.shardId()), shard);
shards.put(shard.shardId(), shard);
}
}
return shards;
}

public static String getUniqueShardIdKey(ShardId shardId) {
return "[" + shardId.hashCode() + "][" + shardId.getId() + "]";
}

public static final EnumSet<IndexShardState> CAN_WRITE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED);

Expand Down

0 comments on commit 978118e

Please sign in to comment.