Skip to content

Commit 5e086a2

Browse files
Adding replication (CCR) plugin interface and classes to common-utils (#667)
* Adding replication (CCR) plugin interface and classes Signed-off-by: aggarwalShivani <[email protected]> * Adding new actiontype for unfollow replication through ism plugin Signed-off-by: aggarwalShivani <[email protected]> * Fix ktlint issues for replication libs Signed-off-by: aggarwalShivani <[email protected]> * Changes for stop-replication action Signed-off-by: aggarwalShivani <[email protected]> * Fixed imports for AcknowledgedResponse and org.opensearch.transport classes Signed-off-by: aggarwalShivani <[email protected]> * Changing ReplicationPluginInterface to static object Signed-off-by: aggarwalShivani <[email protected]> --------- Signed-off-by: aggarwalShivani <[email protected]>
1 parent c16de7d commit 5e086a2

File tree

5 files changed

+256
-0
lines changed

5 files changed

+256
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.commons.replication
6+
7+
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
8+
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE
9+
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
10+
import org.opensearch.commons.utils.recreateObject
11+
import org.opensearch.core.action.ActionListener
12+
import org.opensearch.core.action.ActionResponse
13+
import org.opensearch.core.common.io.stream.Writeable
14+
import org.opensearch.transport.client.Client
15+
import org.opensearch.transport.client.node.NodeClient
16+
17+
/**
18+
* Transport action plugin interfaces for the cross-cluster-replication plugin.
19+
*/
20+
object ReplicationPluginInterface {
21+
22+
/**
23+
* Stop replication.
24+
* @param client Node client for making transport action
25+
* @param request The request object
26+
* @param listener The listener for getting response
27+
*/
28+
29+
fun stopReplication(
30+
client: Client,
31+
request: StopIndexReplicationRequest,
32+
listener: ActionListener<AcknowledgedResponse>
33+
) {
34+
val nodeClient = client as NodeClient
35+
return nodeClient.execute(
36+
INTERNAL_STOP_REPLICATION_ACTION_TYPE,
37+
request,
38+
wrapActionListener(listener) { response ->
39+
recreateObject(response) {
40+
AcknowledgedResponse(it)
41+
}
42+
}
43+
)
44+
}
45+
46+
/**
47+
* Wrap action listener on concrete response class by a new created one on ActionResponse.
48+
* This is required because the response may be loaded by different classloader across plugins.
49+
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
50+
* the response object.
51+
*/
52+
@Suppress("UNCHECKED_CAST")
53+
private fun <Response : AcknowledgedResponse> wrapActionListener(
54+
listener: ActionListener<Response>,
55+
recreate: (Writeable) -> Response
56+
): ActionListener<Response> {
57+
return object : ActionListener<ActionResponse> {
58+
override fun onResponse(response: ActionResponse) {
59+
val recreated = response as? Response ?: recreate(response)
60+
listener.onResponse(recreated)
61+
}
62+
63+
override fun onFailure(exception: java.lang.Exception) {
64+
listener.onFailure(exception)
65+
}
66+
} as ActionListener<Response>
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.commons.replication.action
6+
7+
import org.opensearch.action.ActionType
8+
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
9+
10+
/**
11+
* Information related to the transport stop replication action for the Replication plugin
12+
*/
13+
object ReplicationActions {
14+
15+
/**
16+
* Action names for stopping replication
17+
* STOP_REPLICATION_ACTION_NAME: action used for _replication/_stop REST API
18+
* INTERNAL_STOP_REPLICATION_ACTION_NAME: Internal only - Used by Index Management plugin to invoke stop replication
19+
*/
20+
const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop"
21+
const val INTERNAL_STOP_REPLICATION_ACTION_NAME = "indices:internal/plugins/replication/index/stop"
22+
23+
/**
24+
* Stop replication transport action types.
25+
*/
26+
val STOP_REPLICATION_ACTION_TYPE =
27+
ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
28+
val INTERNAL_STOP_REPLICATION_ACTION_TYPE =
29+
ActionType(INTERNAL_STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.commons.replication.action
6+
7+
import org.opensearch.action.ActionRequestValidationException
8+
import org.opensearch.action.IndicesRequest
9+
import org.opensearch.action.support.IndicesOptions
10+
import org.opensearch.action.support.clustermanager.AcknowledgedRequest
11+
import org.opensearch.core.common.io.stream.StreamInput
12+
import org.opensearch.core.common.io.stream.StreamOutput
13+
import org.opensearch.core.xcontent.ObjectParser
14+
import org.opensearch.core.xcontent.ToXContent
15+
import org.opensearch.core.xcontent.ToXContentObject
16+
import org.opensearch.core.xcontent.XContentBuilder
17+
import org.opensearch.core.xcontent.XContentParser
18+
19+
class StopIndexReplicationRequest :
20+
AcknowledgedRequest<StopIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
21+
lateinit var indexName: String
22+
constructor(indexName: String) {
23+
this.indexName = indexName
24+
}
25+
26+
private constructor() {
27+
}
28+
29+
constructor(inp: StreamInput) : super(inp) {
30+
indexName = inp.readString()
31+
}
32+
companion object {
33+
private val PARSER = ObjectParser<StopIndexReplicationRequest, Void>("StopReplicationRequestParser") {
34+
StopIndexReplicationRequest()
35+
}
36+
37+
fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest {
38+
val stopIndexReplicationRequest = PARSER.parse(parser, null)
39+
stopIndexReplicationRequest.indexName = followerIndex
40+
return stopIndexReplicationRequest
41+
}
42+
}
43+
44+
override fun validate(): ActionRequestValidationException? {
45+
return null
46+
}
47+
48+
override fun indices(vararg indices: String?): IndicesRequest {
49+
return this
50+
}
51+
override fun indices(): Array<String> {
52+
return arrayOf(indexName)
53+
}
54+
55+
override fun indicesOptions(): IndicesOptions {
56+
return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
57+
}
58+
59+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
60+
builder.startObject()
61+
builder.field("indexName", indexName)
62+
builder.endObject()
63+
return builder
64+
}
65+
66+
override fun writeTo(out: StreamOutput) {
67+
super.writeTo(out)
68+
out.writeString(indexName)
69+
}
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.commons.replication
7+
8+
import com.nhaarman.mockitokotlin2.whenever
9+
import org.junit.jupiter.api.Test
10+
import org.junit.jupiter.api.extension.ExtendWith
11+
import org.mockito.Mockito.any
12+
import org.mockito.Mockito.mock
13+
import org.mockito.Mockito.verify
14+
import org.mockito.junit.jupiter.MockitoExtension
15+
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
16+
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
17+
import org.opensearch.core.action.ActionListener
18+
import org.opensearch.core.action.ActionResponse
19+
import org.opensearch.transport.client.node.NodeClient
20+
21+
@ExtendWith(MockitoExtension::class)
22+
internal class ReplicationPluginInterfaceTests {
23+
24+
@Test
25+
fun `test stopReplication successful response`() {
26+
// Mock dependencies
27+
val client: NodeClient = mock()
28+
val request: StopIndexReplicationRequest = mock()
29+
val listener: ActionListener<AcknowledgedResponse> = mock()
30+
val acknowledgedResponse = AcknowledgedResponse(true) // Successful response
31+
32+
// Mock the behavior of NodeClient.execute()
33+
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>()))
34+
.thenAnswer { invocation ->
35+
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2)
36+
actionListener.onResponse(acknowledgedResponse) // Simulate success
37+
}
38+
39+
// Call method under test
40+
ReplicationPluginInterface.stopReplication(client, request, listener)
41+
// Verify that listener.onResponse is called with the correct response
42+
verify(listener).onResponse(acknowledgedResponse)
43+
}
44+
45+
@Test
46+
fun `test stopReplication failure response`() {
47+
// Mock dependencies
48+
val client: NodeClient = mock()
49+
val request: StopIndexReplicationRequest = mock()
50+
val listener: ActionListener<AcknowledgedResponse> = mock()
51+
val exception = Exception("Test failure")
52+
53+
// Mock the behavior of NodeClient.execute()
54+
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>()))
55+
.thenAnswer { invocation ->
56+
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2)
57+
actionListener.onFailure(exception) // Simulate failure
58+
}
59+
60+
// Call method under test
61+
ReplicationPluginInterface.stopReplication(client, request, listener)
62+
// Verify that listener.onResponse is called with the correct response
63+
verify(listener).onFailure(exception)
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.commons.replication.action
6+
7+
import org.junit.jupiter.api.Assertions.assertEquals
8+
import org.junit.jupiter.api.Assertions.assertNotNull
9+
import org.junit.jupiter.api.Assertions.assertNull
10+
import org.junit.jupiter.api.Test
11+
import org.opensearch.commons.utils.recreateObject
12+
13+
internal class StopIndexReplicationRequestTests {
14+
@Test
15+
fun `Stop Replication request serialize and deserialize transport object should be equal`() {
16+
val index = "test-idx"
17+
val request = StopIndexReplicationRequest(index)
18+
val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) }
19+
assertNotNull(recreatedRequest)
20+
assertEquals(request.indexName, recreatedRequest.indexName)
21+
assertNull(recreatedRequest.validate())
22+
}
23+
}

0 commit comments

Comments
 (0)