From d37250a1055a63f5a4c1384d7b172a0cbe1ee24a Mon Sep 17 00:00:00 2001 From: Yee Chen Lim Date: Tue, 20 May 2025 10:03:50 +0000 Subject: [PATCH 1/2] Add support for object fields in Elasticsearch 8 --- .../test-groups/basic-scenarios/blob.go | 10 +- .../test-groups/basic-scenarios/heavy.go | 120 ++++---- .../test-groups/basic-scenarios/light.go | 4 +- .../test-groups/basic-scenarios/medium.go | 10 +- .../test-groups/composite-retrieval/tests.go | 14 +- .../test-groups/logs-search/tests.go | 120 ++++---- .../time-series-operations/tests.go | 12 +- .../test-groups/vector-search/tests.go | 4 +- db/db.go | 67 ++++- db/es/es_test.go | 16 +- db/es/maintenance.go | 63 +++- db/es/maintenance_test.go | 273 ++++++++++++++++++ db/es/select.go | 10 +- db/es/vector_test.go | 6 +- db/sql/maintenance.go | 4 +- db/sql/select.go | 10 +- db/sql/sql_test.go | 12 +- db/sql/vector_test.go | 4 +- 18 files changed, 573 insertions(+), 186 deletions(-) diff --git a/acronis-db-bench/test-groups/basic-scenarios/blob.go b/acronis-db-bench/test-groups/basic-scenarios/blob.go index 426a0ee2..f627e2d6 100644 --- a/acronis-db-bench/test-groups/basic-scenarios/blob.go +++ b/acronis-db-bench/test-groups/basic-scenarios/blob.go @@ -22,11 +22,11 @@ var TestTableBlob = engine.TestTable{ TableDefinition: func(dialect db.DialectName) *db.TableDefinition { return &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "id", Type: db.DataTypeBigIntAutoIncPK}, - {Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - {Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - {Name: "timestamp", Type: db.DataTypeBigInt, NotNull: true, Indexed: true}, - {Name: "data", Type: db.DataTypeHugeBlob, NotNull: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoIncPK}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "timestamp", Type: db.DataTypeBigInt, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "data", Type: db.DataTypeHugeBlob, NotNull: true}, }, } }, diff --git a/acronis-db-bench/test-groups/basic-scenarios/heavy.go b/acronis-db-bench/test-groups/basic-scenarios/heavy.go index 3ee83952..b7599c90 100644 --- a/acronis-db-bench/test-groups/basic-scenarios/heavy.go +++ b/acronis-db-bench/test-groups/basic-scenarios/heavy.go @@ -136,104 +136,104 @@ var TestTableHeavy = engine.TestTable{ var tableRows []db.TableRow tableRows = append(tableRows, - db.TableRow{Name: "id", Type: db.DataTypeBigIntAutoIncPK, Indexed: true}, - db.TableRow{Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - db.TableRow{Name: "checksum", Type: db.DataTypeVarChar64, NotNull: true}, - db.TableRow{Name: "cti_entity_uuid", Type: db.DataTypeVarChar36, Indexed: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoIncPK, Indexed: true}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "checksum", Type: db.DataTypeVarChar64, NotNull: true}, + db.TableRowItem{Name: "cti_entity_uuid", Type: db.DataTypeVarChar36, Indexed: true}, ) if dialect == db.CLICKHOUSE { // Needed for primary key tableRows = append(tableRows, - db.TableRow{Name: "partner_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - db.TableRow{Name: "customer_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - db.TableRow{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "partner_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "customer_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, ) } else if dialect == db.ELASTICSEARCH { tableRows = append(tableRows, - db.TableRow{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - db.TableRow{Name: "tenant_vis_list", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_vis_list", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, ) } else { tableRows = append(tableRows, - db.TableRow{Name: "tenant_id", Type: db.DataTypeVarChar36, NotNull: true, Indexed: true}, - db.TableRow{Name: "euc_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeVarChar36, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "euc_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, ) } tableRows = append(tableRows, - db.TableRow{Name: "workflow_id", Type: db.DataTypeBigInt, NotNull: true, Indexed: true}, - db.TableRow{Name: "state", Type: db.DataTypeInt, NotNull: true, Indexed: true}, - db.TableRow{Name: "type", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "queue", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "priority", Type: db.DataTypeInt, NotNull: true, Indexed: true}, - - db.TableRow{Name: "issuer_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "issuer_cluster_id", Type: db.DataTypeVarChar64, Indexed: true}, - - db.TableRow{Name: "heartbeat_ivl", Type: db.DataTypeBigInt}, - db.TableRow{Name: "queue_timeout", Type: db.DataTypeBigInt}, - db.TableRow{Name: "ack_timeout", Type: db.DataTypeBigInt}, - db.TableRow{Name: "exec_timeout", Type: db.DataTypeBigInt}, - db.TableRow{Name: "life_time", Type: db.DataTypeBigInt}, - - db.TableRow{Name: "max_assign_count", Type: db.DataTypeInt, NotNull: true}, - db.TableRow{Name: "assign_count", Type: db.DataTypeInt, NotNull: true}, - db.TableRow{Name: "cancellable", Type: db.DataTypeBoolean, NotNull: true}, - db.TableRow{Name: "cancel_requested", Type: db.DataTypeBoolean, NotNull: true}, - db.TableRow{Name: "blocker_count", Type: db.DataTypeInt, NotNull: true}, - - db.TableRow{Name: "started_by_user", Type: db.DataTypeVarChar256, Indexed: true}, + db.TableRowItem{Name: "workflow_id", Type: db.DataTypeBigInt, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "state", Type: db.DataTypeInt, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "type", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "queue", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "priority", Type: db.DataTypeInt, NotNull: true, Indexed: true}, + + db.TableRowItem{Name: "issuer_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "issuer_cluster_id", Type: db.DataTypeVarChar64, Indexed: true}, + + db.TableRowItem{Name: "heartbeat_ivl", Type: db.DataTypeBigInt}, + db.TableRowItem{Name: "queue_timeout", Type: db.DataTypeBigInt}, + db.TableRowItem{Name: "ack_timeout", Type: db.DataTypeBigInt}, + db.TableRowItem{Name: "exec_timeout", Type: db.DataTypeBigInt}, + db.TableRowItem{Name: "life_time", Type: db.DataTypeBigInt}, + + db.TableRowItem{Name: "max_assign_count", Type: db.DataTypeInt, NotNull: true}, + db.TableRowItem{Name: "assign_count", Type: db.DataTypeInt, NotNull: true}, + db.TableRowItem{Name: "cancellable", Type: db.DataTypeBoolean, NotNull: true}, + db.TableRowItem{Name: "cancel_requested", Type: db.DataTypeBoolean, NotNull: true}, + db.TableRowItem{Name: "blocker_count", Type: db.DataTypeInt, NotNull: true}, + + db.TableRowItem{Name: "started_by_user", Type: db.DataTypeVarChar256, Indexed: true}, ) if dialect == db.CASSANDRA { - tableRows = append(tableRows, db.TableRow{Name: "policy_id", Type: db.DataTypeInt, Indexed: true}) + tableRows = append(tableRows, db.TableRowItem{Name: "policy_id", Type: db.DataTypeInt, Indexed: true}) } else { - tableRows = append(tableRows, db.TableRow{Name: "policy_id", Type: db.DataTypeVarChar64, Indexed: true}) + tableRows = append(tableRows, db.TableRowItem{Name: "policy_id", Type: db.DataTypeVarChar64, Indexed: true}) } tableRows = append(tableRows, - db.TableRow{Name: "policy_type", Type: db.DataTypeVarChar64, Indexed: true}, - db.TableRow{Name: "policy_name", Type: db.DataTypeVarChar256, Indexed: true}, + db.TableRowItem{Name: "policy_type", Type: db.DataTypeVarChar64, Indexed: true}, + db.TableRowItem{Name: "policy_name", Type: db.DataTypeVarChar256, Indexed: true}, - db.TableRow{Name: "resource_id", Type: db.DataTypeVarChar64, Indexed: true}, + db.TableRowItem{Name: "resource_id", Type: db.DataTypeVarChar64, Indexed: true}, ) if dialect == db.CASSANDRA { - tableRows = append(tableRows, db.TableRow{Name: "resource_type", Type: db.DataTypeInt, Indexed: true}) + tableRows = append(tableRows, db.TableRowItem{Name: "resource_type", Type: db.DataTypeInt, Indexed: true}) } else { - tableRows = append(tableRows, db.TableRow{Name: "resource_type", Type: db.DataTypeVarChar64, Indexed: true}) + tableRows = append(tableRows, db.TableRowItem{Name: "resource_type", Type: db.DataTypeVarChar64, Indexed: true}) } tableRows = append(tableRows, - db.TableRow{Name: "resource_name", Type: db.DataTypeVarChar256, Indexed: true}, + db.TableRowItem{Name: "resource_name", Type: db.DataTypeVarChar256, Indexed: true}, - db.TableRow{Name: "tags", Type: db.DataTypeText, Indexed: true}, + db.TableRowItem{Name: "tags", Type: db.DataTypeText, Indexed: true}, - db.TableRow{Name: "affinity_agent_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "affinity_cluster_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "affinity_agent_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "affinity_cluster_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "argument", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "context", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "argument", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "context", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "progress", Type: db.DataTypeInt}, - db.TableRow{Name: "progress_total", Type: db.DataTypeInt}, + db.TableRowItem{Name: "progress", Type: db.DataTypeInt}, + db.TableRowItem{Name: "progress_total", Type: db.DataTypeInt}, - db.TableRow{Name: "assigned_agent_id", Type: db.DataTypeVarChar64, Indexed: true}, - db.TableRow{Name: "assigned_agent_cluster_id", Type: db.DataTypeVarChar64, Indexed: true}, + db.TableRowItem{Name: "assigned_agent_id", Type: db.DataTypeVarChar64, Indexed: true}, + db.TableRowItem{Name: "assigned_agent_cluster_id", Type: db.DataTypeVarChar64, Indexed: true}, - db.TableRow{Name: "enqueue_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "assign_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "start_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "update_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "completion_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "enqueue_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "assign_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "start_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "update_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "completion_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "result_code", Type: db.DataTypeInt, Indexed: true}, - db.TableRow{Name: "result_error", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "result_warnings", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "result_payload", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "result_code", Type: db.DataTypeInt, Indexed: true}, + db.TableRowItem{Name: "result_error", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "result_warnings", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "result_payload", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "const_val", Type: db.DataTypeInt}, + db.TableRowItem{Name: "const_val", Type: db.DataTypeInt}, ) var tableDef = &db.TableDefinition{ diff --git a/acronis-db-bench/test-groups/basic-scenarios/light.go b/acronis-db-bench/test-groups/basic-scenarios/light.go index 25c2b65f..1426996c 100644 --- a/acronis-db-bench/test-groups/basic-scenarios/light.go +++ b/acronis-db-bench/test-groups/basic-scenarios/light.go @@ -18,8 +18,8 @@ var TestTableLight = engine.TestTable{ TableDefinition: func(dialect db.DialectName) *db.TableDefinition { return &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "id", Type: db.DataTypeBigIntAutoIncPK}, - {Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoIncPK}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, }, } }, diff --git a/acronis-db-bench/test-groups/basic-scenarios/medium.go b/acronis-db-bench/test-groups/basic-scenarios/medium.go index e8831a68..4ee4a96e 100644 --- a/acronis-db-bench/test-groups/basic-scenarios/medium.go +++ b/acronis-db-bench/test-groups/basic-scenarios/medium.go @@ -25,11 +25,11 @@ var TestTableMedium = engine.TestTable{ TableDefinition: func(dialect db.DialectName) *db.TableDefinition { return &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "id", Type: db.DataTypeBigIntAutoIncPK}, - {Name: "uuid", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, - {Name: "tenant_id", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, - {Name: "euc_id", Type: db.DataTypeInt, NotNull: true, Indexed: true}, - {Name: "progress", Type: db.DataTypeInt}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoIncPK}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "euc_id", Type: db.DataTypeInt, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "progress", Type: db.DataTypeInt}, }, } }, diff --git a/acronis-db-bench/test-groups/composite-retrieval/tests.go b/acronis-db-bench/test-groups/composite-retrieval/tests.go index b1d44d63..9aeff113 100644 --- a/acronis-db-bench/test-groups/composite-retrieval/tests.go +++ b/acronis-db-bench/test-groups/composite-retrieval/tests.go @@ -38,13 +38,13 @@ var TestTableEmailSecurity = engine.TestTable{ var tableRows []db.TableRow tableRows = append(tableRows, - db.TableRow{Name: "id", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "date", Type: db.DataTypeDateTime, Indexed: true}, - db.TableRow{Name: "sender", Type: db.DataTypeVarChar, Indexed: true}, - db.TableRow{Name: "recipient", Type: db.DataTypeVarChar, Indexed: true}, - db.TableRow{Name: "subject", Type: db.DataTypeVarChar, Indexed: true}, - db.TableRow{Name: "body", Type: db.DataTypeText, Indexed: true}, - db.TableRow{Name: "embedding", Type: db.DataTypeVector768Float32, Indexed: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "date", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "sender", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "recipient", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "subject", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "body", Type: db.DataTypeText, Indexed: true}, + db.TableRowItem{Name: "embedding", Type: db.DataTypeVector768Float32, Indexed: true}, ) var tableDef = &db.TableDefinition{ diff --git a/acronis-db-bench/test-groups/logs-search/tests.go b/acronis-db-bench/test-groups/logs-search/tests.go index 4d291ef6..44c164ef 100644 --- a/acronis-db-bench/test-groups/logs-search/tests.go +++ b/acronis-db-bench/test-groups/logs-search/tests.go @@ -153,104 +153,104 @@ var TestTableLogs = engine.TestTable{ var tableRows []db.TableRow tableRows = append(tableRows, - db.TableRow{Name: "id", Type: db.DataTypeBigIntAutoIncPK, Indexed: true}, - db.TableRow{Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - db.TableRow{Name: "checksum", Type: db.DataTypeVarChar64, NotNull: true}, - db.TableRow{Name: "cti_entity_uuid", Type: db.DataTypeVarChar36, Indexed: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoIncPK, Indexed: true}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "checksum", Type: db.DataTypeVarChar64, NotNull: true}, + db.TableRowItem{Name: "cti_entity_uuid", Type: db.DataTypeVarChar36, Indexed: true}, ) if dialect == db.CLICKHOUSE { // Needed for primary key tableRows = append(tableRows, - db.TableRow{Name: "partner_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - db.TableRow{Name: "customer_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - db.TableRow{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "partner_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "customer_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, ) } else if dialect == db.ELASTICSEARCH { tableRows = append(tableRows, - db.TableRow{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, - db.TableRow{Name: "tenant_vis_list", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_vis_list", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, ) } else { tableRows = append(tableRows, - db.TableRow{Name: "tenant_id", Type: db.DataTypeVarChar36, NotNull: true, Indexed: true}, - db.TableRow{Name: "euc_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeVarChar36, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "euc_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, ) } tableRows = append(tableRows, - db.TableRow{Name: "workflow_id", Type: db.DataTypeBigInt, NotNull: true, Indexed: true}, - db.TableRow{Name: "state", Type: db.DataTypeInt, NotNull: true, Indexed: true}, - db.TableRow{Name: "type", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "queue", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "priority", Type: db.DataTypeInt, NotNull: true, Indexed: true}, - - db.TableRow{Name: "issuer_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "issuer_cluster_id", Type: db.DataTypeVarChar64, Indexed: true}, - - db.TableRow{Name: "heartbeat_ivl", Type: db.DataTypeBigInt}, - db.TableRow{Name: "queue_timeout", Type: db.DataTypeBigInt}, - db.TableRow{Name: "ack_timeout", Type: db.DataTypeBigInt}, - db.TableRow{Name: "exec_timeout", Type: db.DataTypeBigInt}, - db.TableRow{Name: "life_time", Type: db.DataTypeBigInt}, - - db.TableRow{Name: "max_assign_count", Type: db.DataTypeInt, NotNull: true}, - db.TableRow{Name: "assign_count", Type: db.DataTypeInt, NotNull: true}, - db.TableRow{Name: "cancellable", Type: db.DataTypeBoolean, NotNull: true}, - db.TableRow{Name: "cancel_requested", Type: db.DataTypeBoolean, NotNull: true}, - db.TableRow{Name: "blocker_count", Type: db.DataTypeInt, NotNull: true}, - - db.TableRow{Name: "started_by_user", Type: db.DataTypeVarChar256, Indexed: true}, + db.TableRowItem{Name: "workflow_id", Type: db.DataTypeBigInt, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "state", Type: db.DataTypeInt, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "type", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "queue", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "priority", Type: db.DataTypeInt, NotNull: true, Indexed: true}, + + db.TableRowItem{Name: "issuer_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "issuer_cluster_id", Type: db.DataTypeVarChar64, Indexed: true}, + + db.TableRowItem{Name: "heartbeat_ivl", Type: db.DataTypeBigInt}, + db.TableRowItem{Name: "queue_timeout", Type: db.DataTypeBigInt}, + db.TableRowItem{Name: "ack_timeout", Type: db.DataTypeBigInt}, + db.TableRowItem{Name: "exec_timeout", Type: db.DataTypeBigInt}, + db.TableRowItem{Name: "life_time", Type: db.DataTypeBigInt}, + + db.TableRowItem{Name: "max_assign_count", Type: db.DataTypeInt, NotNull: true}, + db.TableRowItem{Name: "assign_count", Type: db.DataTypeInt, NotNull: true}, + db.TableRowItem{Name: "cancellable", Type: db.DataTypeBoolean, NotNull: true}, + db.TableRowItem{Name: "cancel_requested", Type: db.DataTypeBoolean, NotNull: true}, + db.TableRowItem{Name: "blocker_count", Type: db.DataTypeInt, NotNull: true}, + + db.TableRowItem{Name: "started_by_user", Type: db.DataTypeVarChar256, Indexed: true}, ) if dialect == db.CASSANDRA { - tableRows = append(tableRows, db.TableRow{Name: "policy_id", Type: db.DataTypeInt, Indexed: true}) + tableRows = append(tableRows, db.TableRowItem{Name: "policy_id", Type: db.DataTypeInt, Indexed: true}) } else { - tableRows = append(tableRows, db.TableRow{Name: "policy_id", Type: db.DataTypeVarChar64, Indexed: true}) + tableRows = append(tableRows, db.TableRowItem{Name: "policy_id", Type: db.DataTypeVarChar64, Indexed: true}) } tableRows = append(tableRows, - db.TableRow{Name: "policy_type", Type: db.DataTypeVarChar64, Indexed: true}, - db.TableRow{Name: "policy_name", Type: db.DataTypeVarChar256, Indexed: true}, + db.TableRowItem{Name: "policy_type", Type: db.DataTypeVarChar64, Indexed: true}, + db.TableRowItem{Name: "policy_name", Type: db.DataTypeVarChar256, Indexed: true}, - db.TableRow{Name: "resource_id", Type: db.DataTypeVarChar64, Indexed: true}, + db.TableRowItem{Name: "resource_id", Type: db.DataTypeVarChar64, Indexed: true}, ) if dialect == db.CASSANDRA { - tableRows = append(tableRows, db.TableRow{Name: "resource_type", Type: db.DataTypeInt, Indexed: true}) + tableRows = append(tableRows, db.TableRowItem{Name: "resource_type", Type: db.DataTypeInt, Indexed: true}) } else { - tableRows = append(tableRows, db.TableRow{Name: "resource_type", Type: db.DataTypeVarChar64, Indexed: true}) + tableRows = append(tableRows, db.TableRowItem{Name: "resource_type", Type: db.DataTypeVarChar64, Indexed: true}) } tableRows = append(tableRows, - db.TableRow{Name: "resource_name", Type: db.DataTypeVarChar256, Indexed: true}, + db.TableRowItem{Name: "resource_name", Type: db.DataTypeVarChar256, Indexed: true}, - db.TableRow{Name: "tags", Type: db.DataTypeText, Indexed: true}, + db.TableRowItem{Name: "tags", Type: db.DataTypeText, Indexed: true}, - db.TableRow{Name: "affinity_agent_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "affinity_cluster_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "affinity_agent_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "affinity_cluster_id", Type: db.DataTypeVarChar64, NotNull: true, Indexed: true}, - db.TableRow{Name: "argument", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "context", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "argument", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "context", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "progress", Type: db.DataTypeInt}, - db.TableRow{Name: "progress_total", Type: db.DataTypeInt}, + db.TableRowItem{Name: "progress", Type: db.DataTypeInt}, + db.TableRowItem{Name: "progress_total", Type: db.DataTypeInt}, - db.TableRow{Name: "assigned_agent_id", Type: db.DataTypeVarChar64, Indexed: true}, - db.TableRow{Name: "assigned_agent_cluster_id", Type: db.DataTypeVarChar64, Indexed: true}, + db.TableRowItem{Name: "assigned_agent_id", Type: db.DataTypeVarChar64, Indexed: true}, + db.TableRowItem{Name: "assigned_agent_cluster_id", Type: db.DataTypeVarChar64, Indexed: true}, - db.TableRow{Name: "enqueue_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "assign_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "start_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "update_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "completion_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "enqueue_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "assign_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "start_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "update_time", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "completion_time", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "result_code", Type: db.DataTypeInt, Indexed: true}, - db.TableRow{Name: "result_error", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "result_warnings", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "result_payload", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "result_code", Type: db.DataTypeInt, Indexed: true}, + db.TableRowItem{Name: "result_error", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "result_warnings", Type: db.DataTypeBinaryBlobType}, + db.TableRowItem{Name: "result_payload", Type: db.DataTypeBinaryBlobType}, - db.TableRow{Name: "const_val", Type: db.DataTypeInt}, + db.TableRowItem{Name: "const_val", Type: db.DataTypeInt}, ) var tableDef = &db.TableDefinition{ diff --git a/acronis-db-bench/test-groups/time-series-operations/tests.go b/acronis-db-bench/test-groups/time-series-operations/tests.go index 2e77f999..14d3c157 100644 --- a/acronis-db-bench/test-groups/time-series-operations/tests.go +++ b/acronis-db-bench/test-groups/time-series-operations/tests.go @@ -38,12 +38,12 @@ var TestTableTimeSeriesSQL = engine.TestTable{ TableDefinition: func(dialect db.DialectName) *db.TableDefinition { return &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "id", Type: db.DataTypeBigIntAutoIncPK}, - {Name: "tenant_id", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, - {Name: "device_id", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, - {Name: "metric_id", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, - {Name: "ts", Type: db.DataTypeTimestamp, NotNull: true, Indexed: true}, - {Name: "value", Type: db.DataTypeInt, NotNull: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoIncPK}, + db.TableRowItem{Name: "tenant_id", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "device_id", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "metric_id", Type: db.DataTypeVarCharUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "ts", Type: db.DataTypeTimestamp, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "value", Type: db.DataTypeInt, NotNull: true}, }, } }, diff --git a/acronis-db-bench/test-groups/vector-search/tests.go b/acronis-db-bench/test-groups/vector-search/tests.go index 7fc70845..261af089 100644 --- a/acronis-db-bench/test-groups/vector-search/tests.go +++ b/acronis-db-bench/test-groups/vector-search/tests.go @@ -34,8 +34,8 @@ var TestTableVector768 = engine.TestTable{ var tableRows []db.TableRow tableRows = append(tableRows, - db.TableRow{Name: "id", Type: db.DataTypeBigInt, Indexed: true}, - db.TableRow{Name: "embedding", Type: db.DataTypeVector768Float32, Indexed: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigInt, Indexed: true}, + db.TableRowItem{Name: "embedding", Type: db.DataTypeVector768Float32, Indexed: true}, ) var tableDef = &db.TableDefinition{ diff --git a/db/db.go b/db/db.go index 4dd63fd1..b007a191 100644 --- a/db/db.go +++ b/db/db.go @@ -1230,7 +1230,16 @@ type Session interface { // }, // } // ``` -type TableRow struct { +type TableRow interface { + GetName() string + GetType() DataType + IsPrimaryKey() bool + IsNotNull() bool + IsIndexed() bool + GetSubtable() []TableRow +} + +type TableRowItem struct { Name string Type DataType PrimaryKey bool @@ -1238,6 +1247,60 @@ type TableRow struct { Indexed bool // only for Elasticsearch } +func (tri TableRowItem) GetName() string { + return tri.Name +} + +func (tri TableRowItem) GetType() DataType { + return tri.Type +} + +func (tri TableRowItem) IsPrimaryKey() bool { + return tri.PrimaryKey +} + +func (tri TableRowItem) IsNotNull() bool { + return tri.NotNull +} + +func (tri TableRowItem) IsIndexed() bool { + return tri.Indexed +} + +func (tri TableRowItem) GetSubtable() []TableRow { + return nil +} + +// TableRowSubtable is a struct for storing nested fields in NoSQL databases +type TableRowSubtable struct { + Name string + Type DataType + Subtable []TableRow +} + +func (trst TableRowSubtable) GetName() string { + return trst.Name +} + +func (trst TableRowSubtable) GetType() DataType { + return trst.Type +} + +func (trst TableRowSubtable) IsPrimaryKey() bool { + return false +} + +func (trst TableRowSubtable) IsNotNull() bool { + return false +} + +func (trst TableRowSubtable) IsIndexed() bool { + return false +} +func (trst TableRowSubtable) GetSubtable() []TableRow { + return trst.Subtable +} + // ResilienceSettings defines database replication and sharding configuration // SQL databases: Typically ignored // Elasticsearch: Controls cluster configuration @@ -1825,6 +1888,8 @@ const ( DataTypeEngine DataType = "{$engine}" // Storage engine specification DataTypeNotNull DataType = "{$notnull}" // NOT NULL constraint DataTypeNull DataType = "{$null}" // NULL allowed + DataTypeObject DataType = "{$object}" // Only supported for nosql databases + DataTypeNested DataType = "{$nested}" // Only supported for vector databases ) // Dialect is an interface for database dialects diff --git a/db/es/es_test.go b/db/es/es_test.go index 8b83d125..d06e1735 100644 --- a/db/es/es_test.go +++ b/db/es/es_test.go @@ -45,14 +45,14 @@ func (l *testLogger) Log(format string, args ...interface{}) { func testTableDefinition() *db.TableDefinition { return &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "@timestamp", Type: db.DataTypeDateTime, Indexed: true}, - {Name: "id", Type: db.DataTypeId, Indexed: true}, - {Name: "uuid", Type: db.DataTypeUUID, Indexed: true}, - {Name: "type", Type: db.DataTypeVarChar, Indexed: true}, - {Name: "policy_name", Type: db.DataTypeVarChar, Indexed: true}, - {Name: "resource_name", Type: db.DataTypeVarChar, Indexed: true}, - {Name: "accessors", Type: db.DataTypeVarChar, Indexed: true}, - {Name: "start_time", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "@timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeId, Indexed: true}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeUUID, Indexed: true}, + db.TableRowItem{Name: "type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "policy_name", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "resource_name", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "accessors", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "start_time", Type: db.DataTypeDateTime, Indexed: true}, }, } } diff --git a/db/es/maintenance.go b/db/es/maintenance.go index d2773026..0ab4141d 100644 --- a/db/es/maintenance.go +++ b/db/es/maintenance.go @@ -1,6 +1,7 @@ package es import ( + "encoding/json" "fmt" "github.com/acronis/perfkit/db" @@ -64,20 +65,43 @@ const ( fieldTypeBoolean fieldType = "boolean" fieldTypeDateNano fieldType = "date_nanos" fieldTypeDenseVector fieldType = "dense_vector" + fieldTypeObject fieldType = "object" ) -type fieldSpec struct { +type fieldSpec interface { + MarshalJSON() ([]byte, error) +} + +type fieldSpecItem struct { Type fieldType Dims int Indexed bool } func convertToEsType(d dialect, t db.TableRow) fieldSpec { - var spec = fieldSpec{ - Indexed: t.Indexed, + if t.GetType() == db.DataTypeObject || t.GetType() == db.DataTypeNested { + subFieldSpec := make(map[string]fieldSpec, len(t.GetSubtable())) + for _, subTable := range t.GetSubtable() { + subFieldSpec[subTable.GetName()] = convertToEsType(d, subTable) + } + + specObj := fieldSpecObject{ + Type: fieldTypeObject, + fieldSpecs: subFieldSpec, + } + + if t.GetType() == db.DataTypeNested { + specObj.IsNested = true + } + + return specObj } - switch t.Type { + var spec = fieldSpecItem{ + Indexed: t.IsIndexed(), + } + + switch t.GetType() { case db.DataTypeBigIntAutoInc, db.DataTypeId, db.DataTypeInt: spec.Type = fieldTypeLong case db.DataTypeUUID: @@ -103,7 +127,7 @@ func convertToEsType(d dialect, t db.TableRow) fieldSpec { return spec } -func (s fieldSpec) MarshalJSON() ([]byte, error) { +func (s fieldSpecItem) MarshalJSON() ([]byte, error) { if s.Dims > 0 { if s.Indexed { return []byte(fmt.Sprintf(`{"type":%q, "dims":%d}`, s.Type, s.Dims)), nil @@ -115,9 +139,34 @@ func (s fieldSpec) MarshalJSON() ([]byte, error) { if s.Indexed { return []byte(fmt.Sprintf(`{"type":%q}`, s.Type)), nil } + return []byte(fmt.Sprintf(`{"type":%q, "index": false}`, s.Type)), nil } +type fieldSpecObject struct { + Type fieldType + IsNested bool + fieldSpecs map[string]fieldSpec +} + +func (fsn fieldSpecObject) MarshalJSON() ([]byte, error) { + var fields = make(map[string]any, 2) + + if fsn.IsNested { + fields["type"] = "nested" + } + + properties := make(map[string]any, len(fsn.fieldSpecs)+1) + + for name, f := range fsn.fieldSpecs { + properties[name] = f + } + + fields["properties"] = properties + + return json.Marshal(fields) +} + type mapping map[string]fieldSpec type indexName string @@ -213,11 +262,11 @@ func createIndex(d dialect, mig migrator, indexName string, indexDefinition *db. var mp = make(mapping) for _, row := range indexDefinition.TableRows { - if row.Name == "id" { + if row.GetName() == "id" { continue } - mp[row.Name] = convertToEsType(d, row) + mp[row.GetName()] = convertToEsType(d, row) } if err := mig.initComponentTemplate(mappingTemplateName, componentTemplate{ diff --git a/db/es/maintenance_test.go b/db/es/maintenance_test.go index 74d9c4d0..6bb68be2 100644 --- a/db/es/maintenance_test.go +++ b/db/es/maintenance_test.go @@ -1,6 +1,11 @@ package es import ( + "bytes" + "encoding/json" + "reflect" + "strings" + "testing" "time" "github.com/acronis/perfkit/db" @@ -55,3 +60,271 @@ func (suite *TestingSuite) TestElasticSearchSchemaInit() { return } } + +func Test_fieldSpecItem_MarshalJSON(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + fields map[string]fieldSpec + want string + wantErr bool + }{ + { + name: "Valid fieldSpec item", + fields: map[string]fieldSpec{ + "field1": fieldSpecItem{ + Type: "keyword", + Dims: 3, + Indexed: false, + }, + }, + want: `{"field1":{"type":"keyword","dims":3,"index":false}}`, + wantErr: false, + }, + { + name: "Valid fieldSpec item with nested field", + fields: map[string]fieldSpec{ + "field1": fieldSpecItem{ + Type: fieldTypeKeyword, + Dims: 3, + Indexed: false, + }, + "field2": fieldSpecObject{ + fieldSpecs: map[string]fieldSpec{ + "field3": fieldSpecItem{ + Type: fieldTypeKeyword, + Dims: 4, + Indexed: true, + }, + }, + }, + }, + want: `{"field1":{"type":"keyword","dims":3,"index":false},"field2":{"properties":{"field3":{"type":"keyword","dims":4}}}}`, + wantErr: false, + }, + { + name: "Multi-level nested fieldSpec item with object field", + fields: map[string]fieldSpec{ + "field1": fieldSpecItem{ + Type: fieldTypeKeyword, + Dims: 3, + Indexed: false, + }, + "field2": fieldSpecObject{ + fieldSpecs: map[string]fieldSpec{ + "field3": fieldSpecItem{ + Type: fieldTypeKeyword, + Dims: 4, + Indexed: true, + }, + "field4": fieldSpecObject{ + fieldSpecs: map[string]fieldSpec{ + "field5": fieldSpecItem{ + Type: fieldTypeKeyword, + Dims: 3, + Indexed: false, + }, + }, + }, + }, + }, + }, + want: `{"field1":{"type":"keyword","dims":3,"index":false},"field2":{"properties":{"field3":{"type":"keyword","dims":4},"field4":{"properties":{"field5":{"type":"keyword","dims":3,"index":false}}}}}}`, + wantErr: false, + }, + { + name: "Valid fieldSpec history item with nested field", + fields: map[string]fieldSpec{ + "field1": fieldSpecItem{ + Type: fieldTypeKeyword, + Dims: 3, + Indexed: false, + }, + "history": fieldSpecObject{ + Type: fieldTypeObject, + fieldSpecs: map[string]fieldSpec{ + "history_type": fieldSpecItem{ + Type: fieldTypeKeyword, + Indexed: true, + }, + "timestamp": fieldSpecItem{ + Type: fieldTypeDateNano, + Indexed: true, + }, + "event": fieldSpecItem{ + Type: fieldTypeKeyword, + Indexed: false, + }, + }, + }, + }, + want: `{"field1":{"type":"keyword","dims":3,"index":false},"history":{"properties":{"event":{"type":"keyword","index":false},"history_type":{"type":"keyword"},"timestamp":{"type":"date_nanos"}}}}`, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var got bytes.Buffer + err := json.NewEncoder(&got).Encode(tt.fields) + + if (err != nil) != tt.wantErr { + t.Errorf("fieldSpec json marshal error = %v, wantErr %v", err, tt.wantErr) + + return + } + + resultStr := strings.TrimSpace(got.String()) + + if resultStr != tt.want { + t.Errorf("fieldSpec json marshal error, result does not match = %v, want %v", resultStr, tt.want) + } + }) + } +} + +func Test_convertToEsType(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + tr db.TableRow + want fieldSpec + }{ + { + name: "Simple row item", + tr: db.TableRowItem{Name: "@timestamp", Type: db.DataTypeDateTime, Indexed: true}, + want: fieldSpecItem{Type: fieldTypeDateNano, Indexed: true}, + }, + { + name: "Simple nested row item", + tr: db.TableRowSubtable{ + Name: "history", + Type: db.DataTypeNested, + Subtable: []db.TableRow{ + db.TableRowItem{Name: "history_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "event", Type: db.DataTypeVarChar, Indexed: true}, + }, + }, + want: fieldSpecObject{ + Type: fieldTypeObject, + IsNested: true, + fieldSpecs: map[string]fieldSpec{ + "history_type": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + "timestamp": fieldSpecItem{Type: fieldTypeDateNano, Indexed: true}, + "event": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + }, + }, + }, + { + name: "Multi-level nested row item", + tr: db.TableRowSubtable{ + Name: "history", + Type: db.DataTypeNested, + Subtable: []db.TableRow{ + db.TableRowItem{Name: "history_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "event", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowSubtable{ + Name: "history", + Type: db.DataTypeNested, + Subtable: []db.TableRow{ + db.TableRowItem{Name: "history_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "event", Type: db.DataTypeVarChar, Indexed: false}, + }, + }, + }, + }, + want: fieldSpecObject{ + Type: fieldTypeObject, + IsNested: true, + fieldSpecs: map[string]fieldSpec{ + "history_type": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + "timestamp": fieldSpecItem{Type: fieldTypeDateNano, Indexed: true}, + "event": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + "history": fieldSpecObject{ + Type: fieldTypeObject, + IsNested: true, + fieldSpecs: map[string]fieldSpec{ + "history_type": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + "timestamp": fieldSpecItem{Type: fieldTypeDateNano, Indexed: true}, + "event": fieldSpecItem{Type: fieldTypeKeyword, Indexed: false}, + }, + }, + }, + }, + }, + { + name: "Multi-level with mixed of nested and object row item", + tr: db.TableRowSubtable{ + Name: "history", + Type: db.DataTypeNested, + Subtable: []db.TableRow{ + db.TableRowItem{Name: "history_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "event", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowSubtable{ + Name: "history", + Type: db.DataTypeObject, + Subtable: []db.TableRow{ + db.TableRowItem{Name: "history_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "event", Type: db.DataTypeVarChar, Indexed: false}, + db.TableRowSubtable{ + Name: "history", + Type: db.DataTypeNested, + Subtable: []db.TableRow{ + db.TableRowItem{Name: "history_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "event", Type: db.DataTypeVarChar, Indexed: false}, + }, + }, + }, + }, + }, + }, + want: fieldSpecObject{ + Type: fieldTypeObject, + IsNested: true, + fieldSpecs: map[string]fieldSpec{ + "history_type": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + "timestamp": fieldSpecItem{Type: fieldTypeDateNano, Indexed: true}, + "event": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + "history": fieldSpecObject{ + Type: fieldTypeObject, + IsNested: false, + fieldSpecs: map[string]fieldSpec{ + "history_type": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + "timestamp": fieldSpecItem{Type: fieldTypeDateNano, Indexed: true}, + "event": fieldSpecItem{Type: fieldTypeKeyword, Indexed: false}, + "history": fieldSpecObject{ + Type: fieldTypeObject, + IsNested: true, + fieldSpecs: map[string]fieldSpec{ + "history_type": fieldSpecItem{Type: fieldTypeKeyword, Indexed: true}, + "timestamp": fieldSpecItem{Type: fieldTypeDateNano, Indexed: true}, + "event": fieldSpecItem{Type: fieldTypeKeyword, Indexed: false}, + }, + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + if got := convertToEsType(nil, tt.tr); !reflect.DeepEqual(got, tt.want) { + t.Errorf("convertToEsType() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/db/es/select.go b/db/es/select.go index 795b9f87..29acbe6d 100644 --- a/db/es/select.go +++ b/db/es/select.go @@ -35,15 +35,15 @@ func createSearchQueryBuilder(idxName string, tableRows []db.TableRow) error { } for _, row := range tableRows { - switch row.Type { + switch row.GetType() { case db.DataTypeId: - queryBuilder.queryable[row.Name] = idCond() + queryBuilder.queryable[row.GetName()] = idCond() case db.DataTypeUUID: - queryBuilder.queryable[row.Name] = uuidCond() + queryBuilder.queryable[row.GetName()] = uuidCond() case db.DataTypeVarChar: - queryBuilder.queryable[row.Name] = stringCond(256, true) + queryBuilder.queryable[row.GetName()] = stringCond(256, true) case db.DataTypeDateTime: - queryBuilder.queryable[row.Name] = timeCond() + queryBuilder.queryable[row.GetName()] = timeCond() } } diff --git a/db/es/vector_test.go b/db/es/vector_test.go index 57c84b00..29c219c7 100644 --- a/db/es/vector_test.go +++ b/db/es/vector_test.go @@ -50,9 +50,9 @@ func vectorCleanup(t *testing.T, dbo db.Database) { func testVectorTableDefinition(dia db.DialectName) *db.TableDefinition { return &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "id", Type: db.DataTypeInt, Indexed: true}, - {Name: "embedding", Type: db.DataTypeVector3Float32, Indexed: true}, - {Name: "text", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "id", Type: db.DataTypeInt, Indexed: true}, + db.TableRowItem{Name: "embedding", Type: db.DataTypeVector3Float32, Indexed: true}, + db.TableRowItem{Name: "text", Type: db.DataTypeVarChar, Indexed: true}, }, } } diff --git a/db/sql/maintenance.go b/db/sql/maintenance.go index 5cc80332..e92bbfa8 100644 --- a/db/sql/maintenance.go +++ b/db/sql/maintenance.go @@ -164,8 +164,8 @@ func constructSQLDDLQuery(d dialect, tableName string, tableDefinition *db.Table var query = fmt.Sprintf("CREATE TABLE %v (", d.table(tableName)) for i, row := range tableDefinition.TableRows { - query += fmt.Sprintf("%v %v", row.Name, d.getType(row.Type)) - if row.NotNull { + query += fmt.Sprintf("%v %v", row.GetName(), d.getType(row.GetType())) + if row.IsNotNull() { if d.name() != db.CASSANDRA { query += " NOT NULL" } diff --git a/db/sql/select.go b/db/sql/select.go index e83660b6..720d7434 100644 --- a/db/sql/select.go +++ b/db/sql/select.go @@ -52,20 +52,20 @@ func createSelectQueryBuilder(queryBuilders queryBuilderFactory, tableName strin // Add filter functions for each column based on its data type // These filter functions implement the query building logic for WHERE clauses for _, row := range tableRows { - switch row.Type { + switch row.GetType() { case db.DataTypeInt, db.DataTypeBigInt, db.DataTypeBigIntAutoIncPK, db.DataTypeBigIntAutoInc, db.DataTypeSmallInt, db.DataTypeTinyInt: - queryable[row.Name] = idCond() // Numeric ID conditions + queryable[row.GetName()] = idCond() // Numeric ID conditions case db.DataTypeUUID, db.DataTypeVarCharUUID: - queryable[row.Name] = uuidCond() // UUID conditions + queryable[row.GetName()] = uuidCond() // UUID conditions case db.DataTypeVarChar, db.DataTypeVarChar32, db.DataTypeVarChar36, db.DataTypeVarChar64, db.DataTypeVarChar128, db.DataTypeVarChar256, db.DataTypeText, db.DataTypeLongText: - queryable[row.Name] = stringCond(256, true) // String conditions with LIKE support + queryable[row.GetName()] = stringCond(256, true) // String conditions with LIKE support case db.DataTypeDateTime, db.DataTypeDateTime6, db.DataTypeTimestamp, db.DataTypeTimestamp6, db.DataTypeCurrentTimeStamp6: - queryable[row.Name] = timeCond() // Timestamp conditions + queryable[row.GetName()] = timeCond() // Timestamp conditions } } diff --git a/db/sql/sql_test.go b/db/sql/sql_test.go index f3da2884..18ba56b0 100644 --- a/db/sql/sql_test.go +++ b/db/sql/sql_test.go @@ -94,18 +94,18 @@ func testTableDefinition(dia db.DialectName) *db.TableDefinition { if dia == db.CASSANDRA { tableSpec = &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "origin", Type: db.DataTypeInt}, - {Name: "type", Type: db.DataTypeInt}, - {Name: "name", Type: db.DataTypeLongText}, + db.TableRowItem{Name: "origin", Type: db.DataTypeInt}, + db.TableRowItem{Name: "type", Type: db.DataTypeInt}, + db.TableRowItem{Name: "name", Type: db.DataTypeLongText}, }, PrimaryKey: []string{"origin", "type", "name"}, } } else { tableSpec = &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "origin", Type: db.DataTypeInt, NotNull: true}, - {Name: "type", Type: db.DataTypeInt, NotNull: true}, - {Name: "name", Type: db.DataTypeVarChar256, NotNull: false}, + db.TableRowItem{Name: "origin", Type: db.DataTypeInt, NotNull: true}, + db.TableRowItem{Name: "type", Type: db.DataTypeInt, NotNull: true}, + db.TableRowItem{Name: "name", Type: db.DataTypeVarChar256, NotNull: false}, }, PrimaryKey: []string{"origin", "type", "name"}, } diff --git a/db/sql/vector_test.go b/db/sql/vector_test.go index a607d5f2..e315cb03 100644 --- a/db/sql/vector_test.go +++ b/db/sql/vector_test.go @@ -110,8 +110,8 @@ func vectorCleanup(t *testing.T, dbo db.Database) { func testVectorTableDefinition(dia db.DialectName) *db.TableDefinition { return &db.TableDefinition{ TableRows: []db.TableRow{ - {Name: "id", Type: db.DataTypeBigIntAutoIncPK}, - {Name: "embedding", Type: db.DataTypeVector3Float32}, + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoIncPK}, + db.TableRowItem{Name: "embedding", Type: db.DataTypeVector3Float32}, }, } } From 2322de36b0442ea0b0db17ef9f797c95c383e1b3 Mon Sep 17 00:00:00 2001 From: Yee Chen Lim Date: Wed, 21 May 2025 10:01:59 +0000 Subject: [PATCH 2/2] Add elasticsearch test case for nested and parent-child --- acronis-db-bench/acronis-db-bench.go | 1 + acronis-db-bench/engine/db.go | 4 + acronis-db-bench/engine/suite.go | 2 +- acronis-db-bench/engine/tables.go | 1 + acronis-db-bench/engine/workers.go | 99 +- .../nested-objects-search/tables.go | 108 ++ .../test-groups/nested-objects-search/test.go | 170 ++++ .../nested-objects-search/workers_es.go | 944 ++++++++++++++++++ db/es/select.go | 4 +- 9 files changed, 1283 insertions(+), 50 deletions(-) create mode 100644 acronis-db-bench/test-groups/nested-objects-search/tables.go create mode 100644 acronis-db-bench/test-groups/nested-objects-search/test.go create mode 100644 acronis-db-bench/test-groups/nested-objects-search/workers_es.go diff --git a/acronis-db-bench/acronis-db-bench.go b/acronis-db-bench/acronis-db-bench.go index bbbc49a5..d69803e7 100644 --- a/acronis-db-bench/acronis-db-bench.go +++ b/acronis-db-bench/acronis-db-bench.go @@ -18,6 +18,7 @@ import ( _ "github.com/acronis/perfkit/acronis-db-bench/test-groups/json-search" // json-search _ "github.com/acronis/perfkit/acronis-db-bench/test-groups/large-objects-operations" // large-objects-operations _ "github.com/acronis/perfkit/acronis-db-bench/test-groups/logs-search" // logs-search + _ "github.com/acronis/perfkit/acronis-db-bench/test-groups/nested-objects-search" // nested-objects-search _ "github.com/acronis/perfkit/acronis-db-bench/test-groups/ping" // ping _ "github.com/acronis/perfkit/acronis-db-bench/test-groups/sample-dwh" // sample-dwh _ "github.com/acronis/perfkit/acronis-db-bench/test-groups/select-one" // select-one diff --git a/acronis-db-bench/engine/db.go b/acronis-db-bench/engine/db.go index f7fa8ac3..131f4f5d 100644 --- a/acronis-db-bench/engine/db.go +++ b/acronis-db-bench/engine/db.go @@ -153,6 +153,10 @@ type DBWorkerData struct { workingConn *DBConnector } +func (d *DBWorkerData) WorkingConn() *DBConnector { + return d.workingConn +} + func (d *DBWorkerData) release() { if d.workingConn != nil { d.workingConn.Release() diff --git a/acronis-db-bench/engine/suite.go b/acronis-db-bench/engine/suite.go index 79af717e..d9a4ad9d 100644 --- a/acronis-db-bench/engine/suite.go +++ b/acronis-db-bench/engine/suite.go @@ -79,7 +79,7 @@ func (s suiteStepTestExecute) Execute(b *benchmark.Benchmark, testOpts *TestOpts } // Get current dialect - var dialectName = getDBDriver(b) + var dialectName = GetDBDriver(b) // Skip if current dialect is not supported by this test dialectSupported := false diff --git a/acronis-db-bench/engine/tables.go b/acronis-db-bench/engine/tables.go index 30f683b1..f72999c9 100644 --- a/acronis-db-bench/engine/tables.go +++ b/acronis-db-bench/engine/tables.go @@ -24,6 +24,7 @@ type TestTable struct { CreateQuery string CreateQueryPatchFuncs []CreateQueryPatchFunc Indexes [][]string + DisableAutoCreation bool // runtime information RowsCount uint64 diff --git a/acronis-db-bench/engine/workers.go b/acronis-db-bench/engine/workers.go index 96d07bf4..a94fd826 100644 --- a/acronis-db-bench/engine/workers.go +++ b/acronis-db-bench/engine/workers.go @@ -60,54 +60,57 @@ func initGeneric(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64 return } - var ddlConnDatabase *DBConnector - if ddlConnDatabase, err = NewDBConnector(&tenantCacheDBOpts, -1, true, b.Logger, 1); err != nil { - b.Exit("db: cannot create connection for DDL: %v", err) - return - } + var rowNum int64 + + if !testDesc.Table.DisableAutoCreation { + var ddlConnDatabase *DBConnector + if ddlConnDatabase, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, -1, true, b.Logger, 1); err != nil { + b.Exit("db: cannot create connection for DDL: %v", err) + return + } - conn := ddlConnDatabase + conn := ddlConnDatabase - t := testRegistry.GetTableByName(tableName) + t := testRegistry.GetTableByName(tableName) - b.Logger.Debug("initializing table '%s'", tableName) - if testDesc.IsReadonly { - t.Create(conn, b) - b.Logger.Debug("readonly test, skipping table '%s' initialization", tableName) - if exists, err := conn.Database.TableExists(tableName); err != nil { - b.Exit(fmt.Sprintf("db: cannot check if table '%s' exists: %v", tableName, err)) - } else if !exists { - b.Exit("The '%s' table doesn't exist, please create tables using -I option, or use individual insert test using the -t `insert-***`", tableName) + b.Logger.Debug("initializing table '%s'", tableName) + if testDesc.IsReadonly { + t.Create(conn, b) + b.Logger.Debug("readonly test, skipping table '%s' initialization", tableName) + if exists, err := conn.Database.TableExists(tableName); err != nil { + b.Exit(fmt.Sprintf("db: cannot check if table '%s' exists: %v", tableName, err)) + } else if !exists { + b.Exit("The '%s' table doesn't exist, please create tables using -I option, or use individual insert test using the -t `insert-***`", tableName) + } + } else { + b.Logger.Debug("creating table '%s'", tableName) + t.Create(conn, b) } - } else { - b.Logger.Debug("creating table '%s'", tableName) - t.Create(conn, b) - } - var session = conn.Database.Session(conn.Database.Context(context.Background(), false)) - var rowNum int64 - if rows, err := session.Select(tableName, &db.SelectCtrl{Fields: []string{"COUNT(0)"}}); err != nil { - b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, err)) - } else { - for rows.Next() { - if scanErr := rows.Scan(&rowNum); scanErr != nil { - b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, scanErr)) + var session = conn.Database.Session(conn.Database.Context(context.Background(), false)) + if rows, err := session.Select(tableName, &db.SelectCtrl{Fields: []string{"COUNT(0)"}}); err != nil { + b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, err)) + } else { + for rows.Next() { + if scanErr := rows.Scan(&rowNum); scanErr != nil { + b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, scanErr)) + } } + rows.Close() } - rows.Close() - } - testDesc.Table.RowsCount = uint64(rowNum) - b.Logger.Debug("table '%s' has %d rows", tableName, testDesc.Table.RowsCount) + testDesc.Table.RowsCount = uint64(rowNum) + b.Logger.Debug("table '%s' has %d rows", tableName, testDesc.Table.RowsCount) - if rowsRequired > 0 { - if testDesc.Table.RowsCount < rowsRequired { - b.Exit(fmt.Sprintf("table '%s' has %d rows, but this test requires at least %d rows, please insert it first and then re-run the test", - testDesc.Table.TableName, testDesc.Table.RowsCount, rowsRequired)) + if rowsRequired > 0 { + if testDesc.Table.RowsCount < rowsRequired { + b.Exit(fmt.Sprintf("table '%s' has %d rows, but this test requires at least %d rows, please insert it first and then re-run the test", + testDesc.Table.TableName, testDesc.Table.RowsCount, rowsRequired)) + } } - } - ddlConnDatabase.Release() + ddlConnDatabase.Release() + } if b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource != "" { var offset int64 @@ -144,7 +147,7 @@ func initWorker(worker *benchmark.BenchmarkWorker) { worker.Logger.Trace("worker is initialized") } -func initCommon(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64) { +func InitCommon(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64) { b.Init = func() { initGeneric(b, testDesc, rowsRequired) } @@ -192,7 +195,7 @@ func initCommon(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64) */ func TestGeneric(b *benchmark.Benchmark, testDesc *TestDesc, workerFunc TestWorkerFunc, rowsRequired uint64) { - initCommon(b, testDesc, rowsRequired) + InitCommon(b, testDesc, rowsRequired) b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { c := worker.Data.(*DBWorkerData).workingConn @@ -216,7 +219,7 @@ func TestSelectRun( orderByFunc func(worker *benchmark.BenchmarkWorker) []string, rowsRequired uint64, ) { - initCommon(b, testDesc, rowsRequired) + InitCommon(b, testDesc, rowsRequired) testOpts, ok := b.TestOpts.(*TestOpts) if !ok { @@ -285,7 +288,7 @@ func TestSelectRawSQLQuery( orderByFunc func(worker *benchmark.BenchmarkWorker) string, rowsRequired uint64, ) { - initCommon(b, testDesc, rowsRequired) + InitCommon(b, testDesc, rowsRequired) batch := b.Vault.(*DBTestData).EffectiveBatch b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { @@ -368,7 +371,7 @@ func TestSelectRawSQLQuery( * INSERT worker */ -func getDBDriver(b *benchmark.Benchmark) db.DialectName { +func GetDBDriver(b *benchmark.Benchmark) db.DialectName { var dialectName, err = db.GetDialectName(b.TestOpts.(*TestOpts).DBOpts.ConnString) if err != nil { b.Exit(err) @@ -378,13 +381,13 @@ func getDBDriver(b *benchmark.Benchmark) db.DialectName { } func TestInsertGeneric(b *benchmark.Benchmark, testDesc *TestDesc) { - colConfs := testDesc.Table.GetColumnsForInsert(db.WithAutoInc(getDBDriver(b))) + colConfs := testDesc.Table.GetColumnsForInsert(db.WithAutoInc(GetDBDriver(b))) if len(*colConfs) == 0 { b.Exit(fmt.Sprintf("internal error: no columns eligible for INSERT found in '%s' configuration", testDesc.Table.TableName)) } - initCommon(b, testDesc, 0) + InitCommon(b, testDesc, 0) batch := b.Vault.(*DBTestData).EffectiveBatch table := &testDesc.Table @@ -492,7 +495,7 @@ func InsertMultiValueDataWorker(b *benchmark.Benchmark, c *DBConnector, testDesc for i := 0; i < batch; i++ { var genColumns, vals, err = b.Randomizer.GenFakeData(colConfs, db.WithAutoInc(c.Database.DialectName())) if err != nil { - b.Exit(err) + b.Exit(err.Error()) } if genColumns == nil { @@ -525,14 +528,14 @@ func InsertMultiValueDataWorker(b *benchmark.Benchmark, c *DBConnector, testDesc func TestUpdateGeneric(b *benchmark.Benchmark, testDesc *TestDesc, updateRows uint64, colConfs *[]benchmark.DBFakeColumnConf) { if colConfs == nil { - colConfs = testDesc.Table.GetColumnsForUpdate(db.WithAutoInc(getDBDriver(b))) + colConfs = testDesc.Table.GetColumnsForUpdate(db.WithAutoInc(GetDBDriver(b))) } if len(*colConfs) == 0 { b.Exit(fmt.Sprintf("internal error: no columns eligible for UPDATE found in '%s' configuration", testDesc.Table.TableName)) } - initCommon(b, testDesc, updateRows) + InitCommon(b, testDesc, updateRows) batch := b.Vault.(*DBTestData).EffectiveBatch table := &testDesc.Table @@ -594,7 +597,7 @@ func TestUpdateGeneric(b *benchmark.Benchmark, testDesc *TestDesc, updateRows ui */ // testDeleteGeneric is a generic DELETE worker func testDeleteGeneric(b *benchmark.Benchmark, testDesc *TestDesc, deleteRows uint64) { //nolint:unused - initCommon(b, testDesc, deleteRows) + InitCommon(b, testDesc, deleteRows) batch := b.Vault.(*DBTestData).EffectiveBatch table := &testDesc.Table diff --git a/acronis-db-bench/test-groups/nested-objects-search/tables.go b/acronis-db-bench/test-groups/nested-objects-search/tables.go new file mode 100644 index 00000000..8adf6706 --- /dev/null +++ b/acronis-db-bench/test-groups/nested-objects-search/tables.go @@ -0,0 +1,108 @@ +package nested_objects_search + +import ( + "github.com/acronis/perfkit/db" + + "github.com/acronis/perfkit/acronis-db-bench/engine" +) + +// TestTableEmailNested is table to store email objects with nested fields +var TestTableEmailNested = engine.TestTable{ + TableName: "acronis_db_bench_email_nested", + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Columns: [][]interface{}{ + {"id", "dataset.id"}, + {"uuid", "uuid"}, + {"euc_id", "int", 2147483647}, + {"progress", "int", 100}, + {"sender", "dataset.From"}, + {"recipient", "dataset.To"}, + {"subject", "dataset.Subject"}, + {"body", "dataset.Body"}, + //{"history.history_type", "string", 3, 32, 4}, + //{"history.timestamp", "time_ns", 90}, + //{"history.event", "string", 10, 32, 8}, + }, + InsertColumns: []string{}, // all + UpdateColumns: []string{"progress"}, + TableDefinition: func(_ db.DialectName) *db.TableDefinition { + return &db.TableDefinition{ + TableRows: []db.TableRow{ + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoInc}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "euc_id", Type: db.DataTypeInt, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "progress", Type: db.DataTypeInt}, + db.TableRowItem{Name: "sender", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "recipient", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "subject", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "body", Type: db.DataTypeText, Indexed: true}, + db.TableRowSubtable{ + Name: "history", + Type: db.DataTypeNested, + Subtable: []db.TableRow{ + db.TableRowItem{Name: "history_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "event", Type: db.DataTypeVarChar, Indexed: true}, + }, + }, + }, + PrimaryKey: []string{"id"}, + } + }, + CreateQuery: `create table {table} ( + id {$bigint_autoinc_pk}, + uuid {$varchar_uuid} {$notnull}, + euc_id int {$notnull}, + progress int {$null} + ) {$engine};`, + Indexes: [][]string{{"tenant_id"}}, + DisableAutoCreation: true, +} + +// TestTableEmailParentChild is table to store email objects with parent child +var TestTableEmailParentChild = engine.TestTable{ + TableName: "acronis_db_bench_email_pc", + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Columns: [][]interface{}{ + {"id", "dataset.id"}, + {"uuid", "uuid"}, + {"euc_id", "int", 2147483647}, + {"progress", "int", 100}, + {"sender", "dataset.From"}, + {"recipient", "dataset.To"}, + {"subject", "dataset.Subject"}, + {"body", "dataset.Body"}, + //{"history_type", "string", 3, 32, 4}, + //{"history_timestamp", "time_ns", 90}, + //{"history_event", "string", 10, 32, 8}, + }, + InsertColumns: []string{}, // all + UpdateColumns: []string{"progress"}, + TableDefinition: func(_ db.DialectName) *db.TableDefinition { + return &db.TableDefinition{ + TableRows: []db.TableRow{ + db.TableRowItem{Name: "id", Type: db.DataTypeBigIntAutoInc}, + db.TableRowItem{Name: "uuid", Type: db.DataTypeUUID, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "euc_id", Type: db.DataTypeInt, NotNull: true, Indexed: true}, + db.TableRowItem{Name: "progress", Type: db.DataTypeInt}, + db.TableRowItem{Name: "sender", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "recipient", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "subject", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "body", Type: db.DataTypeText, Indexed: true}, + db.TableRowItem{Name: "doc_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "history_type", Type: db.DataTypeVarChar, Indexed: true}, + db.TableRowItem{Name: "history_timestamp", Type: db.DataTypeDateTime, Indexed: true}, + db.TableRowItem{Name: "history_event", Type: db.DataTypeVarChar, Indexed: true}, + }, + PrimaryKey: []string{"id"}, + } + }, + CreateQuery: `create table {table} ( + id {$bigint_autoinc_pk}, + uuid {$varchar_uuid} {$notnull}, + euc_id int {$notnull}, + progress int {$null} + ) {$engine};`, + Indexes: [][]string{{"tenant_id"}}, + DisableAutoCreation: true, +} diff --git a/acronis-db-bench/test-groups/nested-objects-search/test.go b/acronis-db-bench/test-groups/nested-objects-search/test.go new file mode 100644 index 00000000..0d3e746a --- /dev/null +++ b/acronis-db-bench/test-groups/nested-objects-search/test.go @@ -0,0 +1,170 @@ +package nested_objects_search + +import ( + "github.com/acronis/perfkit/benchmark" + "github.com/acronis/perfkit/db" + + "github.com/acronis/perfkit/acronis-db-bench/engine" +) + +func init() { + var tg = engine.NewTestGroup("Nested objects search tests group") + + // Elasticsearch nested tests + tg.Add(&TestInsertEmailNested) + tg.Add(&TestAddHistoryToEmailNested) + tg.Add(&TestDeleteHistoryToEmailNested) + tg.Add(&TestSearchEmailNested) + tg.Add(&TestListEmailNested) + + // Elasticsearch parent-child tests + tg.Add(&TestInsertEmailParentChild) + tg.Add(&TestAddHistoryToEmailParentChild) + tg.Add(&TestDeleteHistoryToEmailParentChild) + tg.Add(&TestSearchEmailPC) + tg.Add(&TestListEmailPC) + + if err := engine.RegisterTestGroup(tg); err != nil { + panic(err) + } +} + +// TestInsertEmailNested inserts email data into the 'email_nested' table +var TestInsertEmailNested = engine.TestDesc{ + Name: "insert-email-nested", + Metric: "emails/sec", + Description: "insert an email data into the 'email_nested' table", + Category: engine.TestInsert, + IsReadonly: false, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailNested, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testInsertElasticsearch(b, testDesc) + }, +} + +// TestAddHistoryToEmailNested update email data with history into the 'email_nested' table +var TestAddHistoryToEmailNested = engine.TestDesc{ + Name: "add-email-history-nested", + Metric: "histories/sec", + Description: "add history of an email in the 'email_nested' table", + Category: engine.TestUpdate, + IsReadonly: false, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailNested, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testAddHistoryNestedElasticsearch(b, testDesc) + }, +} + +// TestDeleteHistoryToEmailNested delete history of email data into the 'email_nested' table +var TestDeleteHistoryToEmailNested = engine.TestDesc{ + Name: "delete-email-history-nested", + Metric: "histories/sec", + Description: "add history of an email in the 'email_nested' table", + Category: engine.TestDelete, + IsReadonly: false, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailNested, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testDeleteHistoryNestedES(b, testDesc) + }, +} + +// TestSearchEmailNested selects all emails with a specific history type on nested mapping +var TestSearchEmailNested = engine.TestDesc{ + Name: "search-email-nested", + Metric: "emails/sec", + Description: "select all emails with a specific history type", + Category: engine.TestSelect, + IsReadonly: true, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailNested, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testSearchNestedElasticsearch(b, testDesc) + }, +} + +// TestListEmailNested list all emails with their history list on nested mapping +var TestListEmailNested = engine.TestDesc{ + Name: "list-email-nested", + Metric: "emails/sec", + Description: "list all emails with its history details", + Category: engine.TestSelect, + IsReadonly: true, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailNested, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testListNestedElasticsearch(b, testDesc) + }, +} + +// TestInsertEmailParentChild inserts email data into the 'email_parent_child' table +var TestInsertEmailParentChild = engine.TestDesc{ + Name: "insert-email-parent-child", + Metric: "emails/sec", + Description: "insert an email data into the 'email_pc' table", + Category: engine.TestInsert, + IsReadonly: false, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailParentChild, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testInsertPCElasticsearch(b, testDesc) + }, +} + +// TestAddHistoryToEmailParentChild update email data with history into the 'email_pc' table +var TestAddHistoryToEmailParentChild = engine.TestDesc{ + Name: "add-email-history-parent-child", + Metric: "histories/sec", + Description: "add history of an email in the 'email_pc' table", + Category: engine.TestUpdate, + IsReadonly: false, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailParentChild, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testAddHistoryPCElasticsearch(b, testDesc) + }, +} + +// TestDeleteHistoryToEmailParentChild delete history of email data into the 'email_pc' table +var TestDeleteHistoryToEmailParentChild = engine.TestDesc{ + Name: "delete-email-history-parent-child", + Metric: "histories/sec", + Description: "add history of an email in the 'email_pc' table", + Category: engine.TestUpdate, + IsReadonly: false, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailParentChild, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testDeleteHistoryPCES(b, testDesc) + }, +} + +// TestSearchEmailPC selects all emails with a specific history type on parent-child mapping +var TestSearchEmailPC = engine.TestDesc{ + Name: "search-email-parent-child", + Metric: "emails/sec", + Description: "select all emails with a specific history type", + Category: engine.TestSelect, + IsReadonly: true, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailParentChild, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testSearchPCElasticsearch(b, testDesc) + }, +} + +// TestListEmailPC list all emails with their history list on parent-child mapping +var TestListEmailPC = engine.TestDesc{ + Name: "list-email-parent-child", + Metric: "emails/sec", + Description: "list all emails with its history details", + Category: engine.TestSelect, + IsReadonly: true, + Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Table: TestTableEmailParentChild, + LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + testListPCElasticsearch(b, testDesc) + }, +} diff --git a/acronis-db-bench/test-groups/nested-objects-search/workers_es.go b/acronis-db-bench/test-groups/nested-objects-search/workers_es.go new file mode 100644 index 00000000..ea04aeae --- /dev/null +++ b/acronis-db-bench/test-groups/nested-objects-search/workers_es.go @@ -0,0 +1,944 @@ +package nested_objects_search + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + es8 "github.com/elastic/go-elasticsearch/v8" + + "github.com/acronis/perfkit/acronis-db-bench/engine" + "github.com/acronis/perfkit/benchmark" + "github.com/acronis/perfkit/db" +) + +var ( + esDataHistoryType = [...]string{"audit", "log", "error", "info", "warning", "debug", "trace", "security", "performance", "network"} + esDataEventType = [...]string{"modify", "delete", "create", "update", "read", "write"} +) + +const ( + maxHistoryPerItem = 10 +) + +func createESNestedTable(c *engine.DBConnector, b *benchmark.Benchmark, table *engine.TestTable) { + mapping := `{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "euc_id": { + "type": "long" + }, + "history": { + "type": "nested", + "properties": { + "event": { + "type": "keyword" + }, + "history_type": { + "type": "keyword" + }, + "timestamp": { + "type": "date_nanos" + } + } + }, + "sender": { + "type": "keyword" + }, + "recipient": { + "type": "keyword" + }, + "subject": { + "type": "keyword" + }, + "body": { + "type": "text" + }, + "progress": { + "type": "long", + "index": false + }, + "uuid": { + "type": "keyword" + } + } + }, + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "codec": "best_compression", + "index": { + "refresh_interval": "1m", + "mapping": { + "ignore_malformed": true + } + } + } +}}` + + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + existRes, err := rawSession.Indices.Exists([]string{table.TableName}) + if err != nil { + b.Exit(fmt.Sprintf("error checking if index exist: %v", err)) + } + + defer existRes.Body.Close() + + if existRes.IsError() { + if existRes.StatusCode != http.StatusNotFound { + b.Exit("failed to check if index exist: %s", existRes.String()) + } + } + + if existRes.StatusCode == http.StatusOK { + return + } + + res, err := rawSession.Indices.Create( + table.TableName, + rawSession.Indices.Create.WithBody(strings.NewReader(mapping)), + ) + if err != nil { + b.Exit(fmt.Sprintf("error creating index: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to create index: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed to create index: %s", res.String()) + } + } +} + +func testInsertElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + colConfs := testDesc.Table.GetColumnsForInsert(db.WithAutoInc(engine.GetDBDriver(b))) + + if len(*colConfs) == 0 { + b.Exit(fmt.Sprintf("internal error: no columns eligible for INSERT found in '%s' configuration", testDesc.Table.TableName)) + } + + engine.InitCommon(b, testDesc, 0) + + batch := b.Vault.(*engine.DBTestData).EffectiveBatch + table := &testDesc.Table + + b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { + workerData := worker.Data.(*engine.DBWorkerData) + + var c = workerData.WorkingConn() + + createESNestedTable(c, b, table) + + // Prepare the buffer to store payload + // + var buf bytes.Buffer + + for i := range batch { + columns, values, err := worker.Randomizer.GenFakeData(colConfs, db.WithAutoInc(engine.GetDBDriver(b))) + if err != nil { + b.Exit(err.Error()) + } + + queryMap := make(map[string]interface{}) + for k, col := range columns { + fields := strings.Split(col, ".") + + if len(fields) == 1 { + queryMap[col] = values[k] + } else { + var m = queryMap + for j, f := range fields { + if j == len(fields)-1 { + m[f] = values[k] + } else { + if _, ok := m[f]; !ok { + m[f] = make(map[string]interface{}) + } + m = m[f].(map[string]interface{}) + } + } + } + } + + // Add nested fields + history := make([]interface{}, 0, maxHistoryPerItem) + for _ = range worker.Randomizer.Intn(maxHistoryPerItem) { + history = append(history, map[string]interface{}{ + "event": esDataEventType[worker.Randomizer.Intn(len(esDataEventType))], + "history_type": esDataHistoryType[worker.Randomizer.Intn(len(esDataHistoryType))], + "timestamp": time.Now().Format(time.RFC3339), + }) + } + + queryMap["history"] = history + + meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, i, "\n")) + + data, err := json.Marshal(queryMap) + if err != nil { + b.Exit(fmt.Sprintf("error encoding document: %v", err)) + } + + // Append newline to the data payload + // + data = append(data, "\n"...) // <-- Comment out to trigger failure for batch + + // Append payloads to the buffer (ignoring write errors) + // + buf.Grow(len(meta) + len(data)) + buf.Write(meta) + buf.Write(data) + } + + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + res, err := rawSession.Bulk( + bytes.NewReader(buf.Bytes()), + rawSession.Bulk.WithIndex(table.TableName), + rawSession.Bulk.WithRefresh(`true`), + ) + if err != nil { + b.Exit(fmt.Sprintf("error indexing document: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to perform indexing: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed to perform indexing: %s", res.String()) + } + + testFlushIndex(b, table.TableName, rawSession) + } + + return batch + } + + b.Run() + + b.Vault.(*engine.DBTestData).Scores[testDesc.Category] = append(b.Vault.(*engine.DBTestData).Scores[testDesc.Category], b.Score) +} + +func testAddHistoryNestedElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + engine.InitCommon(b, testDesc, 0) + + batch := b.Vault.(*engine.DBTestData).EffectiveBatch + table := &testDesc.Table + + b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { + workerData := worker.Data.(*engine.DBWorkerData) + + var c = workerData.WorkingConn() + + updateScript := `{"script": {"source": "ctx._source.history.add(params.history)","lang": "painless","params": {"history": {` + + `"event": %q,` + + `"history_type": %q,` + + `"timestamp": %q` + + `}}}}` + + // Prepare the buffer to store payload + // + var buf bytes.Buffer + + for _ = range batch { + // Add nested fields + finalUpdateScript := fmt.Sprintf(updateScript, + esDataEventType[worker.Randomizer.Intn(len(esDataEventType))], + esDataHistoryType[worker.Randomizer.Intn(len(esDataHistoryType))], + time.Now().Format(time.RFC3339), + ) + + meta := []byte(fmt.Sprintf(`{ "update" : { "_id" : "%d" } }%s`, worker.Randomizer.Intn(batch), "\n")) + + data := []byte(finalUpdateScript) + + // Append newline to the data payload + // + data = append(data, "\n"...) // <-- Comment out to trigger failure for batch + + // Append payloads to the buffer (ignoring write errors) + // + buf.Grow(len(meta) + len(data)) + buf.Write(meta) + buf.Write(data) + } + + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + res, err := rawSession.Bulk( + bytes.NewReader(buf.Bytes()), + rawSession.Bulk.WithIndex(table.TableName), + rawSession.Bulk.WithRefresh(`true`), + ) + if err != nil { + b.Exit(fmt.Sprintf("error updating document: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to perform bulk update: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed to perform bulk update: %s", res.String()) + } + + testFlushIndex(b, table.TableName, rawSession) + } + + return batch + } + + b.Run() + + b.Vault.(*engine.DBTestData).Scores[testDesc.Category] = append(b.Vault.(*engine.DBTestData).Scores[testDesc.Category], b.Score) +} + +func testDeleteHistoryNestedES(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + engine.InitCommon(b, testDesc, 0) + + batch := b.Vault.(*engine.DBTestData).EffectiveBatch + table := &testDesc.Table + + b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { + workerData := worker.Data.(*engine.DBWorkerData) + + var c = workerData.WorkingConn() + + updateScript := `{ + "script": { + "source": "if (ctx._source.history != null) {ctx._source.history.removeIf(item -> item.history_type == params.history_type)}", + "lang": "painless", + "params": { + "history_type": %q + } + }, + "query": { + "match_all": {} + } +}` + + toDelete := esDataHistoryType[worker.Randomizer.Intn(len(esDataHistoryType))] + + finalUpdateScript := fmt.Sprintf(updateScript, toDelete) + + // Delete nested fields + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + res, err := rawSession.UpdateByQuery( + []string{table.TableName}, + rawSession.UpdateByQuery.WithBody(strings.NewReader(finalUpdateScript)), + rawSession.UpdateByQuery.WithConflicts(`proceed`), + rawSession.UpdateByQuery.WithRefresh(true), + ) + if err != nil { + b.Exit(fmt.Sprintf("error updating document: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to perform indexing: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed to perform indexing: %s", res.String()) + } + + testFlushIndex(b, table.TableName, rawSession) + } + + return batch + } + + b.Run() + + b.Vault.(*engine.DBTestData).Scores[testDesc.Category] = append(b.Vault.(*engine.DBTestData).Scores[testDesc.Category], b.Score) +} + +func testSearchNestedElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + getQuery := func(worker *benchmark.BenchmarkWorker) string { + return `{ + "query": { + "nested": { + "path": "history", + "query": { + "bool": { + "must": [ + { "term": { "history.history_type": "` + esDataHistoryType[worker.Randomizer.Intn(len(esDataHistoryType))] + `" } } + ] + } + } + } + } +}` + } + + testSearchElasticsearch(b, testDesc, getQuery) +} + +func testListNestedElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + getQuery := func(_ *benchmark.BenchmarkWorker) string { + return `{ + "query": { + "match_all": {} + } +}` + } + + testSearchElasticsearch(b, testDesc, getQuery) +} + +func createESPCTable(c *engine.DBConnector, b *benchmark.Benchmark, table *engine.TestTable) { + mapping := `{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "historyJoin": { + "type": "join", + "relations": { + "document": "history" + } + }, + "euc_id": { + "type": "long" + }, + "sender": { + "type": "keyword" + }, + "recipient": { + "type": "keyword" + }, + "subject": { + "type": "keyword" + }, + "body": { + "type": "text" + }, + "doc_type": { + "type": "keyword" + }, + "history_event": { + "type": "keyword" + }, + "history_type": { + "type": "keyword" + }, + "history_timestamp": { + "type": "date_nanos" + }, + "progress": { + "type": "long", + "index": false + }, + "uuid": { + "type": "keyword" + } + } + }, + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "codec": "best_compression", + "index": { + "refresh_interval": "1m", + "mapping": { + "ignore_malformed": true + } + } + } +}` + + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + existRes, err := rawSession.Indices.Exists([]string{table.TableName}) + if err != nil { + b.Exit(fmt.Sprintf("error checking if index exist: %v", err)) + } + + defer existRes.Body.Close() + + if existRes.IsError() { + if existRes.StatusCode != http.StatusNotFound { + b.Exit("failed to check if index exist: %s", existRes.String()) + } + } + + if existRes.StatusCode == http.StatusOK { + return + } + + res, err := rawSession.Indices.Create( + table.TableName, + rawSession.Indices.Create.WithBody(strings.NewReader(mapping)), + ) + if err != nil { + b.Exit(fmt.Sprintf("error creating index: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to create index: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed to create index: %s", res.String()) + } + } +} + +func testInsertPCElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + colConfs := testDesc.Table.GetColumnsForInsert(db.WithAutoInc(engine.GetDBDriver(b))) + + if len(*colConfs) == 0 { + b.Exit(fmt.Sprintf("internal error: no columns eligible for INSERT found in '%s' configuration", testDesc.Table.TableName)) + } + + engine.InitCommon(b, testDesc, 0) + + batch := b.Vault.(*engine.DBTestData).EffectiveBatch + table := &testDesc.Table + + b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { + workerData := worker.Data.(*engine.DBWorkerData) + + var c = workerData.WorkingConn() + + createESPCTable(c, b, table) + + // Prepare the buffer to store payload + // + var buf bytes.Buffer + + for i := range batch { + columns, values, err := worker.Randomizer.GenFakeData(colConfs, db.WithAutoInc(engine.GetDBDriver(b))) + if err != nil { + b.Exit(err.Error()) + } + + queryMap := make(map[string]interface{}) + for k, col := range columns { + fields := strings.Split(col, ".") + + if len(fields) == 1 { + queryMap[col] = values[k] + } else { + var m = queryMap + for j, f := range fields { + if j == len(fields)-1 { + m[f] = values[k] + } else { + if _, ok := m[f]; !ok { + m[f] = make(map[string]interface{}) + } + m = m[f].(map[string]interface{}) + } + } + } + } + + // Add parent child join on parent + queryMap["historyJoin"] = "document" + queryMap["doc_type"] = "email" + + meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, i, "\n")) + + data, err := json.Marshal(queryMap) + if err != nil { + b.Exit(fmt.Sprintf("error encoding document: %v", err)) + } + + // Append newline to the data payload + // + data = append(data, "\n"...) // <-- Comment out to trigger failure for batch + + // Append payloads to the buffer (ignoring write errors) + // + buf.Grow(len(meta) + len(data)) + buf.Write(meta) + buf.Write(data) + + // Add child documents + for _ = range worker.Randomizer.Intn(maxHistoryPerItem) { + history := map[string]interface{}{ + "doc_type": "history", + "history_event": esDataEventType[worker.Randomizer.Intn(len(esDataEventType))], + "history_type": esDataHistoryType[worker.Randomizer.Intn(len(esDataHistoryType))], + "history_timestamp": time.Now().Format(time.RFC3339), + "historyJoin": map[string]interface{}{ + "name": "history", + "parent": fmt.Sprintf("%d", i), + }, + } + + historyMeta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d_%s", "routing": "%d" } }%s`, i, worker.Randomizer.UUID(), i, "\n")) + + historyData, err := json.Marshal(history) + if err != nil { + b.Exit(fmt.Sprintf("error encoding document: %v", err)) + } + + // Append newline to the data payload + // + historyData = append(historyData, "\n"...) // <-- Comment out to trigger failure for batch + + // Append payloads to the buffer (ignoring write errors) + // + buf.Grow(len(historyMeta) + len(historyData)) + buf.Write(historyMeta) + buf.Write(historyData) + } + } + + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + res, err := rawSession.Bulk( + bytes.NewReader(buf.Bytes()), + rawSession.Bulk.WithIndex(table.TableName), + rawSession.Bulk.WithRefresh(`true`), + ) + if err != nil { + b.Exit(fmt.Sprintf("error indexing document: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to perform indexing: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed to perform indexing: %s", res.String()) + } + + testFlushIndex(b, table.TableName, rawSession) + } + + return batch + } + + b.Run() + + b.Vault.(*engine.DBTestData).Scores[testDesc.Category] = append(b.Vault.(*engine.DBTestData).Scores[testDesc.Category], b.Score) +} +func testAddHistoryPCElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + engine.InitCommon(b, testDesc, 0) + + batch := b.Vault.(*engine.DBTestData).EffectiveBatch + table := &testDesc.Table + + b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { + workerData := worker.Data.(*engine.DBWorkerData) + + var c = workerData.WorkingConn() + + // Prepare the buffer to store payload + // + var buf bytes.Buffer + + for _ = range batch { + // Add child documents + itemID := fmt.Sprintf("%d", worker.Randomizer.Intn(batch)) + + for _ = range worker.Randomizer.Intn(maxHistoryPerItem) { + history := map[string]interface{}{ + "doc_type": "history", + "history_event": esDataEventType[worker.Randomizer.Intn(len(esDataEventType))], + "history_type": esDataHistoryType[worker.Randomizer.Intn(len(esDataHistoryType))], + "history_timestamp": time.Now().Format(time.RFC3339), + "historyJoin": map[string]interface{}{ + "name": "history", + "parent": itemID, + }, + } + + meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%s_%s", "routing": "%s" } }%s`, itemID, worker.Randomizer.UUID(), itemID, "\n")) + + data, err := json.Marshal(history) + if err != nil { + b.Exit(fmt.Sprintf("error encoding document: %v", err)) + } + + // Append newline to the data payload + // + data = append(data, "\n"...) // <-- Comment out to trigger failure for batch + + // Append payloads to the buffer (ignoring write errors) + // + buf.Grow(len(meta) + len(data)) + buf.Write(meta) + buf.Write(data) + } + } + + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + res, err := rawSession.Bulk( + bytes.NewReader(buf.Bytes()), + rawSession.Bulk.WithIndex(table.TableName), + rawSession.Bulk.WithRefresh(`true`), + ) + if err != nil { + b.Exit(fmt.Sprintf("error updating document: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to perform bulk update: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed to perform bulk update: %s", res.String()) + } + + testFlushIndex(b, table.TableName, rawSession) + } + + return batch + } + + b.Run() + + b.Vault.(*engine.DBTestData).Scores[testDesc.Category] = append(b.Vault.(*engine.DBTestData).Scores[testDesc.Category], b.Score) +} + +func testDeleteHistoryPCES(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + engine.InitCommon(b, testDesc, 0) + + batch := b.Vault.(*engine.DBTestData).EffectiveBatch + table := &testDesc.Table + + b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { + workerData := worker.Data.(*engine.DBWorkerData) + + var c = workerData.WorkingConn() + + deleteQuery := `{ + "query": { + "bool": { + "must": [ + { + "term": { + "history_type": %q + } + }, + { + "has_parent": { + "parent_type": "document", + "query": { + "match_all": {} + } + } + } + ] + } + } +}` + + toDelete := esDataHistoryType[worker.Randomizer.Intn(len(esDataHistoryType))] + + finalUpdateScript := fmt.Sprintf(deleteQuery, toDelete) + + // Delete all matching children + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + res, err := rawSession.DeleteByQuery( + []string{table.TableName}, + strings.NewReader(finalUpdateScript), + rawSession.DeleteByQuery.WithConflicts(`proceed`), + rawSession.DeleteByQuery.WithRefresh(true), + ) + if err != nil { + b.Exit(fmt.Sprintf("error deleting by query: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to delete by query: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed to delete by query: %s", res.String()) + } + + // Flush index + testFlushIndex(b, table.TableName, rawSession) + } + + return batch + } + + b.Run() + + b.Vault.(*engine.DBTestData).Scores[testDesc.Category] = append(b.Vault.(*engine.DBTestData).Scores[testDesc.Category], b.Score) +} + +func testSearchPCElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + getQuery := func(worker *benchmark.BenchmarkWorker) string { + return `{ + "query": { + "bool": { + "should": [ + { + "has_child": { + "type": "history", + "query": { + "bool": { + "must": [ + { + "term": { + "history_type": "` + esDataHistoryType[worker.Randomizer.Intn(len(esDataHistoryType))] + `" + } + } + ] + } + }, + "inner_hits": {} + } + } + ] + } + } +}` + } + + testSearchElasticsearch(b, testDesc, getQuery) +} + +func testListPCElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc) { + getQuery := func(_ *benchmark.BenchmarkWorker) string { + return `{ + "query": { + "bool": { + "should": [ + { + "has_child": { + "type": "history", + "query": { + "match_all": {} + }, + "inner_hits": {} + } + }, + { + "bool": { + "filter": { + "term": { + "historyJoin": "document" + } + }, + "must_not": { + "has_child": { + "type": "history", + "query": { + "match_all": {} + } + } + } + } + } + ] + } + } +}` + } + + testSearchElasticsearch(b, testDesc, getQuery) +} + +func testSearchElasticsearch(b *benchmark.Benchmark, testDesc *engine.TestDesc, getQuery func(worker *benchmark.BenchmarkWorker) string) { + engine.InitCommon(b, testDesc, 0) + + batch := b.Vault.(*engine.DBTestData).EffectiveBatch + table := &testDesc.Table + + b.WorkerRunFunc = func(worker *benchmark.BenchmarkWorker) (loops int) { + workerData := worker.Data.(*engine.DBWorkerData) + + var c = workerData.WorkingConn() + + switch rawSession := c.Database.RawSession().(type) { + case *es8.Client: + res, err := rawSession.Search( + rawSession.Search.WithIndex(table.TableName), + rawSession.Search.WithBody(strings.NewReader(getQuery(worker))), + rawSession.Search.WithSize(batch), + ) + if err != nil { + b.Exit(fmt.Sprintf("error searching document: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed to perform search: %s", res.String()) + } + } + } + + return batch + } + + b.Run() + + b.Vault.(*engine.DBTestData).Scores[testDesc.Category] = append(b.Vault.(*engine.DBTestData).Scores[testDesc.Category], b.Score) +} + +func testFlushIndex(b *benchmark.Benchmark, indexName string, rawSession *es8.Client) { + // Flush index + res, err := rawSession.Indices.Flush( + rawSession.Indices.Flush.WithIndex(indexName), + ) + if err != nil { + b.Exit(fmt.Sprintf("error flushing indexes: %v", err)) + } + + defer res.Body.Close() + + if res.IsError() { + if res.StatusCode != http.StatusNotFound { + b.Exit("failed flushing indexes: %s", res.String()) + } + } + + if res.StatusCode >= 300 { + b.Exit("failed flushing indexes: %s", res.String()) + } +} diff --git a/db/es/select.go b/db/es/select.go index 29acbe6d..3e258218 100644 --- a/db/es/select.go +++ b/db/es/select.go @@ -900,7 +900,9 @@ func (g *esGateway) Select(idxName string, sc *db.SelectCtrl) (db.Rows, error) { var queryBuilder, ok = indexQueryBuilders[index] if !ok { - return nil, fmt.Errorf("index %s is not supported", index) + queryBuilder = searchQueryBuilder{ + queryable: make(map[string]filterFunction), + } } var query, qType, empty, err = queryBuilder.searchRequest(index, sc)