diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt index fc6902ff3..ed931d2a3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt @@ -157,7 +157,7 @@ class RollupMapperService( } else { val errorMessage = "Failed to create target index [$targetIndexResolvedName]" return try { - val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin) + val response = createTargetIndex(targetIndexResolvedName, job.targetIndexSettings, hasLegacyPlugin) if (response.isAcknowledged) { updateRollupIndexMappings(job, targetIndexResolvedName) } else { @@ -228,13 +228,17 @@ class RollupMapperService( return RollupJobValidationResult.Valid } - private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse { - val settings = - if (hasLegacyPlugin) { - Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build() + private suspend fun createTargetIndex(targetIndexName: String, targetIndexSettings: Settings?, hasLegacyPlugin: Boolean): CreateIndexResponse { + val settings = Settings.builder().apply { + targetIndexSettings?.let { put(it) } + val rollupIndexSetting = if (hasLegacyPlugin) { + LegacyOpenDistroRollupSettings.ROLLUP_INDEX } else { - Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build() + RollupSettings.ROLLUP_INDEX } + put(rollupIndexSetting.key, true) + }.build() + val request = CreateIndexRequest(targetIndexName) .settings(settings) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt index f6a94982d..024900021 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt @@ -6,6 +6,8 @@ package org.opensearch.indexmanagement.rollup.model import org.apache.commons.codec.digest.DigestUtils +import org.opensearch.Version +import org.opensearch.common.settings.Settings import org.opensearch.commons.authuser.User import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput @@ -29,6 +31,7 @@ import java.time.temporal.ChronoUnit data class ISMRollup( val description: String, val targetIndex: String, + val targetIndexSettings: Settings?, val pageSize: Int, val dimensions: List, val metrics: List, @@ -55,6 +58,11 @@ data class ISMRollup( .field(Rollup.PAGE_SIZE_FIELD, pageSize) .field(Rollup.DIMENSIONS_FIELD, dimensions) .field(Rollup.METRICS_FIELD, metrics) + if (targetIndexSettings != null) { + builder.startObject(Rollup.TARGET_INDEX_SETTINGS_FIELD) + targetIndexSettings.toXContent(builder, params) + builder.endObject() + } builder.endObject() return builder } @@ -74,6 +82,7 @@ data class ISMRollup( description = this.description, sourceIndex = sourceIndex, targetIndex = this.targetIndex, + targetIndexSettings = this.targetIndexSettings, metadataID = null, pageSize = pageSize, delay = null, @@ -88,6 +97,11 @@ data class ISMRollup( constructor(sin: StreamInput) : this( description = sin.readString(), targetIndex = sin.readString(), + targetIndexSettings = if (sin.version.onOrAfter(Version.V_3_0_0) && sin.readBoolean()) { + Settings.readSettingsFromStream(sin) + } else { + null + }, pageSize = sin.readInt(), dimensions = sin.let { @@ -111,6 +125,7 @@ data class ISMRollup( override fun toString(): String { val sb = StringBuffer() sb.append(targetIndex) + sb.append(targetIndexSettings) sb.append(pageSize) dimensions.forEach { sb.append(it.type) @@ -129,6 +144,10 @@ data class ISMRollup( override fun writeTo(out: StreamOutput) { out.writeString(description) out.writeString(targetIndex) + if (out.version.onOrAfter(Version.V_3_0_0)) { + out.writeBoolean(targetIndexSettings != null) + if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out) + } out.writeInt(pageSize) out.writeVInt(dimensions.size) for (dimension in dimensions) { @@ -151,6 +170,7 @@ data class ISMRollup( ): ISMRollup { var description = "" var targetIndex = "" + var targetIndexSettings: Settings? = null var pageSize = 0 val dimensions = mutableListOf() val metrics = mutableListOf() @@ -164,6 +184,14 @@ data class ISMRollup( when (fieldName) { Rollup.DESCRIPTION_FIELD -> description = xcp.text() Rollup.TARGET_INDEX_FIELD -> targetIndex = xcp.text() + Rollup.TARGET_INDEX_SETTINGS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, + xcp.currentToken(), + xcp, + ) + targetIndexSettings = Settings.fromXContent(xcp) + } Rollup.PAGE_SIZE_FIELD -> pageSize = xcp.intValue() Rollup.DIMENSIONS_FIELD -> { XContentParserUtils.ensureExpectedToken( @@ -195,6 +223,7 @@ data class ISMRollup( dimensions = dimensions, metrics = metrics, targetIndex = targetIndex, + targetIndexSettings = targetIndexSettings, ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index d98274cb6..8eefaa52b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -5,6 +5,9 @@ package org.opensearch.indexmanagement.rollup.model +import org.opensearch.Version +import org.opensearch.common.settings.IndexScopedSettings +import org.opensearch.common.settings.Settings import org.opensearch.commons.authuser.User import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput @@ -47,6 +50,7 @@ data class Rollup( val description: String, val sourceIndex: String, val targetIndex: String, + val targetIndexSettings: Settings?, val metadataID: String?, @Deprecated("Will be ignored, to check the roles use user field") val roles: List = listOf(), val pageSize: Int, @@ -87,6 +91,9 @@ data class Rollup( } } require(sourceIndex != targetIndex) { "Your source and target index cannot be the same" } + if (targetIndexSettings != null) { + IndexScopedSettings(null, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS).validate(targetIndexSettings, true) + } require(dimensions.filter { it.type == Dimension.Type.DATE_HISTOGRAM }.size == 1) { "Must specify precisely one date histogram dimension" // this covers empty dimensions case too } @@ -129,6 +136,11 @@ data class Rollup( description = sin.readString(), sourceIndex = sin.readString(), targetIndex = sin.readString(), + targetIndexSettings = if (sin.getVersion().onOrAfter(Version.V_3_0_0) && sin.readBoolean()) { + Settings.readSettingsFromStream(sin) + } else { + null + }, metadataID = sin.readOptionalString(), roles = sin.readStringArray().toList(), pageSize = sin.readInt(), @@ -177,6 +189,11 @@ data class Rollup( .field(CONTINUOUS_FIELD, continuous) .field(DIMENSIONS_FIELD, dimensions.toTypedArray()) .field(RollupMetrics.METRICS_FIELD, metrics.toTypedArray()) + if (targetIndexSettings != null) { + builder.startObject(TARGET_INDEX_SETTINGS_FIELD) + targetIndexSettings.toXContent(builder, params) + builder.endObject() + } if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user) if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject() builder.endObject() @@ -200,6 +217,10 @@ data class Rollup( out.writeString(description) out.writeString(sourceIndex) out.writeString(targetIndex) + if (out.version.onOrAfter(Version.V_3_0_0)) { + out.writeBoolean(targetIndexSettings != null) + if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out) + } out.writeOptionalString(metadataID) out.writeStringArray(emptyList().toTypedArray()) out.writeInt(pageSize) @@ -237,6 +258,7 @@ data class Rollup( const val DESCRIPTION_FIELD = "description" const val SOURCE_INDEX_FIELD = "source_index" const val TARGET_INDEX_FIELD = "target_index" + const val TARGET_INDEX_SETTINGS_FIELD = "target_index_settings" const val METADATA_ID_FIELD = "metadata_id" const val ROLES_FIELD = "roles" const val PAGE_SIZE_FIELD = "page_size" @@ -275,6 +297,7 @@ data class Rollup( var description: String? = null var sourceIndex: String? = null var targetIndex: String? = null + var targetIndexSettings: Settings? = null var metadataID: String? = null var pageSize: Int? = null var delay: Long? = null @@ -301,6 +324,10 @@ data class Rollup( DESCRIPTION_FIELD -> description = xcp.text() SOURCE_INDEX_FIELD -> sourceIndex = xcp.text() TARGET_INDEX_FIELD -> targetIndex = xcp.text() + TARGET_INDEX_SETTINGS_FIELD -> { + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + targetIndexSettings = Settings.fromXContent(xcp) + } METADATA_ID_FIELD -> metadataID = xcp.textOrNull() ROLES_FIELD -> { // Parsing but not storing the field, deprecated @@ -357,6 +384,7 @@ data class Rollup( description = requireNotNull(description) { "Rollup description is null" }, sourceIndex = requireNotNull(sourceIndex) { "Rollup source index is null" }, targetIndex = requireNotNull(targetIndex) { "Rollup target index is null" }, + targetIndexSettings = targetIndexSettings, metadataID = metadataID, pageSize = requireNotNull(pageSize) { "Rollup page size is null" }, delay = delay, diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 21add2421..42d5f24d7 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 22 + "schema_version": 23 }, "dynamic": "strict", "properties": { @@ -380,6 +380,10 @@ } } }, + "target_index_settings": { + "dynamic": "true", + "type": "object" + }, "page_size": { "type": "long" }, @@ -1004,6 +1008,10 @@ } } }, + "target_index_settings": { + "dynamic": "true", + "type": "object" + }, "page_size": { "type": "long" }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 9336096e6..44d5c60f6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 22 + val configSchemaVersion = 23 val historySchemaVersion = 7 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt index 806e0299d..b712e8934 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt @@ -262,6 +262,7 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() { private fun createISMRollup(targetIdxRollup: String): ISMRollup = ISMRollup( description = "basic search test", targetIndex = targetIdxRollup, + targetIndexSettings = null, pageSize = 100, dimensions = listOf( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/RollupSecurityBehaviorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/RollupSecurityBehaviorIT.kt index 99c487185..4b3b3f715 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/RollupSecurityBehaviorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/RollupSecurityBehaviorIT.kt @@ -228,6 +228,7 @@ class RollupSecurityBehaviorIT : SecurityRestTestCase() { description = "basic stats test", sourceIndex = sourceIndex, targetIndex = targetIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt index 26c9bae16..b5a101e10 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt @@ -8,6 +8,9 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity import org.opensearch.cluster.metadata.DataStream +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.common.settings.Settings +import org.opensearch.index.engine.EngineConfig import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Terms import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase @@ -42,6 +45,66 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ISMRollup( description = "basic search test", targetIndex = "target_rollup_search", + targetIndexSettings = null, + pageSize = 100, + dimensions = + listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID"), + ), + metrics = + listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = + listOf( + Sum(), Min(), Max(), + ValueCount(), Average(), + ), + ), + RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())), + ), + ) + val actionConfig = RollupAction(rollup, 0) + val states = + listOf( + State("rollup", listOf(actionConfig), listOf()), + ) + val sourceIndexMappingString = + "\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " + + "\"keyword\" }, \"PULocationID\": { \"type\": \"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " + + "{ \"type\": \"double\" }}" + val policy = + Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID, mapping = sourceIndexMappingString) + + assertIndexRolledUp(indexName, policyID, rollup) + } + + fun `test rollup action with specified target index settings`() { + val indexName = "${testIndexName}_index_settings" + val policyID = "${testIndexName}_policy_settings" + val targetIdxTestName = "target_rollup_settings" + val targetIndexReplicas = 0 + val targetIndexCodec = "best_compression" + val rollup = + ISMRollup( + description = "basic search test", + targetIndex = targetIdxTestName, + targetIndexSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, targetIndexReplicas) + .put(EngineConfig.INDEX_CODEC_SETTING.key, targetIndexCodec) + .build(), pageSize = 100, dimensions = listOf( @@ -95,6 +158,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ISMRollup( description = "data stream rollup", targetIndex = "target_rollup_search", + targetIndexSettings = null, pageSize = 100, dimensions = listOf( @@ -164,6 +228,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ISMRollup( description = "data stream rollup", targetIndex = "rollup_{{ctx.source_index}}", + targetIndexSettings = null, pageSize = 100, dimensions = listOf( @@ -234,6 +299,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ISMRollup( description = "basic search test", targetIndex = "target_rollup_search", + targetIndexSettings = null, pageSize = 100, dimensions = listOf( @@ -310,6 +376,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ISMRollup( description = "basic search test", targetIndex = "target_with_wildcard*", + targetIndexSettings = null, pageSize = 100, dimensions = listOf( @@ -375,6 +442,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ISMRollup( description = "basic search test", targetIndex = "target_rollup_search", + targetIndexSettings = null, pageSize = 100, dimensions = listOf( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 12e75b4e3..f996d9d7c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -132,6 +132,13 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { return getRollup(rollupId = rollupId) } + // TODO: can be replaced with createRandomRollup if implement assertEqual for mappings with "dynamic"=true fields + protected fun createRandomRollupWithoutTargetSettings(refresh: Boolean = true): Rollup { + val rollup = randomRollup().copy(targetIndexSettings = null) + val rollupId = createRollup(rollup, rollupId = rollup.id, refresh = refresh).id + return getRollup(rollupId = rollupId) + } + // TODO: Maybe clean-up and use XContentFactory.jsonBuilder() to create mappings json protected fun createRollupMappingString(rollup: Rollup): String { var mappingString = "" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt index 12d98eb58..b288d6732 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt @@ -5,8 +5,12 @@ package org.opensearch.indexmanagement.rollup +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.XContentFactory import org.opensearch.core.xcontent.ToXContent +import org.opensearch.index.codec.CodecService +import org.opensearch.index.engine.EngineConfig import org.opensearch.index.query.TermQueryBuilder import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Dimension @@ -33,6 +37,7 @@ import org.opensearch.indexmanagement.rollup.model.metric.Metric import org.opensearch.indexmanagement.rollup.model.metric.Min import org.opensearch.indexmanagement.rollup.model.metric.Sum import org.opensearch.indexmanagement.rollup.model.metric.ValueCount +import org.opensearch.test.OpenSearchTestCase import org.opensearch.test.rest.OpenSearchRestTestCase import java.util.Locale @@ -98,6 +103,14 @@ fun randomRollupDimensions(): List { return dimensions.toList() } +val codecs = listOf(CodecService.DEFAULT_CODEC, CodecService.LZ4, CodecService.BEST_COMPRESSION_CODEC, CodecService.ZLIB) + +fun randomSettings(): Settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, OpenSearchTestCase.randomIntBetween(0, 2)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, OpenSearchTestCase.randomIntBetween(1, 5)) + .put(EngineConfig.INDEX_CODEC_SETTING.key, OpenSearchRestTestCase.randomSubsetOf(1, codecs).first()) + .build() + fun randomRollup(): Rollup { val enabled = OpenSearchRestTestCase.randomBoolean() return Rollup( @@ -112,6 +125,7 @@ fun randomRollup(): Rollup { description = OpenSearchRestTestCase.randomAlphaOfLength(10), sourceIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), + targetIndexSettings = if (OpenSearchRestTestCase.randomBoolean()) null else randomSettings(), metadataID = if (OpenSearchRestTestCase.randomBoolean()) null else OpenSearchRestTestCase.randomAlphaOfLength(10), roles = emptyList(), pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000), @@ -172,6 +186,7 @@ fun randomExplainRollup(): ExplainRollup { fun randomISMRollup(): ISMRollup = ISMRollup( description = OpenSearchRestTestCase.randomAlphaOfLength(10), targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), + targetIndexSettings = if (OpenSearchRestTestCase.randomBoolean()) null else randomSettings(), pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000), dimensions = randomRollupDimensions(), metrics = OpenSearchRestTestCase.randomList(20, ::randomRollupMetrics).distinctBy { it.targetField }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 8f658b885..8d92900b1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -42,6 +42,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = "source_rollup_search", targetIndex = "target_rollup_search", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -494,6 +495,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = "source_rollup_bucket_and_sub", targetIndex = "target_rollup_bucket_and_sub", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -595,6 +597,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = "source_continuous_rollup_search", targetIndex = "target_continuous_rollup_search", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -684,6 +687,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = "source_rollup_search_all_jobs_1", targetIndex = targetIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -729,6 +733,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = "source_rollup_search_all_jobs_2", targetIndex = targetIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -843,6 +848,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = sourceIndex1, targetIndex = targetIndex1, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -888,6 +894,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = sourceIndex2, targetIndex = targetIndex2, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -1003,6 +1010,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = sourceIndex1, targetIndex = targetIndex1, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -1047,6 +1055,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = sourceIndex2, targetIndex = targetIndex2, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -1170,6 +1179,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = sourceIndex, targetIndex = targetIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -1673,6 +1683,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = sourceIndex, targetIndex = targetIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -1751,6 +1762,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = sourceIndex, targetIndex = targetIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -1831,6 +1843,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = "source_111*", targetIndex = targetIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -1920,6 +1933,7 @@ class RollupInterceptorIT : RollupRestTestCase() { description = "basic search test", sourceIndex = "source_999*", targetIndex = targetIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt index 89fbf6e04..0799d71ad 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt @@ -70,6 +70,7 @@ class ISMRollupTests : OpenSearchTestCase() { assertEquals(sourceIndex, rollup.sourceIndex) assertEquals(ismRollup.targetIndex, rollup.targetIndex) + assertEquals(ismRollup.targetIndexSettings, rollup.targetIndexSettings) assertEquals(ismRollup.pageSize, rollup.pageSize) assertEquals(ismRollup.dimensions, rollup.dimensions) assertEquals(ismRollup.metrics, rollup.metrics) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt index e13f1d6e7..a187598b1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt @@ -5,6 +5,8 @@ package org.opensearch.indexmanagement.rollup.model +import org.opensearch.common.settings.Settings +import org.opensearch.common.settings.SettingsException import org.opensearch.indexmanagement.randomInstant import org.opensearch.indexmanagement.randomSchedule import org.opensearch.indexmanagement.rollup.randomDateHistogram @@ -52,6 +54,26 @@ class RollupTests : OpenSearchTestCase() { } } + fun `test rollup requires correct target index settings`() { + assertFailsWith(SettingsException::class, "Unknown property was `index.codec1`") { + randomRollup().copy(targetIndexSettings = Settings.builder().put("index.codec1", "zlib").build()) + } + } + + fun `test rollup with single setting in target index settings`() { + val sb = Settings.builder() + sb.put("index.codec", "zlib") + randomRollup().copy(targetIndexSettings = sb.build()) + } + + fun `test rollup with multiple setting in target index settings`() { + val sb = Settings.builder() + sb.put("index.number_of_replicas", 1) + sb.put("index.codec", "zlib") + sb.put("index.soft_deletes.retention_lease.period", "1h") + randomRollup().copy(targetIndexSettings = sb.build()) + } + fun `test rollup requires page size to be between 1 and 10k`() { assertFailsWith(IllegalArgumentException::class, "Page size was negative") { randomRollup().copy(pageSize = -1) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt index 374fcf15c..3588af3bb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.indexmanagement.rollup.randomRollupMetadata import org.opensearch.indexmanagement.rollup.randomRollupMetrics import org.opensearch.indexmanagement.rollup.randomRollupStats +import org.opensearch.indexmanagement.rollup.randomSettings import org.opensearch.indexmanagement.rollup.randomSum import org.opensearch.indexmanagement.rollup.randomTerms import org.opensearch.indexmanagement.rollup.randomValueCount @@ -126,6 +127,14 @@ class WriteableTests : OpenSearchTestCase() { assertTrue("roles field in rollup model is deprecated and should be parsed to empty list.", streamedRollup.roles.isEmpty()) } + fun `test rollup as stream with target index settings`() { + val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000), targetIndexSettings = randomSettings()) + val out = BytesStreamOutput().also { rollup.writeTo(it) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedRollup = Rollup(sin) + assertEquals("Round tripping Rollup stream with target index settings doesn't work", rollup, streamedRollup) + } + fun `test explain rollup as stream`() { val explainRollup = randomExplainRollup() val out = BytesStreamOutput().also { explainRollup.writeTo(it) } @@ -165,4 +174,16 @@ class WriteableTests : OpenSearchTestCase() { val streamedISMRollup = ISMRollup(sin) assertEquals("Round tripping ISMRollup stream doesn't work", ismRollup, streamedISMRollup) } + + fun `test ism rollup as stream with target index settings`() { + val ismRollup = randomISMRollup().copy(targetIndexSettings = randomSettings()) + val out = BytesStreamOutput().also { ismRollup.writeTo(it) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedISMRollup = ISMRollup(sin) + assertEquals( + "Round tripping ISMRollup stream with target index settings doesn't work", + ismRollup, + streamedISMRollup, + ) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt index 487b9f10e..3cdde716d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt @@ -20,6 +20,7 @@ import org.opensearch.indexmanagement.rollup.randomMax import org.opensearch.indexmanagement.rollup.randomMin import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.indexmanagement.rollup.randomRollupMetrics +import org.opensearch.indexmanagement.rollup.randomSettings import org.opensearch.indexmanagement.rollup.randomSum import org.opensearch.indexmanagement.rollup.randomTerms import org.opensearch.indexmanagement.rollup.randomValueCount @@ -120,6 +121,14 @@ class XContentTests : OpenSearchTestCase() { assertEquals("Round tripping Rollup without type doesn't work", rollup.copy(roles = listOf()), parsedRollup) } + fun `test rollup parsing with target index settings`() { + val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000), targetIndexSettings = randomSettings()) + val rollupString = rollup.toJsonString(XCONTENT_WITHOUT_TYPE) + val parsedRollup = Rollup.parse(parser(rollupString), rollup.id, rollup.seqNo, rollup.primaryTerm) + // roles are deprecated and not populated in toXContent and parsed as part of parse + assertEquals("Round tripping Rollup with target index settings doesn't work", rollup.copy(roles = listOf()), parsedRollup) + } + fun `test ism rollup parsing`() { val ismRollup = randomISMRollup() val ismRollupString = ismRollup.toJsonString() @@ -127,6 +136,13 @@ class XContentTests : OpenSearchTestCase() { assertEquals("Round tripping ISMRollup doesn't work", ismRollup, parsedISMRollup) } + fun `test ism rollup parsing with target index settings`() { + val ismRollup = randomISMRollup().copy(targetIndexSettings = randomSettings()) + val ismRollupString = ismRollup.toJsonString() + val parsedISMRollup = ISMRollup.parse(parser(ismRollupString)) + assertEquals("Round tripping ISMRollup with target index settings doesn't work", ismRollup, parsedISMRollup) + } + private fun parser(xc: String): XContentParser { val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc) parser.nextToken() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt index c759d4f06..3accb2db9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt @@ -73,7 +73,7 @@ class RestIndexRollupActionIT : RollupRestAPITestCase() { @Throws(Exception::class) fun `test mappings after rollup creation`() { - createRandomRollup() + createRandomRollupWithoutTargetSettings() val response = client().makeRequest("GET", "/$INDEX_MANAGEMENT_INDEX/_mapping") val parserMap = createParser(XContentType.JSON.xContent(), response.entity.content).map() as Map> diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt index e165a5b71..d2fc9e398 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt @@ -93,6 +93,7 @@ class RestStartRollupActionIT : RollupRestAPITestCase() { description = "basic search test", sourceIndex = "source_restart_failed_rollup", targetIndex = "target_restart_failed_rollup", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -156,6 +157,7 @@ class RestStartRollupActionIT : RollupRestAPITestCase() { description = "basic search test", sourceIndex = "source_restart_finished_rollup", targetIndex = "target_restart_finished_rollup", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -224,6 +226,7 @@ class RestStartRollupActionIT : RollupRestAPITestCase() { description = "basic search test", sourceIndex = "source_multi_shard_start", targetIndex = "target_multi_shard_start", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt index 4789e1f50..2c5d173d2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt @@ -210,6 +210,7 @@ class RestStopRollupActionIT : RollupRestAPITestCase() { description = "basic search test", sourceIndex = "source", targetIndex = "target", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 10, @@ -281,6 +282,7 @@ class RestStopRollupActionIT : RollupRestAPITestCase() { description = "basic search test", sourceIndex = "source_multi_shard_stop", targetIndex = "target_multi_shard_stop", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 1, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index e085f6c30..d7e215089 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -7,8 +7,10 @@ package org.opensearch.indexmanagement.rollup.runner import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity +import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.settings.Settings import org.opensearch.core.rest.RestStatus +import org.opensearch.index.engine.EngineConfig import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI import org.opensearch.indexmanagement.common.model.dimension.DateHistogram @@ -76,6 +78,69 @@ class RollupRunnerIT : RollupRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + fun `test rollup with creating target index with specific settings`() { + val sourceIdxTestName = "source_idx_test_settings" + val targetIdxTestName = "target_idx_test_settings" + val targetIndexReplicas = 0 + val targetIndexCodec = "best_compression" + generateNYCTaxiData(sourceIdxTestName) + + val rollup = + Rollup( + id = testName, + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic stats test", + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + targetIndexSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, targetIndexReplicas) + .put(EngineConfig.INDEX_CODEC_SETTING.key, targetIndexCodec) + .build(), + metadataID = null, + roles = emptyList(), + pageSize = 100, + delay = 0, + continuous = false, + dimensions = listOf(DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h")), + metrics = + listOf( + RollupMetrics(sourceField = "passenger_count", targetField = "passenger_count", metrics = listOf(Average())), + ), + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) } + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + + val rawRes = client().makeRequest(RestRequest.Method.GET.name, "/$targetIdxTestName/_settings", mapOf("flat_settings" to "true")) + assertTrue(rawRes.restStatus() == RestStatus.OK) + val indexSettingsRes = rawRes.asMap()[targetIdxTestName] as Map> + val settingsRes = indexSettingsRes["settings"] + assertNotNull("Rollup index did not have any settings", settingsRes) + assertEquals( + "Rollup index did not have correct codec setting", + targetIndexCodec, + settingsRes?.getValue(EngineConfig.INDEX_CODEC_SETTING.key), + ) + assertEquals( + "Rollup index did not have correct replicas setting", + targetIndexReplicas.toString(), + settingsRes?.getValue(IndexMetadata.SETTING_NUMBER_OF_REPLICAS), + ) + } + } + @Suppress("UNCHECKED_CAST") fun `test rollup with avg metric`() { val sourceIdxTestName = "source_idx_test" @@ -96,6 +161,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic stats test", sourceIndex = sourceIdxTestName, targetIndex = targetIdxTestName, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, @@ -387,6 +453,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic stats test", sourceIndex = "source_runner_fifth", targetIndex = "target_runner_fifth", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, @@ -410,6 +477,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic stats test", sourceIndex = "source_runner_fifth", targetIndex = "target_runner_fifth", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, @@ -433,6 +501,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic 1s test", sourceIndex = "source_runner_fifth", targetIndex = "target_runner_fifth", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, @@ -539,6 +608,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic change of page size", sourceIndex = "source_runner_sixth", targetIndex = "target_runner_sixth", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 1, @@ -602,6 +672,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic page size", sourceIndex = "source_runner_seventh", targetIndex = "target_runner_seventh", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, @@ -638,6 +709,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic page size", sourceIndex = "source_runner_seventh", targetIndex = "new_target_runner_seventh", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, @@ -732,6 +804,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic delay test", sourceIndex = "source_runner_ninth", targetIndex = "target_runner_ninth", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, @@ -798,6 +871,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic delay test", sourceIndex = "source_runner_tenth", targetIndex = "target_runner_tenth", + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, @@ -867,6 +941,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic change of page size", sourceIndex = "source_runner_sixth_eleventh_1", targetIndex = indexAlias, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 1000, @@ -959,6 +1034,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic change of page size", sourceIndex = "source_runner_sixth_29932", targetIndex = indexAlias, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 1000, @@ -1060,6 +1136,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic change of page size", sourceIndex = "source_runner_sixth_2123", targetIndex = indexAlias, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 1000, @@ -1179,6 +1256,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic change of page size", sourceIndex = "source_runner_sixth_2209", targetIndex = indexAlias, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 1000, @@ -1271,6 +1349,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic change of page size", sourceIndex = "source_runner_sixth_1532209", targetIndex = indexAlias, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 1000, @@ -1347,6 +1426,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic change of page size", sourceIndex = index, targetIndex = rollupIndex, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 1000, @@ -1398,6 +1478,7 @@ class RollupRunnerIT : RollupRestTestCase() { description = "basic stats test", sourceIndex = sourceIdxTestName, targetIndex = targetIdxTestName, + targetIndexSettings = null, metadataID = null, roles = emptyList(), pageSize = 100, diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 21add2421..42d5f24d7 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 22 + "schema_version": 23 }, "dynamic": "strict", "properties": { @@ -380,6 +380,10 @@ } } }, + "target_index_settings": { + "dynamic": "true", + "type": "object" + }, "page_size": { "type": "long" }, @@ -1004,6 +1008,10 @@ } } }, + "target_index_settings": { + "dynamic": "true", + "type": "object" + }, "page_size": { "type": "long" }, diff --git a/worksheets/rollups/create.http b/worksheets/rollups/create.http index 2949c986d..b4e5a7274 100644 --- a/worksheets/rollups/create.http +++ b/worksheets/rollups/create.http @@ -16,12 +16,15 @@ Content-Type: application/json "description": "An example rollup", "source_index": "nyc-taxi-data", "target_index": "rollup-test-index", + "target_index_settings": { + "index.codec": "zlib" + }, "page_size": 10, "delay": 0, "continuous": false, "dimensions": [{ "date_histogram": { - "source_field": "timestamp", + "source_field": "tpep_pickup_datetime", "fixed_interval": "1h" } }, {