Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Support ES version 7.8.0 (#246)
Browse files Browse the repository at this point in the history
* Support ES version 7.8.0

* Bump ODFE version to 1.9.0
* Bump Elasticsearch version to 7.8.0
* Bump Gradle version to 6.5
* Changes to make compatible with Elasticsearch 7.8.0
* Fix jacocoagent path to work with gradle 6.5 for integration tests.

* Add release notes for ODFE 1.9.0

* Updating Release notes, rebasing on snapshot fixes

* Release notes for set index priority action
  • Loading branch information
setiah committed Jun 25, 2020
1 parent c011253 commit 49902de
Show file tree
Hide file tree
Showing 28 changed files with 104 additions and 87 deletions.
2 changes: 1 addition & 1 deletion build-tools/esplugin-coverage.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ allprojects{
jacocoTestReport.dependsOn integTest.runner

testClusters.integTest {
jvmArgs " ${dummyIntegTest.jacoco.getAsJvmArg()}"
jvmArgs " ${dummyIntegTest.jacoco.getAsJvmArg()}".replace('javaagent:','javaagent:/')
systemProperty 'com.sun.management.jmxremote', "true"
systemProperty 'com.sun.management.jmxremote.authenticate', "false"
systemProperty 'com.sun.management.jmxremote.port', "7777"
Expand Down
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

buildscript {
ext {
es_version = System.getProperty("es.version", "7.7.0")
es_version = System.getProperty("es.version", "7.8.0")
kotlin_version = System.getProperty("kotlin.version", "1.3.72")
}

Expand Down Expand Up @@ -79,12 +79,12 @@ configurations.testCompile {

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.8.0.0"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.9.0.0"
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
compile "org.jetbrains:annotations:13.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.8.0.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.9.0.0"

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down Expand Up @@ -125,6 +125,7 @@ javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
licenseHeaders.enabled = true
dependencyLicenses.enabled = false
thirdPartyAudit.enabled = false
validateNebulaPom.enabled = false

def es_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile
es_tmp_dir.mkdirs()
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# permissions and limitations under the License.
#

version = 1.8.0
version = 1.9.0
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#Tue May 19 22:58:14 PDT 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-6.4-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
Expand Down
14 changes: 14 additions & 0 deletions release-notes/opendistro-elasticsearch-index-management-1.9.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

## Version 1.9.0.0 (2020-6-25)

Compatible with Elasticsearch 7.8.0, Adds support for ODFE 1.9.0

### New Features
* Adds support for Elasticsearch 7.8.0 [PR #246](https://github.com/opendistro-for-elasticsearch/index-management/pull/246)

### Enhancement
* Implement set index priority action [PR #241](https://github.com/opendistro-for-elasticsearch/index-management/pull/241)

### Bug Fixes
* Fixes snapshot bugs [PR #244](https://github.com/opendistro-for-elasticsearch/index-management/pull/244)

Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class IndexStateManagementHistory(
.mapping(_DOC, IndexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON)
request.addMaxIndexDocsCondition(historyMaxDocs)
request.addMaxIndexAgeCondition(historyMaxAge)
val response = client.admin().indices().rolloversIndex(request).actionGet()
val response = client.admin().indices().rolloverIndex(request).actionGet()
if (!response.isRolledOver) {
logger.info("${IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS} not rolled over. Conditions were: ${response.conditionStatus}")
}
Expand All @@ -135,13 +135,13 @@ class IndexStateManagementHistory(
val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(IndexStateManagementIndices.HISTORY_ALL)
.metaData(true)
.metadata(true)
.local(true)
.indicesOptions(IndicesOptions.strictExpand())

val clusterStateResponse = client.admin().cluster().state(clusterStateRequest).actionGet()

for (entry in clusterStateResponse.state.metaData.indices()) {
for (entry in clusterStateResponse.state.metadata.indices()) {
val indexMetaData = entry.value
val creationTime = indexMetaData.creationDate

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class IndexStateManagementIndices(
/**
* ============== History =============
*/
fun indexStateManagementIndexHistoryExists(): Boolean = clusterService.state().metaData.hasAlias(HISTORY_WRITE_INDEX_ALIAS)
fun indexStateManagementIndexHistoryExists(): Boolean = clusterService.state().metadata.hasAlias(HISTORY_WRITE_INDEX_ALIAS)

suspend fun initHistoryIndex() {
if (!indexStateManagementIndexHistoryExists())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.elasticsearch.env.Environment
import org.elasticsearch.env.NodeEnvironment
import org.elasticsearch.plugins.ActionPlugin
import org.elasticsearch.plugins.Plugin
import org.elasticsearch.repositories.RepositoriesService
import org.elasticsearch.rest.RestController
import org.elasticsearch.rest.RestHandler
import org.elasticsearch.script.ScriptService
Expand Down Expand Up @@ -139,7 +140,8 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin,
environment: Environment,
nodeEnvironment: NodeEnvironment,
namedWriteableRegistry: NamedWriteableRegistry,
indexNameExpressionResolver: IndexNameExpressionResolver
indexNameExpressionResolver: IndexNameExpressionResolver,
repositoriesServiceSupplier: Supplier<RepositoriesService>
): Collection<Any> {
val settings = environment.settings()
this.clusterService = clusterService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ class ManagedIndexCoordinator(
if (!event.localNodeMaster()) return

// TODO: Look into event.isNewCluster, can we return early if true?
if (event.isNewCluster) { }
// if (event.isNewCluster) { }

if (!event.metaDataChanged()) return
if (!event.metadataChanged()) return

launch { sweepClusterChangedEvent(event) }
}
Expand Down Expand Up @@ -200,7 +200,7 @@ class ManagedIndexCoordinator(
private suspend fun reenableJobs() {
val clusterStateRequest = ClusterStateRequest()
.clear()
.metaData(true)
.metadata(true)
.local(false)
.indices("*")
.indicesOptions(IndicesOptions.strictExpand())
Expand All @@ -214,7 +214,7 @@ class ManagedIndexCoordinator(
* 2. Does not have a completed Policy
* 3. Does not have a failed Policy
*/
val updateManagedIndicesRequests: List<DocWriteRequest<*>> = response.state.metaData.indices.mapNotNull {
val updateManagedIndicesRequests: List<DocWriteRequest<*>> = response.state.metadata.indices.mapNotNull {
val managedIndexMetaData = it.value.getManagedIndexMetaData()
if (!(managedIndexMetaData == null || managedIndexMetaData.isPolicyCompleted || managedIndexMetaData.isFailed)) {
updateEnableManagedIndexRequest(it.value.indexUUID)
Expand All @@ -231,7 +231,7 @@ class ManagedIndexCoordinator(
@OpenForTesting
suspend fun sweepClusterChangedEvent(event: ClusterChangedEvent) {
val indicesDeletedRequests = event.indicesDeleted()
.filter { event.previousState().metaData().index(it)?.getPolicyID() != null }
.filter { event.previousState().metadata().index(it)?.getPolicyID() != null }
.map { deleteManagedIndexRequest(it.uuid) }

/*
Expand All @@ -245,8 +245,8 @@ class ManagedIndexCoordinator(
var hasCreateRequests = false
val updateManagedIndicesRequests = mutableListOf<DocWriteRequest<*>>()
val indicesToRemoveManagedIndexMetaDataFrom = mutableListOf<Index>()
event.state().metaData().indices().forEach {
val previousIndexMetaData = event.previousState().metaData().index(it.value.index)
event.state().metadata().indices().forEach {
val previousIndexMetaData = event.previousState().metadata().index(it.value.index)
val policyID = it.value.getPolicyID()
val request: DocWriteRequest<*>? = when {
it.value.shouldCreateManagedIndexConfig(previousIndexMetaData) && policyID != null -> {
Expand Down Expand Up @@ -373,7 +373,7 @@ class ManagedIndexCoordinator(
*/
@OpenForTesting
fun sweepClusterState(clusterState: ClusterState): Map<String, ClusterStateManagedIndexConfig> {
return clusterState.metaData().indices().values()
return clusterState.metadata().indices().values()
.mapNotNull {
val clusterConfig = it.value.getClusterStateManagedIndexConfig()
clusterConfig?.run {
Expand Down Expand Up @@ -412,7 +412,7 @@ class ManagedIndexCoordinator(
/** Returns a list of [Index]es that need to have their [ManagedIndexMetaData] removed. */
@OpenForTesting
fun getIndicesToDeleteManagedIndexMetaDataFrom(clusterState: ClusterState): List<Index> {
return clusterState.metaData().indices().values().mapNotNull {
return clusterState.metadata().indices().values().mapNotNull {
if (it.value.shouldDeleteManagedIndexMetaData()) it.value.index else null
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import org.elasticsearch.client.Client
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.health.ClusterHealthStatus
import org.elasticsearch.cluster.health.ClusterStateHealth
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
Expand Down Expand Up @@ -678,19 +678,19 @@ object ManagedIndexRunner : ScheduledJobRunner,

private fun clusterIsRed(): Boolean = ClusterStateHealth(clusterService.state()).status == ClusterHealthStatus.RED

private suspend fun getIndexMetaData(index: String): IndexMetaData? {
var indexMetaData: IndexMetaData? = null
private suspend fun getIndexMetaData(index: String): IndexMetadata? {
var indexMetaData: IndexMetadata? = null
try {
val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(index)
.metaData(true)
.metadata(true)
.local(false)
.indicesOptions(IndicesOptions.strictExpand())

val response: ClusterStateResponse = client.admin().cluster().suspendUntil { state(clusterStateRequest, it) }

indexMetaData = response.state.metaData.indices.firstOrNull()?.value
indexMetaData = response.state.metadata.indices.firstOrNull()?.value
if (indexMetaData == null) {
logger.error("Could not find IndexMetaData in master cluster state for $index")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.elasticsearch.ElasticsearchException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.client.ElasticsearchClient
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentBuilder
Expand Down Expand Up @@ -150,23 +150,23 @@ suspend fun <T> LockService.suspendUntil(block: LockService.(ActionListener<T>)
* @param previousIndexMetaData the previous [IndexMetaData].
* @return whether a [ManagedIndexConfig] should be created.
*/
fun IndexMetaData.shouldCreateManagedIndexConfig(previousIndexMetaData: IndexMetaData?): Boolean {
fun IndexMetadata.shouldCreateManagedIndexConfig(previousIndexMetaData: IndexMetadata?): Boolean {
if (this.getPolicyID() == null) return false

return previousIndexMetaData?.getPolicyID() == null
}

/**
* Compares current and previous IndexMetaData to determine if we should delete [ManagedIndexConfig].
* Compares current and previous IndexMetadata to determine if we should delete [ManagedIndexConfig].
*
* If the previous IndexMetaData is null or its [getPolicyID] returns null then there should
* If the previous IndexMetadata is null or its [getPolicyID] returns null then there should
* be no [ManagedIndexConfig] to delete. Else if the current [getPolicyID] returns null
* then it means we should delete the existing [ManagedIndexConfig].
*
* @param previousIndexMetaData the previous [IndexMetaData].
* @param previousIndexMetaData the previous [IndexMetadata].
* @return whether a [ManagedIndexConfig] should be deleted.
*/
fun IndexMetaData.shouldDeleteManagedIndexConfig(previousIndexMetaData: IndexMetaData?): Boolean {
fun IndexMetadata.shouldDeleteManagedIndexConfig(previousIndexMetaData: IndexMetadata?): Boolean {
if (previousIndexMetaData?.getPolicyID() == null) return false

return this.getPolicyID() == null
Expand All @@ -178,13 +178,13 @@ fun IndexMetaData.shouldDeleteManagedIndexConfig(previousIndexMetaData: IndexMet
* If [getPolicyID] returns null but [ManagedIndexMetaData] is not null then the policy was removed and
* the [ManagedIndexMetaData] remains and should be removed.
*/
fun IndexMetaData.shouldDeleteManagedIndexMetaData(): Boolean =
fun IndexMetadata.shouldDeleteManagedIndexMetaData(): Boolean =
this.getPolicyID() == null && this.getManagedIndexMetaData() != null

/**
* Returns the current policy_id if it exists and is valid otherwise returns null.
* */
fun IndexMetaData.getPolicyID(): String? {
fun IndexMetadata.getPolicyID(): String? {
if (this.settings.get(ManagedIndexSettings.POLICY_ID.key).isNullOrBlank()) return null

return this.settings.get(ManagedIndexSettings.POLICY_ID.key)
Expand All @@ -193,13 +193,13 @@ fun IndexMetaData.getPolicyID(): String? {
/**
* Returns the current rollover_alias if it exists otherwise returns null.
* */
fun IndexMetaData.getRolloverAlias(): String? {
fun IndexMetadata.getRolloverAlias(): String? {
if (this.settings.get(ManagedIndexSettings.ROLLOVER_ALIAS.key).isNullOrBlank()) return null

return this.settings.get(ManagedIndexSettings.ROLLOVER_ALIAS.key)
}

fun IndexMetaData.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexConfig? {
fun IndexMetadata.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexConfig? {
val index = this.index.name
val uuid = this.index.uuid
val policyID = this.getPolicyID()
Expand All @@ -209,7 +209,7 @@ fun IndexMetaData.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexC
return ClusterStateManagedIndexConfig(index = index, uuid = uuid, policyID = policyID)
}

fun IndexMetaData.getManagedIndexMetaData(): ManagedIndexMetaData? {
fun IndexMetadata.getManagedIndexMetaData(): ManagedIndexMetaData? {
val existingMetaDataMap = this.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA)

if (existingMetaDataMap != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.common.Strings
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
Expand Down Expand Up @@ -84,7 +84,7 @@ class RestAddPolicyAction : BaseRestHandler() {
val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*indices)
.metaData(true)
.metadata(true)
.local(false)
.waitForTimeout(TimeValue.timeValueMillis(ADD_POLICY_TIMEOUT_IN_MILLIS))
.indicesOptions(strictExpandOptions)
Expand Down Expand Up @@ -166,7 +166,7 @@ class RestAddPolicyAction : BaseRestHandler() {
}

private fun populateLists(state: ClusterState) {
for (indexMetaDataEntry in state.metaData.indices) {
for (indexMetaDataEntry in state.metadata.indices) {
val indexMetaData = indexMetaDataEntry.value
when {
indexMetaData.getPolicyID() != null ->
Expand All @@ -177,7 +177,7 @@ class RestAddPolicyAction : BaseRestHandler() {
"This index already has a policy, use the update policy API to update index policies"
)
)
indexMetaData.state == IndexMetaData.State.CLOSE ->
indexMetaData.state == IndexMetadata.State.CLOSE ->
failedIndices.add(FailedIndex(indexMetaData.index.name, indexMetaData.index.uuid, "This index is closed"))
else -> indicesToAddPolicyTo.add(indexMetaData.index)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class RestChangePolicyAction(val clusterService: ClusterService) : BaseRestHandl
val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*indices)
.metaData(true)
.metadata(true)
.local(false)
.indicesOptions(IndicesOptions.strictExpand())

Expand All @@ -155,7 +155,7 @@ class RestChangePolicyAction(val clusterService: ClusterService) : BaseRestHandl

private fun processResponse(response: ClusterStateResponse) {
val includedStates = changePolicy.include.map { it.state }.toSet()
response.state.metaData.indices.forEach {
response.state.metadata.indices.forEach {
val indexMetaData = it.value
val currentState = indexMetaData.getManagedIndexMetaData()?.stateMetaData?.name
if (currentState != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class RestExplainAction : BaseRestHandler() {

clusterStateRequest.clear()
.indices(*indices)
.metaData(true)
.metadata(true)
.local(false)
.local(request.paramAsBoolean("local", clusterStateRequest.local()))
.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()))
Expand All @@ -82,7 +82,7 @@ class RestExplainAction : BaseRestHandler() {
val state = clusterStateResponse.state

builder.startObject()
for (indexMetadataEntry in state.metaData.indices) {
for (indexMetadataEntry in state.metadata.indices) {
builder.startObject(indexMetadataEntry.key)
val indexMetadata = indexMetadataEntry.value
val managedIndexMetaDataMap = indexMetadata.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA)
Expand Down

0 comments on commit 49902de

Please sign in to comment.