Skip to content

Commit d8c11a4

Browse files
committed
EC2 Pass
Signed-off-by: Peng Huo <[email protected]>
1 parent f510431 commit d8c11a4

File tree

6 files changed

+161
-22
lines changed

6 files changed

+161
-22
lines changed

flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java

+16
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
import org.opensearch.OpenSearchException;
99
import org.opensearch.action.DocWriteResponse;
10+
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
11+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
12+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
1013
import org.opensearch.action.bulk.BulkRequest;
1114
import org.opensearch.action.bulk.BulkResponse;
1215
import org.opensearch.action.delete.DeleteRequest;
@@ -20,6 +23,7 @@
2023
import org.opensearch.action.search.SearchRequest;
2124
import org.opensearch.action.search.SearchResponse;
2225
import org.opensearch.action.search.SearchScrollRequest;
26+
import org.opensearch.action.support.master.AcknowledgedResponse;
2327
import org.opensearch.action.update.UpdateRequest;
2428
import org.opensearch.client.RequestOptions;
2529
import org.opensearch.client.indices.CreateIndexRequest;
@@ -32,13 +36,19 @@
3236
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
3337
import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
3438
import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
39+
import org.opensearch.client.opensearch.snapshot.CreateRepositoryRequest;
40+
import org.opensearch.client.opensearch.snapshot.CreateRepositoryResponse;
41+
import org.opensearch.client.opensearch.snapshot.RestoreRequest;
42+
import org.opensearch.client.opensearch.snapshot.RestoreResponse;
3543
import org.opensearch.flint.core.logging.CustomLogging;
3644
import org.opensearch.flint.core.logging.OperationMessage;
3745
import org.opensearch.flint.core.metrics.MetricsUtil;
3846

3947
import java.io.Closeable;
4048
import java.io.IOException;
4149

50+
import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX;
51+
4252
/**
4353
* Interface for wrapping the OpenSearch High Level REST Client with additional functionality,
4454
* such as metrics tracking.
@@ -75,6 +85,12 @@ public interface IRestHighLevelClient extends Closeable {
7585

7686
CreatePitResponse createPit(CreatePitRequest request) throws IOException;
7787

88+
AcknowledgedResponse createRepository(PutRepositoryRequest request) throws IOException;
89+
90+
RestoreSnapshotResponse restoreSnapshot(RestoreSnapshotRequest request) throws IOException;
91+
92+
void prepare(String indexName);
93+
7894
/**
7995
* Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
8096
* This method constructs the metric name by appending ".200.count" to the provided metric name prefix.

flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java

+75-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
package org.opensearch.flint.core;
77

8+
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
9+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
10+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
811
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
912
import org.opensearch.action.bulk.BulkRequest;
1013
import org.opensearch.action.bulk.BulkResponse;
@@ -19,6 +22,7 @@
1922
import org.opensearch.action.search.SearchRequest;
2023
import org.opensearch.action.search.SearchResponse;
2124
import org.opensearch.action.search.SearchScrollRequest;
25+
import org.opensearch.action.support.master.AcknowledgedResponse;
2226
import org.opensearch.action.update.UpdateRequest;
2327
import org.opensearch.action.update.UpdateResponse;
2428
import org.opensearch.client.RequestOptions;
@@ -35,10 +39,15 @@
3539
import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
3640
import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
3741
import org.opensearch.client.transport.rest_client.RestClientTransport;
38-
39-
import java.io.IOException;
42+
import org.opensearch.common.settings.Settings;
4043
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;
4144
import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper;
45+
import org.opensearch.flint.core.table.OpenSearchCluster;
46+
import org.opensearch.rest.RestStatus;
47+
48+
import java.io.IOException;
49+
import java.util.Map;
50+
import java.util.logging.Logger;
4251

4352
import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX;
4453
import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX;
@@ -48,6 +57,9 @@
4857
* with integrated metrics tracking.
4958
*/
5059
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
60+
61+
private static final Logger LOG = Logger.getLogger(RestHighLevelClientWrapper.class.getName());
62+
5163
private final RestHighLevelClient client;
5264
private final BulkRequestRateLimiter rateLimiter;
5365
private final OpenSearchBulkRetryWrapper bulkRetryWrapper;
@@ -153,6 +165,67 @@ public CreatePitResponse createPit(CreatePitRequest request) throws IOException
153165
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request));
154166
}
155167

168+
@Override
169+
public AcknowledgedResponse createRepository(PutRepositoryRequest request) throws IOException {
170+
return execute(OS_WRITE_OP_METRIC_PREFIX,
171+
() -> client.snapshot().createRepository(request, RequestOptions.DEFAULT));
172+
}
173+
174+
@Override
175+
public RestoreSnapshotResponse restoreSnapshot(RestoreSnapshotRequest request) throws IOException {
176+
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.snapshot().restore(request, RequestOptions.DEFAULT));
177+
}
178+
179+
@Override
180+
public void prepare(String indexName) {
181+
try {
182+
if (doesIndexExist(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
183+
LOG.info("Index [" + indexName + "] already exists");
184+
} else {
185+
String repoName = "my-s3-repository";
186+
String snapshotName = "s001";
187+
String bucket = "flint-data-dp-us-west-2-beta";
188+
String snapshotPath = "data/quickwit/generated-logs-v1/213_snapshot_001";
189+
190+
PutRepositoryRequest putRepositoryRequest = new PutRepositoryRequest(repoName);
191+
putRepositoryRequest.type("s3");
192+
putRepositoryRequest.settings(Settings.builder()
193+
.put("base_path", snapshotPath)
194+
.put("bucket", bucket)
195+
.build());
196+
AcknowledgedResponse createRepoResp = createRepository(putRepositoryRequest);
197+
if (!createRepoResp.isAcknowledged()) {
198+
LOG.severe("Failed to create repository");
199+
throw new RuntimeException("Failed to create repository");
200+
}
201+
LOG.info("Created repository [" + repoName + "]");
202+
RestoreSnapshotRequest
203+
restoreSnapshotRequest =
204+
new RestoreSnapshotRequest(repoName, snapshotName);
205+
restoreSnapshotRequest.indices(indexName);
206+
restoreSnapshotRequest.source(Map.of("storage_type", "remote_snapshot"));
207+
RestoreSnapshotResponse
208+
restoreSnapshotResponse =
209+
restoreSnapshot(restoreSnapshotRequest);
210+
if (restoreSnapshotResponse.status() != RestStatus.OK && restoreSnapshotResponse.status() != RestStatus.CREATED && restoreSnapshotResponse.status() != RestStatus.ACCEPTED) {
211+
LOG.severe("Failed to restore snapshot " + restoreSnapshotResponse.status());
212+
throw new RuntimeException("Failed to restore snapshot " + restoreSnapshotResponse.status());
213+
}
214+
LOG.info("Restored repository [" + repoName + "]" + "snapshot [" + snapshotName + "]");
215+
216+
IndicesStatsResponse stats = stats(new IndicesStatsRequest.Builder().index(indexName).build());
217+
while (stats.shards().successful().intValue() != stats.shards().total().intValue()) {
218+
LOG.info("wait..., successful:" + stats.shards().successful() + " total:" + stats.shards().total());
219+
Thread.sleep(100);
220+
stats = stats(new IndicesStatsRequest.Builder().index(indexName).build());
221+
}
222+
}
223+
} catch (Exception e) {
224+
LOG.severe(e.getMessage());
225+
throw new RuntimeException(e);
226+
}
227+
}
228+
156229
/**
157230
* Executes a given operation, tracks metrics, and handles exceptions.
158231
*

flint-core/src/main/scala/org/opensearch/flint/core/Table.scala

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ trait Table extends Serializable {
6464
* {@link Schema}
6565
*/
6666
def schema(): Schema
67+
68+
def prepare(): Boolean
6769
}
6870

6971
object Table {

flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchCluster.java

+37-14
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,29 @@
55

66
package org.opensearch.flint.core.table;
77

8+
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
9+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
10+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
11+
import org.opensearch.action.support.master.AcknowledgedResponse;
812
import org.opensearch.client.RequestOptions;
913
import org.opensearch.client.indices.GetIndexRequest;
1014
import org.opensearch.client.indices.GetIndexResponse;
15+
import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
16+
import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
17+
import org.opensearch.client.opensearch.snapshot.CreateRepositoryRequest;
18+
import org.opensearch.client.opensearch.snapshot.CreateRepositoryResponse;
19+
import org.opensearch.client.opensearch.snapshot.Repository;
20+
import org.opensearch.client.opensearch.snapshot.RepositorySettings;
21+
import org.opensearch.common.settings.Settings;
1122
import org.opensearch.flint.core.FlintOptions;
1223
import org.opensearch.flint.core.IRestHighLevelClient;
1324
import org.opensearch.flint.core.MetaData;
1425
import org.opensearch.flint.core.storage.OpenSearchClientUtils;
26+
import org.opensearch.rest.RestStatus;
1527

1628
import java.util.Arrays;
1729
import java.util.List;
30+
import java.util.Map;
1831
import java.util.logging.Logger;
1932
import java.util.stream.Collectors;
2033

@@ -33,8 +46,13 @@ public class OpenSearchCluster {
3346
* A list of OpenSearchIndexTable instance.
3447
*/
3548
public static List<OpenSearchIndexTable> apply(String indexName, FlintOptions options) {
36-
return getAllOpenSearchTableMetadata(options, indexName.split(","))
37-
.stream()
49+
// FIXME, restore index from snapshot if not exist.
50+
try (IRestHighLevelClient client = OpenSearchClientUtils.createClient(options)) {
51+
client.prepare(indexName);
52+
} catch (Exception e) {
53+
throw new IllegalStateException("Failed to get OpenSearch prepare index " + indexName, e);
54+
}
55+
return getAllOpenSearchTableMetadata(options, indexName.split(",")).stream()
3856
.map(metadata -> new OpenSearchIndexTable(metadata, options))
3957
.collect(Collectors.toList());
4058
}
@@ -46,23 +64,28 @@ public static List<OpenSearchIndexTable> apply(String indexName, FlintOptions op
4664
* @param indexNamePattern index name pattern
4765
* @return list of OpenSearch table metadata
4866
*/
49-
public static List<MetaData> getAllOpenSearchTableMetadata(FlintOptions options, String... indexNamePattern) {
50-
LOG.info("Fetching all OpenSearch table metadata for pattern " + String.join(",", indexNamePattern));
51-
String[] indexNames =
52-
Arrays.stream(indexNamePattern).map(OpenSearchClientUtils::sanitizeIndexName).toArray(String[]::new);
67+
public static List<MetaData> getAllOpenSearchTableMetadata(
68+
FlintOptions options,
69+
String... indexNamePattern) {
70+
LOG.info("Fetching all OpenSearch table metadata for pattern " + String.join(
71+
",",
72+
indexNamePattern));
73+
String[]
74+
indexNames =
75+
Arrays.stream(indexNamePattern)
76+
.map(OpenSearchClientUtils::sanitizeIndexName)
77+
.toArray(String[]::new);
5378
try (IRestHighLevelClient client = OpenSearchClientUtils.createClient(options)) {
5479
GetIndexRequest request = new GetIndexRequest(indexNames);
5580
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);
5681

57-
return Arrays.stream(response.getIndices())
58-
.map(index -> new MetaData(
59-
index,
60-
response.getMappings().get(index).source().string(),
61-
response.getSettings().get(index).toString()))
62-
.collect(Collectors.toList());
82+
return Arrays.stream(response.getIndices()).map(index -> new MetaData(index,
83+
response.getMappings().get(index).source().string(),
84+
response.getSettings().get(index).toString())).collect(Collectors.toList());
6385
} catch (Exception e) {
64-
throw new IllegalStateException("Failed to get OpenSearch table metadata for " +
65-
String.join(",", indexNames), e);
86+
throw new IllegalStateException("Failed to get OpenSearch table metadata for " + String.join(
87+
",",
88+
indexNames), e);
6689
}
6790
}
6891
}

flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala

+24-3
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,26 @@
55

66
package org.opensearch.flint.core.table
77

8+
import java.util
9+
import java.util.Map
10+
811
import org.json4s.{Formats, NoTypeHints}
912
import org.json4s.JsonAST.JString
1013
import org.json4s.jackson.JsonMethods
1114
import org.json4s.native.Serialization
15+
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest
16+
import org.opensearch.action.admin.cluster.snapshots.restore.{RestoreSnapshotRequest, RestoreSnapshotResponse}
1217
import org.opensearch.action.search.SearchRequest
13-
import org.opensearch.client.opensearch.indices.IndicesStatsRequest
18+
import org.opensearch.action.support.master.AcknowledgedResponse
19+
import org.opensearch.client.RequestOptions
20+
import org.opensearch.client.indices.GetIndexRequest
21+
import org.opensearch.client.opensearch.indices.{IndicesStatsRequest, IndicesStatsResponse}
1422
import org.opensearch.client.opensearch.indices.stats.IndicesStats
23+
import org.opensearch.common.settings.Settings
1524
import org.opensearch.flint.core._
1625
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
1726
import org.opensearch.flint.core.table.OpenSearchIndexTable.maxSplitSizeBytes
27+
import org.opensearch.rest.RestStatus
1828
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
1929
import org.opensearch.search.builder.SearchSourceBuilder
2030
import org.opensearch.search.sort.SortOrder
@@ -58,8 +68,7 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab
5868
} else {
5969
val totalSizeBytes = indexStats.primaries().store().sizeInBytes
6070
val docSize = Math.ceil(totalSizeBytes / docCount).toLong
61-
// Math.max(Math.min(maxSplitSizeBytes / docSize, maxResultWindow), 1).toInt
62-
1
71+
Math.max(Math.min(maxSplitSizeBytes / docSize, maxResultWindow), 1).toInt
6372
}
6473
}
6574
}
@@ -139,6 +148,18 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab
139148
* true if splittable, otherwise false.
140149
*/
141150
override def isSplittable(): Boolean = numberOfShards > 1
151+
152+
/**
153+
* FIXME
154+
*
155+
* @return
156+
*/
157+
override def prepare(): Boolean = {
158+
OpenSearchClientUtils
159+
.createClient(option)
160+
.prepare(name)
161+
true
162+
}
142163
}
143164

144165
object OpenSearchIndexTable {

flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReaderFactory.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.apache.spark.sql.flint
77

8+
import org.apache.spark.internal.Logging
89
import org.apache.spark.sql.catalyst.InternalRow
910
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
1011
import org.apache.spark.sql.connector.expressions.filter.Predicate
@@ -18,12 +19,15 @@ case class FlintPartitionReaderFactory(
1819
options: FlintSparkConf,
1920
pushedPredicates: Array[Predicate],
2021
pushedAggregate: Option[Aggregation])
21-
extends PartitionReaderFactory {
22+
extends PartitionReaderFactory
23+
with Logging {
2224
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
2325
val query = FlintQueryCompiler(schema).compile(pushedPredicates)
2426
val aggBuilder = pushedAggregate.map(agg => FlintQueryCompiler(schema).compileAgg(agg))
25-
26-
val reader = partition.asInstanceOf[OpenSearchSplit].table.createReader(query, aggBuilder)
27+
val table = partition.asInstanceOf[OpenSearchSplit].table
28+
logInfo(s"prepare table")
29+
table.prepare()
30+
val reader = table.createReader(query, aggBuilder)
2731
new FlintPartitionReader(
2832
reader,
2933
schema,

0 commit comments

Comments
 (0)