diff --git a/csharp/src/Drivers/Databricks/DatabricksStatement.cs b/csharp/src/Drivers/Databricks/DatabricksStatement.cs index 0047259180..3d5ddae10a 100644 --- a/csharp/src/Drivers/Databricks/DatabricksStatement.cs +++ b/csharp/src/Drivers/Databricks/DatabricksStatement.cs @@ -25,6 +25,7 @@ using Apache.Arrow.Adbc.Drivers.Apache.Hive2; using Apache.Arrow.Adbc.Drivers.Apache.Spark; using Apache.Arrow.Adbc.Drivers.Databricks.Result; +using Apache.Arrow.Adbc.Tracing; using Apache.Arrow.Types; using Apache.Hive.Service.Rpc.Thrift; using static Apache.Arrow.Adbc.Drivers.Databricks.Result.DescTableExtendedResult; @@ -88,22 +89,32 @@ public DatabricksStatement(DatabricksConnection connection) /// The Arrow schema protected override Schema GetSchemaFromMetadata(TGetResultSetMetadataResp metadata) { + // Log schema parsing decision + Activity.Current?.SetTag("statement.schema.has_arrow_schema", metadata.__isset.arrowSchema); + // For Protocol V5+, prefer Arrow schema if available if (metadata.__isset.arrowSchema) { Schema? arrowSchema = ((DatabricksSchemaParser)Connection.SchemaParser).ParseArrowSchema(metadata.ArrowSchema); if (arrowSchema != null) { + Activity.Current?.SetTag("statement.schema.source", "arrow"); + Activity.Current?.SetTag("statement.schema.column_count", arrowSchema.FieldsList.Count); return arrowSchema; } } // Fallback to traditional Thrift schema - return Connection.SchemaParser.GetArrowSchema(metadata.Schema, Connection.DataTypeConversion); + Activity.Current?.SetTag("statement.schema.source", "thrift"); + var thriftSchema = Connection.SchemaParser.GetArrowSchema(metadata.Schema, Connection.DataTypeConversion); + Activity.Current?.SetTag("statement.schema.column_count", thriftSchema.FieldsList.Count); + return thriftSchema; } protected override void SetStatementProperties(TExecuteStatementReq statement) { + Activity.Current?.AddEvent("statement.set_properties.start"); + base.SetStatementProperties(statement); // Set Databricks-specific statement properties @@ -130,6 +141,23 @@ protected override void SetStatementProperties(TExecuteStatementReq statement) statement.RunAsync = runAsyncInThrift; Connection.TrySetGetDirectResults(statement); + + // Log Databricks-specific properties + Activity.Current?.SetTag("statement.property.enforce_result_persistence_mode", statement.EnforceResultPersistenceMode); + Activity.Current?.SetTag("statement.property.can_read_arrow_result", statement.CanReadArrowResult); + Activity.Current?.SetTag("statement.property.timestamp_as_arrow", statement.UseArrowNativeTypes.TimestampAsArrow); + Activity.Current?.SetTag("statement.property.decimal_as_arrow", statement.UseArrowNativeTypes.DecimalAsArrow); + Activity.Current?.SetTag("statement.property.complex_types_as_arrow", statement.UseArrowNativeTypes.ComplexTypesAsArrow); + Activity.Current?.SetTag("statement.property.interval_types_as_arrow", statement.UseArrowNativeTypes.IntervalTypesAsArrow); + + // Log CloudFetch configuration + Activity.Current?.SetTag("statement.cloudfetch.enabled", useCloudFetch); + Activity.Current?.SetTag("statement.cloudfetch.can_decompress_lz4", canDecompressLz4); + Activity.Current?.SetTag("statement.cloudfetch.max_bytes_per_file", maxBytesPerFile); + Activity.Current?.SetTag("statement.cloudfetch.max_bytes_per_file_mb", maxBytesPerFile / 1024.0 / 1024.0); + Activity.Current?.SetTag("statement.property.run_async", runAsyncInThrift); + + Activity.Current?.AddEvent("statement.set_properties.complete"); } // Cast the Client to IAsync for CloudFetch compatibility @@ -313,24 +341,41 @@ private void HandleSparkCatalog() /// Query result containing catalog information protected override async Task GetCatalogsAsync(CancellationToken cancellationToken = default) { - // If EnableMultipleCatalogSupport is false, return a single catalog "SPARK" without making an RPC call - if (!enableMultipleCatalogSupport) + return await this.TraceActivityAsync(async activity => { - // Create a schema with a single column TABLE_CAT - var field = new Field("TABLE_CAT", StringType.Default, true); - var schema = new Schema(new[] { field }, null); + activity?.AddEvent("statement.get_catalogs.start"); + activity?.SetTag("statement.feature.enable_multiple_catalog_support", enableMultipleCatalogSupport); - // Create a single row with value "SPARK" - var builder = new StringArray.Builder(); - builder.Append("SPARK"); - var array = builder.Build(); + // If EnableMultipleCatalogSupport is false, return a single catalog "SPARK" without making an RPC call + if (!enableMultipleCatalogSupport) + { + activity?.AddEvent("statement.get_catalogs.returning_spark_catalog", [ + new("reason", "Multiple catalog support disabled") + ]); - // Return the result without making an RPC call - return new QueryResult(1, new HiveServer2Connection.HiveInfoArrowStream(schema, new[] { array })); - } + // Create a schema with a single column TABLE_CAT + var field = new Field("TABLE_CAT", StringType.Default, true); + var schema = new Schema(new[] { field }, null); + + // Create a single row with value "SPARK" + var builder = new StringArray.Builder(); + builder.Append("SPARK"); + var array = builder.Build(); + + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 1); + activity?.AddEvent("statement.get_catalogs.complete"); + + // Return the result without making an RPC call + return new QueryResult(1, new HiveServer2Connection.HiveInfoArrowStream(schema, new[] { array })); + } - // If EnableMultipleCatalogSupport is true, delegate to base class implementation - return await base.GetCatalogsAsync(cancellationToken); + // If EnableMultipleCatalogSupport is true, delegate to base class implementation + activity?.AddEvent("statement.get_catalogs.calling_base_implementation"); + QueryResult result = await base.GetCatalogsAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, result.RowCount); + activity?.AddEvent("statement.get_catalogs.complete"); + return result; + }, activityName: "GetCatalogs"); } /// @@ -344,30 +389,49 @@ protected override async Task GetCatalogsAsync(CancellationToken ca /// Query result containing schema information protected override async Task GetSchemasAsync(CancellationToken cancellationToken = default) { - // Handle SPARK catalog case - HandleSparkCatalog(); - - // If EnableMultipleCatalogSupport is false and catalog is not null or SPARK, return empty result without RPC call - if (!enableMultipleCatalogSupport && CatalogName != null) + return await this.TraceActivityAsync(async activity => { - // Create a schema with TABLE_SCHEM and TABLE_CATALOG columns - var fields = new[] + activity?.AddEvent("statement.get_schemas.start"); + activity?.SetTag("statement.catalog_name", CatalogName ?? "(none)"); + activity?.SetTag("statement.feature.enable_multiple_catalog_support", enableMultipleCatalogSupport); + + // Handle SPARK catalog case + HandleSparkCatalog(); + activity?.SetTag("statement.catalog_name_after_spark_handling", CatalogName ?? "(none)"); + + // If EnableMultipleCatalogSupport is false and catalog is not null or SPARK, return empty result without RPC call + if (!enableMultipleCatalogSupport && CatalogName != null) { - new Field("TABLE_SCHEM", StringType.Default, true), - new Field("TABLE_CATALOG", StringType.Default, true) - }; - var schema = new Schema(fields, null); + activity?.AddEvent("statement.get_schemas.returning_empty_result", [ + new("reason", "Multiple catalog support disabled and catalog is not null") + ]); - // Create empty arrays for both columns - var catalogArray = new StringArray.Builder().Build(); - var schemaArray = new StringArray.Builder().Build(); + // Create a schema with TABLE_SCHEM and TABLE_CATALOG columns + var fields = new[] + { + new Field("TABLE_SCHEM", StringType.Default, true), + new Field("TABLE_CATALOG", StringType.Default, true) + }; + var schema = new Schema(fields, null); - // Return empty result - return new QueryResult(0, new HiveServer2Connection.HiveInfoArrowStream(schema, new[] { catalogArray, schemaArray })); - } + // Create empty arrays for both columns + var catalogArray = new StringArray.Builder().Build(); + var schemaArray = new StringArray.Builder().Build(); - // Call the base implementation with the potentially modified catalog name - return await base.GetSchemasAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 0); + activity?.AddEvent("statement.get_schemas.complete"); + + // Return empty result + return new QueryResult(0, new HiveServer2Connection.HiveInfoArrowStream(schema, new[] { catalogArray, schemaArray })); + } + + // Call the base implementation with the potentially modified catalog name + activity?.AddEvent("statement.get_schemas.calling_base_implementation"); + QueryResult result = await base.GetSchemasAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, result.RowCount); + activity?.AddEvent("statement.get_schemas.complete"); + return result; + }, activityName: "GetSchemas"); } /// @@ -381,49 +445,71 @@ protected override async Task GetSchemasAsync(CancellationToken can /// Query result containing table information protected override async Task GetTablesAsync(CancellationToken cancellationToken = default) { - // Handle SPARK catalog case - HandleSparkCatalog(); - - // If EnableMultipleCatalogSupport is false and catalog is not null or SPARK, return empty result without RPC call - if (!enableMultipleCatalogSupport && CatalogName != null) + return await this.TraceActivityAsync(async activity => { - // Correct schema for GetTables - var fields = new[] + activity?.AddEvent("statement.get_tables.start"); + activity?.SetTag("statement.catalog_name", CatalogName ?? "(none)"); + activity?.SetTag("statement.schema_name", SchemaName ?? "(none)"); + activity?.SetTag("statement.table_name", TableName ?? "(none)"); + activity?.SetTag("statement.table_types", TableTypes ?? "(none)"); + activity?.SetTag("statement.feature.enable_multiple_catalog_support", enableMultipleCatalogSupport); + + // Handle SPARK catalog case + HandleSparkCatalog(); + activity?.SetTag("statement.catalog_name_after_spark_handling", CatalogName ?? "(none)"); + + // If EnableMultipleCatalogSupport is false and catalog is not null or SPARK, return empty result without RPC call + if (!enableMultipleCatalogSupport && CatalogName != null) { - new Field("TABLE_CAT", StringType.Default, true), - new Field("TABLE_SCHEM", StringType.Default, true), - new Field("TABLE_NAME", StringType.Default, true), - new Field("TABLE_TYPE", StringType.Default, true), - new Field("REMARKS", StringType.Default, true), - new Field("TYPE_CAT", StringType.Default, true), - new Field("TYPE_SCHEM", StringType.Default, true), - new Field("TYPE_NAME", StringType.Default, true), - new Field("SELF_REFERENCING_COL_NAME", StringType.Default, true), - new Field("REF_GENERATION", StringType.Default, true) - }; - var schema = new Schema(fields, null); - - // Create empty arrays for all columns - var arrays = new IArrowArray[] - { - new StringArray.Builder().Build(), // TABLE_CAT - new StringArray.Builder().Build(), // TABLE_SCHEM - new StringArray.Builder().Build(), // TABLE_NAME - new StringArray.Builder().Build(), // TABLE_TYPE - new StringArray.Builder().Build(), // REMARKS - new StringArray.Builder().Build(), // TYPE_CAT - new StringArray.Builder().Build(), // TYPE_SCHEM - new StringArray.Builder().Build(), // TYPE_NAME - new StringArray.Builder().Build(), // SELF_REFERENCING_COL_NAME - new StringArray.Builder().Build() // REF_GENERATION - }; - - // Return empty result - return new QueryResult(0, new HiveServer2Connection.HiveInfoArrowStream(schema, arrays)); - } + activity?.AddEvent("statement.get_tables.returning_empty_result", [ + new("reason", "Multiple catalog support disabled and catalog is not null") + ]); - // Call the base implementation with the potentially modified catalog name - return await base.GetTablesAsync(cancellationToken); + // Correct schema for GetTables + var fields = new[] + { + new Field("TABLE_CAT", StringType.Default, true), + new Field("TABLE_SCHEM", StringType.Default, true), + new Field("TABLE_NAME", StringType.Default, true), + new Field("TABLE_TYPE", StringType.Default, true), + new Field("REMARKS", StringType.Default, true), + new Field("TYPE_CAT", StringType.Default, true), + new Field("TYPE_SCHEM", StringType.Default, true), + new Field("TYPE_NAME", StringType.Default, true), + new Field("SELF_REFERENCING_COL_NAME", StringType.Default, true), + new Field("REF_GENERATION", StringType.Default, true) + }; + var schema = new Schema(fields, null); + + // Create empty arrays for all columns + var arrays = new IArrowArray[] + { + new StringArray.Builder().Build(), // TABLE_CAT + new StringArray.Builder().Build(), // TABLE_SCHEM + new StringArray.Builder().Build(), // TABLE_NAME + new StringArray.Builder().Build(), // TABLE_TYPE + new StringArray.Builder().Build(), // REMARKS + new StringArray.Builder().Build(), // TYPE_CAT + new StringArray.Builder().Build(), // TYPE_SCHEM + new StringArray.Builder().Build(), // TYPE_NAME + new StringArray.Builder().Build(), // SELF_REFERENCING_COL_NAME + new StringArray.Builder().Build() // REF_GENERATION + }; + + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 0); + activity?.AddEvent("statement.get_tables.complete"); + + // Return empty result + return new QueryResult(0, new HiveServer2Connection.HiveInfoArrowStream(schema, arrays)); + } + + // Call the base implementation with the potentially modified catalog name + activity?.AddEvent("statement.get_tables.calling_base_implementation"); + QueryResult result = await base.GetTablesAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, result.RowCount); + activity?.AddEvent("statement.get_tables.complete"); + return result; + }, activityName: "GetTables"); } /// @@ -437,25 +523,46 @@ protected override async Task GetTablesAsync(CancellationToken canc /// Query result containing column information protected override async Task GetColumnsAsync(CancellationToken cancellationToken = default) { - // Handle SPARK catalog case - HandleSparkCatalog(); - - // If EnableMultipleCatalogSupport is false and catalog is not null, return empty result without RPC call - if (!enableMultipleCatalogSupport && CatalogName != null) + return await this.TraceActivityAsync(async activity => { - // Correct schema for GetColumns - var schema = CreateColumnMetadataSchema(); + activity?.AddEvent("statement.get_columns.start"); + activity?.SetTag("statement.catalog_name", CatalogName ?? "(none)"); + activity?.SetTag("statement.schema_name", SchemaName ?? "(none)"); + activity?.SetTag("statement.table_name", TableName ?? "(none)"); + activity?.SetTag("statement.column_name", ColumnName ?? "(none)"); + activity?.SetTag("statement.feature.enable_multiple_catalog_support", enableMultipleCatalogSupport); + + // Handle SPARK catalog case + HandleSparkCatalog(); + activity?.SetTag("statement.catalog_name_after_spark_handling", CatalogName ?? "(none)"); + + // If EnableMultipleCatalogSupport is false and catalog is not null, return empty result without RPC call + if (!enableMultipleCatalogSupport && CatalogName != null) + { + activity?.AddEvent("statement.get_columns.returning_empty_result", [ + new("reason", "Multiple catalog support disabled and catalog is not null") + ]); + // Correct schema for GetColumns + var schema = CreateColumnMetadataSchema(); - // Create empty arrays for all columns - var arrays = CreateColumnMetadataEmptyArray(); + // Create empty arrays for all columns + var arrays = CreateColumnMetadataEmptyArray(); - // Return empty result - return new QueryResult(0, new HiveServer2Connection.HiveInfoArrowStream(schema, arrays)); - } + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 0); + activity?.AddEvent("statement.get_columns.complete"); - // Call the base implementation with the potentially modified catalog name - return await base.GetColumnsAsync(cancellationToken); + // Return empty result + return new QueryResult(0, new HiveServer2Connection.HiveInfoArrowStream(schema, arrays)); + } + + // Call the base implementation with the potentially modified catalog name + activity?.AddEvent("statement.get_columns.calling_base_implementation"); + QueryResult result = await base.GetColumnsAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, result.RowCount); + activity?.AddEvent("statement.get_columns.complete"); + return result; + }, activityName: "GetColumns"); } /// @@ -497,10 +604,31 @@ internal bool ShouldReturnEmptyPkFkResult() protected override async Task GetPrimaryKeysAsync(CancellationToken cancellationToken = default) { - if (ShouldReturnEmptyPkFkResult()) - return EmptyPrimaryKeysResult(); + return await this.TraceActivityAsync(async activity => + { + activity?.AddEvent("statement.get_primary_keys.start"); + activity?.SetTag("statement.catalog_name", CatalogName ?? "(none)"); + activity?.SetTag("statement.schema_name", SchemaName ?? "(none)"); + activity?.SetTag("statement.table_name", TableName ?? "(none)"); + activity?.SetTag("statement.feature.pk_fk_enabled", enablePKFK); - return await base.GetPrimaryKeysAsync(cancellationToken); + if (ShouldReturnEmptyPkFkResult()) + { + activity?.AddEvent("statement.get_primary_keys.returning_empty_result", [ + new("reason", !enablePKFK ? "PK/FK feature disabled" : "Invalid catalog for PK/FK"), + new("catalog_name", CatalogName ?? "(none)") + ]); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 0); + activity?.AddEvent("statement.get_primary_keys.complete"); + return EmptyPrimaryKeysResult(); + } + + activity?.AddEvent("statement.get_primary_keys.calling_base_implementation"); + QueryResult result = await base.GetPrimaryKeysAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, result.RowCount); + activity?.AddEvent("statement.get_primary_keys.complete"); + return result; + }, activityName: "GetPrimaryKeys"); } private QueryResult EmptyPrimaryKeysResult() @@ -531,18 +659,60 @@ private QueryResult EmptyPrimaryKeysResult() protected override async Task GetCrossReferenceAsync(CancellationToken cancellationToken = default) { - if (ShouldReturnEmptyPkFkResult()) - return EmptyCrossReferenceResult(); + return await this.TraceActivityAsync(async activity => + { + activity?.AddEvent("statement.get_cross_reference.start"); + activity?.SetTag("statement.catalog_name", CatalogName ?? "(none)"); + activity?.SetTag("statement.schema_name", SchemaName ?? "(none)"); + activity?.SetTag("statement.table_name", TableName ?? "(none)"); + activity?.SetTag("statement.foreign_catalog_name", ForeignCatalogName ?? "(none)"); + activity?.SetTag("statement.foreign_schema_name", ForeignSchemaName ?? "(none)"); + activity?.SetTag("statement.foreign_table_name", ForeignTableName ?? "(none)"); + activity?.SetTag("statement.feature.pk_fk_enabled", enablePKFK); + + if (ShouldReturnEmptyPkFkResult()) + { + activity?.AddEvent("statement.get_cross_reference.returning_empty_result", [ + new("reason", !enablePKFK ? "PK/FK feature disabled" : "Invalid catalog for PK/FK") + ]); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 0); + activity?.AddEvent("statement.get_cross_reference.complete"); + return EmptyCrossReferenceResult(); + } - return await base.GetCrossReferenceAsync(cancellationToken); + activity?.AddEvent("statement.get_cross_reference.calling_base_implementation"); + QueryResult result = await base.GetCrossReferenceAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, result.RowCount); + activity?.AddEvent("statement.get_cross_reference.complete"); + return result; + }, activityName: "GetCrossReference"); } protected override async Task GetCrossReferenceAsForeignTableAsync(CancellationToken cancellationToken = default) { - if (ShouldReturnEmptyPkFkResult()) - return EmptyCrossReferenceResult(); + return await this.TraceActivityAsync(async activity => + { + activity?.AddEvent("statement.get_cross_reference_as_foreign_table.start"); + activity?.SetTag("statement.catalog_name", CatalogName ?? "(none)"); + activity?.SetTag("statement.foreign_catalog_name", ForeignCatalogName ?? "(none)"); + activity?.SetTag("statement.feature.pk_fk_enabled", enablePKFK); - return await base.GetCrossReferenceAsForeignTableAsync(cancellationToken); + if (ShouldReturnEmptyPkFkResult()) + { + activity?.AddEvent("statement.get_cross_reference_as_foreign_table.returning_empty_result", [ + new("reason", !enablePKFK ? "PK/FK feature disabled" : "Invalid catalog for PK/FK") + ]); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 0); + activity?.AddEvent("statement.get_cross_reference_as_foreign_table.complete"); + return EmptyCrossReferenceResult(); + } + + activity?.AddEvent("statement.get_cross_reference_as_foreign_table.calling_base_implementation"); + QueryResult result = await base.GetCrossReferenceAsForeignTableAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, result.RowCount); + activity?.AddEvent("statement.get_cross_reference_as_foreign_table.complete"); + return result; + }, activityName: "GetCrossReferenceAsForeignTable"); } private QueryResult EmptyCrossReferenceResult() @@ -589,61 +759,108 @@ private QueryResult EmptyCrossReferenceResult() protected override async Task GetColumnsExtendedAsync(CancellationToken cancellationToken = default) { - string? fullTableName = BuildTableName(); - var canUseDescTableExtended = ((DatabricksConnection)Connection).CanUseDescTableExtended; - - if (!canUseDescTableExtended || string.IsNullOrEmpty(fullTableName)) + return await this.TraceActivityAsync(async activity => { - // When fullTableName is empty, we cannot use metadata SQL query to get the info, - // so fallback to base class implementation - return await base.GetColumnsExtendedAsync(cancellationToken); - } + activity?.AddEvent("statement.get_columns_extended.start"); + string? fullTableName = BuildTableName(); + var canUseDescTableExtended = ((DatabricksConnection)Connection).CanUseDescTableExtended; - string query = $"DESC TABLE EXTENDED {fullTableName} AS JSON"; - using var descStmt = Connection.CreateStatement(); - descStmt.SqlQuery = query; - QueryResult descResult; + activity?.SetTag("statement.catalog_name", CatalogName ?? "(none)"); + activity?.SetTag("statement.schema_name", SchemaName ?? "(none)"); + activity?.SetTag("statement.table_name", TableName ?? "(none)"); + activity?.SetTag("statement.desc_table_extended.full_table_name", fullTableName ?? "(none)"); + activity?.SetTag("statement.desc_table_extended.can_use", canUseDescTableExtended); - try - { - descResult = await descStmt.ExecuteQueryAsync(); - } - catch (HiveServer2Exception ex) when (ex.SqlState == "42601" || ex.SqlState == "20000") - { - // 42601 is error code of syntax error, which this command (DESC TABLE EXTENDED ... AS JSON) is not supported by current DBR - // Sometimes server may also return 20000 (internal error) if it fails to convert some data types of the table columns - // So we should fallback to base implementation - Debug.WriteLine($"[WARN] Failed to run {query} (reason={ex.Message}). Fallback to base::GetColumnsExtendedAsync."); - return await base.GetColumnsExtendedAsync(cancellationToken); - } + if (!canUseDescTableExtended || string.IsNullOrEmpty(fullTableName)) + { + activity?.AddEvent("statement.get_columns_extended.fallback_to_base", [ + new("reason", !canUseDescTableExtended ? "DESC TABLE EXTENDED not available" : "Full table name is empty") + ]); + // When fullTableName is empty, we cannot use metadata SQL query to get the info, + // so fallback to base class implementation + QueryResult baseResult = await base.GetColumnsExtendedAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, baseResult.RowCount); + activity?.AddEvent("statement.get_columns_extended.complete"); + return baseResult; + } - var columnMetadataSchema = CreateColumnMetadataSchema(); + string query = $"DESC TABLE EXTENDED {fullTableName} AS JSON"; + activity?.AddEvent("statement.desc_table_extended.executing_query", [ + new("query_summary", query.Length > 100 ? query.Substring(0, 100) + "..." : query) + ]); - if (descResult.Stream == null) - { - return CreateEmptyExtendedColumnsResult(columnMetadataSchema); - } + using var descStmt = Connection.CreateStatement(); + descStmt.SqlQuery = query; + QueryResult descResult; - // Read the json result - var resultJson = ""; - using (var stream = descResult.Stream) - { - var batch = await stream.ReadNextRecordBatchAsync(cancellationToken); - if (batch == null || batch.Length == 0) + try + { + descResult = await descStmt.ExecuteQueryAsync(); + } + catch (HiveServer2Exception ex) when (ex.SqlState == "42601" || ex.SqlState == "20000") { + // 42601 is error code of syntax error, which this command (DESC TABLE EXTENDED ... AS JSON) is not supported by current DBR + // Sometimes server may also return 20000 (internal error) if it fails to convert some data types of the table columns + // So we should fallback to base implementation + activity?.AddEvent("statement.desc_table_extended.query_failed_fallback_to_base", [ + new("sql_state", ex.SqlState), + new("error_message", ex.Message) + ]); + Debug.WriteLine($"[WARN] Failed to run {query} (reason={ex.Message}). Fallback to base::GetColumnsExtendedAsync."); + QueryResult baseResult = await base.GetColumnsExtendedAsync(cancellationToken); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, baseResult.RowCount); + activity?.AddEvent("statement.get_columns_extended.complete"); + return baseResult; + } + + var columnMetadataSchema = CreateColumnMetadataSchema(); + + if (descResult.Stream == null) + { + activity?.AddEvent("statement.desc_table_extended.empty_stream"); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 0); + activity?.AddEvent("statement.get_columns_extended.complete"); return CreateEmptyExtendedColumnsResult(columnMetadataSchema); } - resultJson = ((StringArray)batch.Column(0)).GetString(0); - } + // Read the json result + activity?.AddEvent("statement.desc_table_extended.parsing_result"); + var resultJson = ""; + using (var stream = descResult.Stream) + { + var batch = await stream.ReadNextRecordBatchAsync(cancellationToken); + if (batch == null || batch.Length == 0) + { + activity?.AddEvent("statement.desc_table_extended.empty_batch"); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, 0); + activity?.AddEvent("statement.get_columns_extended.complete"); + return CreateEmptyExtendedColumnsResult(columnMetadataSchema); + } - // Parse the JSON result - var result = JsonSerializer.Deserialize(resultJson); - if (result == null) - { - throw new FormatException($"Invalid json result of {query}.Result={resultJson}"); - } - return CreateExtendedColumnsResult(columnMetadataSchema,result); + resultJson = ((StringArray)batch.Column(0)).GetString(0); + activity?.SetTag("statement.desc_table_extended.result_json_length", resultJson?.Length ?? 0); + } + + // Parse the JSON result + if (string.IsNullOrEmpty(resultJson)) + { + throw new FormatException($"Invalid json result of {query}: result is null or empty"); + } + var result = JsonSerializer.Deserialize(resultJson!); + if (result == null) + { + throw new FormatException($"Invalid json result of {query}.Result={resultJson}"); + } + + activity?.SetTag("statement.desc_table_extended.column_count", result.Columns?.Count ?? 0); + activity?.SetTag("statement.desc_table_extended.pk_count", result.PrimaryKeys?.Count ?? 0); + activity?.SetTag("statement.desc_table_extended.fk_count", result.ForeignKeys?.Count ?? 0); + + QueryResult finalResult = CreateExtendedColumnsResult(columnMetadataSchema, result); + activity?.SetTag(SemanticConventions.Db.Response.ReturnedRows, finalResult.RowCount); + activity?.AddEvent("statement.get_columns_extended.complete"); + return finalResult; + }, activityName: "GetColumnsExtended"); } public override string AssemblyName => DatabricksConnection.s_assemblyName;