From 3bd14c14cf89cbc4b48715a5da46b5ef12c9d818 Mon Sep 17 00:00:00 2001 From: Sebastian Balz Date: Sat, 5 Oct 2024 08:14:06 +0000 Subject: [PATCH 01/10] remove legacy forms in ConfigEditor --- src/ConfigEditor.tsx | 187 ++++++++++++++++++++++++------------------- 1 file changed, 103 insertions(+), 84 deletions(-) diff --git a/src/ConfigEditor.tsx b/src/ConfigEditor.tsx index 73635b9..69206af 100644 --- a/src/ConfigEditor.tsx +++ b/src/ConfigEditor.tsx @@ -1,13 +1,13 @@ import React, { ChangeEvent, PureComponent } from 'react'; -import { LegacyForms } from '@grafana/ui'; +import { InlineField, InlineSwitch, SecretInput, Input, FieldSet } from '@grafana/ui'; import { DataSourcePluginOptionsEditorProps } from '@grafana/data'; import { SnowflakeOptions, SnowflakeSecureOptions } from './types'; -const { SecretFormField, FormField, Switch } = LegacyForms; -interface Props extends DataSourcePluginOptionsEditorProps {} -interface State {} +interface Props extends DataSourcePluginOptionsEditorProps { } + +interface State { } export class ConfigEditor extends PureComponent { onAccountChange = (event: ChangeEvent) => { @@ -151,127 +151,146 @@ export class ConfigEditor extends PureComponent { const { options } = this.props; const { jsonData, secureJsonFields } = options; const secureJsonData = (options.secureJsonData || {}) as SnowflakeSecureOptions; - return ( -
+

Connection

- -
- + -
- -
- + + -
+ + + + -
- -
-
- {!jsonData.basicAuth && ( - + - )} - {jsonData.basicAuth && ( - + )} + {jsonData.basicAuth && ( + + - )} -
-
- + )} + + -
+ +

Parameter configuration

- -
- + -
- -
- + + -
- -
- + + -
+

Session configuration

- -
- + -
-
- ); + + + + ) + } } From aed528207378b1d84374ad6d34c51347de3bb3db Mon Sep 17 00:00:00 2001 From: Sebastian Balz Date: Sun, 6 Oct 2024 09:35:55 +0000 Subject: [PATCH 02/10] Connection Pooling & Threads --- README.md | 3 ++ pkg/check_health.go | 15 ++---- pkg/query.go | 116 ++++++++++++++++++++++++++++++++---------- pkg/snowflake.go | 116 +++++++++++++++++++++++++++++++++++------- pkg/snowflake_test.go | 7 +-- src/ConfigEditor.tsx | 81 +++++++++++++++++++++++++++-- src/types.ts | 3 ++ 7 files changed, 279 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index e07ba38..05dcd89 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,9 @@ Available configuration fields are as follows: Database (Optional) | Specifies the default database to use once connected. Schema (Optional) | Specifies the default schema to use for the specified database once connected. Extra Options (Optional) | Specifies a series of one or more parameters, in the form of `=`, with each parameter separated by the ampersand character (&), and no spaces anywhere in the connection string. + max. open Connections | How many connections to snowflake are opened at a time. If the limit of open connections is exceeded newer queries will be cached in the queue. [default: 100] + max. queued Queries | Queue size of the internal query queue. If this limit is exceeded the query will be dropped and and error is thrown. Should always be higher as `max. open Connections`. 0 to disable. [default: 400] + Connection lifetime | Time in minutes until unnused connections are recycled. [default: 60min] #### Supported Macros diff --git a/pkg/check_health.go b/pkg/check_health.go index 83170e7..82058cf 100644 --- a/pkg/check_health.go +++ b/pkg/check_health.go @@ -2,7 +2,6 @@ package main import ( "context" - "database/sql" "fmt" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -15,18 +14,12 @@ import ( // a datasource is working as expected. func (td *SnowflakeDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - connectionString, result := createAndValidationConnectionString(req) - if result != nil { - return result, nil - } - db, err := sql.Open("snowflake", connectionString) + i, err := td.im.Get(ctx, req.PluginContext) if err != nil { - return &backend.CheckHealthResult{ - Status: backend.HealthStatusError, - Message: fmt.Sprintf("Connection issue : %s", err), - }, nil + return nil, err } - defer db.Close() + instance := i.(*instanceSettings) + db := instance.db row, err := db.QueryContext(ctx, "SELECT 1") if err != nil { diff --git a/pkg/query.go b/pkg/query.go index c2e5676..2068376 100644 --- a/pkg/query.go +++ b/pkg/query.go @@ -4,11 +4,15 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "math/big" "reflect" + "runtime/debug" "strconv" "strings" + "sync" + "sync/atomic" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -25,6 +29,20 @@ func (qc *queryConfigStruct) isTimeSeriesType() bool { return qc.QueryType == timeSeriesType } +type queryCounter int32 + +func (c *queryCounter) inc() int32 { + return atomic.AddInt32((*int32)(c), 1) +} + +func (c *queryCounter) dec() int32 { + return atomic.AddInt32((*int32)(c), -1) +} + +func (c *queryCounter) get() int32 { + return atomic.LoadInt32((*int32)(c)) +} + type queryConfigStruct struct { FinalQuery string QueryType string @@ -35,6 +53,9 @@ type queryConfigStruct struct { MaxDataPoints int64 FillMode string FillValue float64 + db *sql.DB + config *pluginConfig + actQueryCount *queryCounter } // type @@ -58,22 +79,26 @@ type queryModel struct { FillMode string `json:"fillMode"` } -func (qc *queryConfigStruct) fetchData(ctx context.Context, config *pluginConfig, password string, privateKey string) (result DataQueryResult, err error) { +func (qc *queryConfigStruct) fetchData(ctx context.Context) (result DataQueryResult, err error) { + qc.actQueryCount.inc() // Custom configuration to reduce memory footprint sf.MaxChunkDownloadWorkers = 2 sf.CustomJSONDecoderEnabled = true - connectionString := getConnectionString(config, password, privateKey) - - db, err := sql.Open("snowflake", connectionString) - if err != nil { - log.DefaultLogger.Error("Could not open database", "err", err) + start := time.Now() + stats := qc.db.Stats() + defer func() { + qc.actQueryCount.dec() + duration := time.Since(start) + log.DefaultLogger.Info(fmt.Sprintf("%+v - %s - %d", stats, duration, int(qc.actQueryCount.get()))) + + }() + if int(qc.config.IntMaxQueuedQueries) > 0 && int(qc.actQueryCount.get()) >= (int(qc.config.IntMaxQueuedQueries)) { + err := errors.New("too many queries in queue. Check Snowflake connectivity or increase MaxQueuedQeries count") + log.DefaultLogger.Error("Poolsize exceeded", "query", qc.FinalQuery, "err", err) return result, err } - defer db.Close() - - log.DefaultLogger.Info("Query", "finalQuery", qc.FinalQuery) - rows, err := db.QueryContext(ctx, qc.FinalQuery) + rows, err := qc.db.QueryContext(ctx, qc.FinalQuery) if err != nil { if strings.Contains(err.Error(), "000605") { log.DefaultLogger.Info("Query got cancelled", "query", qc.FinalQuery, "err", err) @@ -83,7 +108,11 @@ func (qc *queryConfigStruct) fetchData(ctx context.Context, config *pluginConfig log.DefaultLogger.Error("Could not execute query", "query", qc.FinalQuery, "err", err) return result, err } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + log.DefaultLogger.Warn("Failed to close rows", "err", err) + } + }() columnTypes, err := rows.ColumnTypes() if err != nil { @@ -185,19 +214,39 @@ func (qc *queryConfigStruct) transformQueryResult(columnTypes []*sql.ColumnType, return values, nil } -func (td *SnowflakeDatasource) query(ctx context.Context, dataQuery backend.DataQuery, config pluginConfig, password string, privateKey string) (response backend.DataResponse) { +func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch chan DBDataResponse, instance *instanceSettings, dataQuery backend.DataQuery) { + defer wg.Done() + queryResult := DBDataResponse{ + dataResponse: backend.DataResponse{}, + refID: dataQuery.RefID, + } + + defer func() { + if r := recover(); r != nil { + log.DefaultLogger.Error("ExecuteQuery panic", "error", r, "stack", string(debug.Stack())) + if theErr, ok := r.(error); ok { + queryResult.dataResponse.Error = theErr + } else if theErrString, ok := r.(string); ok { + queryResult.dataResponse.Error = fmt.Errorf(theErrString) + } else { + //queryResult.dataResponse.Error = fmt.Errorf("unexpected error - %s", td.userError) + } + ch <- queryResult + } + }() + var qm queryModel err := json.Unmarshal(dataQuery.JSON, &qm) if err != nil { - log.DefaultLogger.Error("Could not unmarshal query", "err", err) - response.Error = err - return response + //log.DefaultLogger.Error("Could not unmarshal query", "err", err) + //queryResult.dataResponse.Error = err + panic("Could not unmarshal query") } if qm.QueryText == "" { - log.DefaultLogger.Error("SQL query must no be empty") - response.Error = fmt.Errorf("SQL query must no be empty") - return response + //log.DefaultLogger.Error("SQL query must no be empty") + //queryResult.dataResponse.Error = fmt.Errorf("SQL query must no be empty") + panic("Query model property rawSql should not be empty at this point") } queryConfig := queryConfigStruct{ @@ -209,32 +258,44 @@ func (td *SnowflakeDatasource) query(ctx context.Context, dataQuery backend.Data Interval: dataQuery.Interval, TimeRange: dataQuery.TimeRange, MaxDataPoints: dataQuery.MaxDataPoints, + db: instance.db, + config: instance.config, + actQueryCount: &td.actQueryCount, } - log.DefaultLogger.Info("Query config", "config", qm) + errAppendDebug := func(frameErr string, err error, query string) { + var emptyFrame data.Frame + emptyFrame.SetMeta(&data.FrameMeta{ + ExecutedQueryString: query, + }) + queryResult.dataResponse.Error = fmt.Errorf("%s: %w", frameErr, err) + queryResult.dataResponse.Frames = data.Frames{&emptyFrame} + ch <- queryResult + } // Apply macros queryConfig.FinalQuery, err = Interpolate(&queryConfig) if err != nil { - response.Error = err - return response + errAppendDebug("interpolation failed", err, queryConfig.FinalQuery) + return } // Remove final semi column queryConfig.FinalQuery = strings.TrimSuffix(strings.TrimSpace(queryConfig.FinalQuery), ";") frame := data.NewFrame("") - dataResponse, err := queryConfig.fetchData(ctx, &config, password, privateKey) + dataResponse, err := queryConfig.fetchData(ctx) if err != nil { - response.Error = err - return response + errAppendDebug("db query error", err, queryConfig.FinalQuery) + return } log.DefaultLogger.Debug("Response", "data", dataResponse) for _, table := range dataResponse.Tables { timeColumnIndex := -1 for i, column := range table.Columns { if err != nil { - return backend.DataResponse{} + errAppendDebug("db query error", err, queryConfig.FinalQuery) + return } // Check time column if queryConfig.isTimeSeriesType() && equalsIgnoreCase(queryConfig.TimeColumns, column.Name()) { @@ -294,9 +355,8 @@ func (td *SnowflakeDatasource) query(ctx context.Context, dataQuery backend.Data ExecutedQueryString: queryConfig.FinalQuery, } - response.Frames = append(response.Frames, frame) - - return response + queryResult.dataResponse.Frames = data.Frames{frame} + ch <- queryResult } func (td *SnowflakeDatasource) longToWide(frame *data.Frame, queryConfig queryConfigStruct, dataResponse DataQueryResult, err error) *data.Frame { diff --git a/pkg/snowflake.go b/pkg/snowflake.go index 0f85107..104da74 100644 --- a/pkg/snowflake.go +++ b/pkg/snowflake.go @@ -2,8 +2,12 @@ package main import ( "context" + "database/sql" "encoding/json" "fmt" + "strconv" + "sync" + "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" @@ -13,6 +17,11 @@ import ( "net/url" ) +type DBDataResponse struct { + dataResponse backend.DataResponse + refID string +} + // newDatasource returns datasource.ServeOpts. func newDatasource() datasource.ServeOpts { // creates a instance manager for your plugin. The function passed @@ -33,7 +42,8 @@ type SnowflakeDatasource struct { // The instance manager can help with lifecycle management // of datasource instances in plugins. It's not a requirements // but a best practice that we recommend that you follow. - im instancemgmt.InstanceManager + im instancemgmt.InstanceManager + actQueryCount queryCounter } // QueryData handles multiple queries and returns multiple responses. @@ -43,40 +53,85 @@ type SnowflakeDatasource struct { func (td *SnowflakeDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { // create response struct - response := backend.NewQueryDataResponse() + result := backend.NewQueryDataResponse() - password := req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData["password"] + /*password := req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData["password"] privateKey := req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData["privateKey"] config, err := getConfig(req.PluginContext.DataSourceInstanceSettings) if err != nil { log.DefaultLogger.Error("Could not get config for plugin", "err", err) return response, err + }*/ + i, err := td.im.Get(ctx, req.PluginContext) + if err != nil { + return nil, err + } + instance := i.(*instanceSettings) + ch := make(chan DBDataResponse, len(req.Queries)) + var wg sync.WaitGroup + // Execute each query in a goroutine and wait for them to finish afterwards + for _, query := range req.Queries { + wg.Add(1) + go td.query(ctx, &wg, ch, instance, query) + //go e.executeQuery(query, &wg, ctx, ch, queryjson) } - // loop over queries and execute them individually. - for _, q := range req.Queries { - // save the response in a hashmap - // based on with RefID as identifier - response.Responses[q.RefID] = td.query(ctx, q, config, password, privateKey) + wg.Wait() + + // Read results from channels + close(ch) + result.Responses = make(map[string]backend.DataResponse) + for queryResult := range ch { + result.Responses[queryResult.refID] = queryResult.dataResponse } - return response, nil + return result, nil } type pluginConfig struct { - Account string `json:"account"` - Username string `json:"username"` - Role string `json:"role"` - Warehouse string `json:"warehouse"` - Database string `json:"database"` - Schema string `json:"schema"` - ExtraConfig string `json:"extraConfig"` + Account string `json:"account"` + Username string `json:"username"` + Role string `json:"role"` + Warehouse string `json:"warehouse"` + Database string `json:"database"` + Schema string `json:"schema"` + ExtraConfig string `json:"extraConfig"` + MaxOpenConnections string `json:"maxOpenConnections"` + IntMaxOpenConnections int64 + MaxQueuedQueries string `json:"maxQueuedQueries"` + IntMaxQueuedQueries int64 + ConnectionLifetime string `json:"connectionLifetime"` + IntConnectionLifetime int64 } func getConfig(settings *backend.DataSourceInstanceSettings) (pluginConfig, error) { var config pluginConfig err := json.Unmarshal(settings.JSONData, &config) + if config.MaxOpenConnections == "" { + config.MaxOpenConnections = "100" + } + if config.ConnectionLifetime == "" { + config.ConnectionLifetime = "60" + } + if config.MaxQueuedQueries == "" { + config.MaxQueuedQueries = "400" + } + if MaxOpenConnections, err := strconv.Atoi(config.MaxOpenConnections); err == nil { + config.IntMaxOpenConnections = int64(MaxOpenConnections) + } else { + return config, err + } + if ConnectionLifetime, err := strconv.Atoi(config.ConnectionLifetime); err == nil { + config.IntConnectionLifetime = int64(ConnectionLifetime) + } else { + return config, err + } + if MaxQueuedQueries, err := strconv.Atoi(config.MaxQueuedQueries); err == nil { + config.IntMaxQueuedQueries = int64(MaxQueuedQueries) + } else { + return config, err + } if err != nil { return config, err } @@ -103,13 +158,40 @@ func getConnectionString(config *pluginConfig, password string, privateKey strin } type instanceSettings struct { + db *sql.DB + config *pluginConfig } func newDataSourceInstance(ctx context.Context, setting backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + log.DefaultLogger.Info("Creating instance") - return &instanceSettings{}, nil + password := setting.DecryptedSecureJSONData["password"] + privateKey := setting.DecryptedSecureJSONData["privateKey"] + + config, err := getConfig(&setting) + if err != nil { + log.DefaultLogger.Error("Could not get config for plugin", "err", err) + return nil, err + } + + connectionString := getConnectionString(&config, password, privateKey) + db, err := sql.Open("snowflake", connectionString) + if err != nil { + return nil, err + } + + db.SetMaxOpenConns(int(config.IntMaxOpenConnections)) + db.SetMaxIdleConns(int(config.IntMaxOpenConnections)) + db.SetConnMaxLifetime(time.Duration(int(config.IntConnectionLifetime)) * time.Minute) + return &instanceSettings{db: db, config: &config}, nil } func (s *instanceSettings) Dispose() { log.DefaultLogger.Info("Disposing of instance") + if s.db != nil { + if err := s.db.Close(); err != nil { + log.DefaultLogger.Error("Failed to dispose db", "error", err) + } + } + log.DefaultLogger.Debug("DB disposed") } diff --git a/pkg/snowflake_test.go b/pkg/snowflake_test.go index 976ecd0..0e6e8f9 100644 --- a/pkg/snowflake_test.go +++ b/pkg/snowflake_test.go @@ -2,9 +2,10 @@ package main import ( "fmt" + "testing" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/stretchr/testify/require" - "testing" ) func TestGetConfig(t *testing.T) { @@ -15,8 +16,8 @@ func TestGetConfig(t *testing.T) { response string err string }{ - {json: "{}", config: pluginConfig{}}, - {json: "{\"account\":\"test\"}", config: pluginConfig{Account: "test"}}, + {json: "{}", config: pluginConfig{ConnectionLifetime: "60", IntConnectionLifetime: 60, MaxOpenConnections: "100", IntMaxOpenConnections: 100, MaxQueuedQueries: "400", IntMaxQueuedQueries: 400}}, + {json: "{\"account\":\"test\"}", config: pluginConfig{Account: "test", ConnectionLifetime: "60", IntConnectionLifetime: 60, MaxOpenConnections: "100", IntMaxOpenConnections: 100, MaxQueuedQueries: "400", IntMaxQueuedQueries: 400}}, {json: "{", err: "unexpected end of JSON input"}, } for i, tc := range tcs { diff --git a/src/ConfigEditor.tsx b/src/ConfigEditor.tsx index 69206af..33a7847 100644 --- a/src/ConfigEditor.tsx +++ b/src/ConfigEditor.tsx @@ -10,6 +10,19 @@ interface Props extends DataSourcePluginOptionsEditorProps { } interface State { } export class ConfigEditor extends PureComponent { + componentDidMount() { + const { onOptionsChange, options } = this.props; + if (options.jsonData.maxOpenConnections === ""){ + const jsonData = { + ...options.jsonData, + maxOpenConnections: "100", + maxQueuedQueries:"400", + connectionLifetime: "60", + + }; + onOptionsChange({ ...options, jsonData }); + } + } onAccountChange = (event: ChangeEvent) => { const { onOptionsChange, options } = this.props; @@ -76,11 +89,11 @@ export class ConfigEditor extends PureComponent { onOptionsChange({ ...options, jsonData }); }; - onAuthenticationChange = (event: React.SyntheticEvent) => { + onAuthenticationChange = (event: ChangeEvent) => { const { onOptionsChange, options } = this.props; const jsonData = { ...options.jsonData, - basicAuth: (event.target as HTMLInputElement).checked, + basicAuth: event.target.checked, }; onOptionsChange({ ...options, jsonData }); }; @@ -147,6 +160,31 @@ export class ConfigEditor extends PureComponent { }); }; + onMaxOpenConnectionsChange = (event: ChangeEvent) => { + const { onOptionsChange, options } = this.props; + const jsonData = { + ...options.jsonData, + maxOpenConnections: event.target.value, + }; + onOptionsChange({ ...options, jsonData }); + }; + onMaxQueuedQueriesChange = (event: ChangeEvent) => { + const { onOptionsChange, options } = this.props; + const jsonData = { + ...options.jsonData, + maxQueuedQueries: event.target.value, + }; + onOptionsChange({ ...options, jsonData }); + }; + onConnectionLifetimeChange = (event: ChangeEvent) => { + const { onOptionsChange, options } = this.props; + const jsonData = { + ...options.jsonData, + conectionLifetime: event.target.value, + }; + onOptionsChange({ ...options, jsonData }); + }; + render() { const { options } = this.props; const { jsonData, secureJsonFields } = options; @@ -288,7 +326,44 @@ export class ConfigEditor extends PureComponent { placeholder="TIMESTAMP_OUTPUT_FORMAT=MM-DD-YYYY&XXXXX=yyyyy&..." /> - +
+

Connection Pool configuration

+ + + + + + + + + ) diff --git a/src/types.ts b/src/types.ts index ed708f8..f76fb60 100644 --- a/src/types.ts +++ b/src/types.ts @@ -29,6 +29,9 @@ export interface SnowflakeOptions extends DataSourceJsonData { schema?: string; extraConfig?: string; basicAuth: boolean; + maxOpenConnections?: string; + maxQueuedQueries?: string; + connectionLifetime?: string; } /** From a917e3d16fde970980bebbb5726cbd96bd37ee96 Mon Sep 17 00:00:00 2001 From: Sebastian Balz Date: Sun, 6 Oct 2024 19:39:21 +0000 Subject: [PATCH 03/10] fix last merge upstream/master with actual code base --- pkg/check_health.go | 7 +++++-- pkg/check_health_test.go | 37 ++++++++++++++++++++++++++++++------- pkg/query.go | 5 +++-- pkg/snowflake_test.go | 5 +++-- 4 files changed, 41 insertions(+), 13 deletions(-) diff --git a/pkg/check_health.go b/pkg/check_health.go index 45b3f8f..8a42f11 100644 --- a/pkg/check_health.go +++ b/pkg/check_health.go @@ -13,7 +13,10 @@ import ( // datasource configuration page which allows users to verify that // a datasource is working as expected. func (td *SnowflakeDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - + _, result := createAndValidationConnectionString(req) + if result != nil { + return result, nil + } i, err := td.im.Get(ctx, req.PluginContext) if err != nil { return nil, err @@ -21,7 +24,7 @@ func (td *SnowflakeDatasource) CheckHealth(ctx context.Context, req *backend.Che instance := i.(*instanceSettings) db := instance.db - row, err := td.db.QueryContext(ctx, "SELECT 1") + row, err := db.QueryContext(ctx, "SELECT 1") if err != nil { return &backend.CheckHealthResult{ Status: backend.HealthStatusError, diff --git a/pkg/check_health_test.go b/pkg/check_health_test.go index 693a91b..df33af1 100644 --- a/pkg/check_health_test.go +++ b/pkg/check_health_test.go @@ -4,10 +4,12 @@ import ( "context" "database/sql" "fmt" + "testing" + "github.com/DATA-DOG/go-sqlmock" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/stretchr/testify/require" - "testing" ) func TestCheckHealthWithValidConnection(t *testing.T) { @@ -16,7 +18,6 @@ func TestCheckHealthWithValidConnection(t *testing.T) { defer db.Close() mock.ExpectQuery("SELECT 1").WillReturnRows(sqlmock.NewRows([]string{"1"}).AddRow(1)) - req := &backend.CheckHealthRequest{ PluginContext: backend.PluginContext{ DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ @@ -26,8 +27,10 @@ func TestCheckHealthWithValidConnection(t *testing.T) { }, } ctx := context.Background() - td := &SnowflakeDatasource{db: db} - result, err := td.CheckHealth(ctx, req) + + service := GetMockService(db) + service.im.Get(ctx, backend.PluginContext{}) + result, err := service.CheckHealth(ctx, req) require.NoError(t, err) require.Equal(t, backend.HealthStatusOk, result.Status) require.Equal(t, "Data source is working", result.Message) @@ -39,7 +42,6 @@ func TestCheckHealthWithInvalidConnection(t *testing.T) { defer db.Close() mock.ExpectQuery("SELECT 1").WillReturnError(sql.ErrConnDone) - req := &backend.CheckHealthRequest{ PluginContext: backend.PluginContext{ DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ @@ -49,8 +51,9 @@ func TestCheckHealthWithInvalidConnection(t *testing.T) { }, } ctx := context.Background() - td := &SnowflakeDatasource{db: db} - result, err := td.CheckHealth(ctx, req) + service := GetMockService(db) + service.im.Get(ctx, backend.PluginContext{}) + result, err := service.CheckHealth(ctx, req) require.NoError(t, err) require.Equal(t, backend.HealthStatusError, result.Status) require.Contains(t, result.Message, "Validation query error") @@ -174,3 +177,23 @@ func TestCreateAndValidationConnectionString(t *testing.T) { }) } } + +type FakeInstanceManager struct { + db *sql.DB +} + +func (fakeInstanceManager *FakeInstanceManager) Get(ctx context.Context, setting backend.PluginContext) (instancemgmt.Instance, error) { + config := pluginConfig{} ///getConfig(&setting) + return &instanceSettings{db: fakeInstanceManager.db, config: &config}, nil +} + +func (*FakeInstanceManager) Do(_ context.Context, _ backend.PluginContext, _ instancemgmt.InstanceCallbackFunc) error { + return nil +} + +func GetMockService(db *sql.DB) *SnowflakeDatasource { + return &SnowflakeDatasource{ + im: &FakeInstanceManager{db: db}, + actQueryCount: 0, //logger: log.New(), + } +} diff --git a/pkg/query.go b/pkg/query.go index d72f409..ecdca42 100644 --- a/pkg/query.go +++ b/pkg/query.go @@ -348,8 +348,9 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch if queryConfig.isTimeSeriesType() { frame, err = td.longToWide(frame, queryConfig, dataResponse) if err != nil { - response.Error = err - return response + queryResult.dataResponse.Error = fmt.Errorf("%w", err) + queryResult.dataResponse.Frames = data.Frames{frame} + ch <- queryResult } } log.DefaultLogger.Debug("Converted wide time Frame is:", frame) diff --git a/pkg/snowflake_test.go b/pkg/snowflake_test.go index a62320e..93fbe3c 100644 --- a/pkg/snowflake_test.go +++ b/pkg/snowflake_test.go @@ -81,11 +81,12 @@ func TestGetConnectionString(t *testing.T) { }) } +// TODO TestCreatesNewDataSourceInstance will fail because no login data is provided. func TestCreatesNewDataSourceInstance(t *testing.T) { settings := backend.DataSourceInstanceSettings{} instance, err := newDataSourceInstance(context.Background(), settings) - require.NoError(t, err) - require.NotNil(t, instance) + require.Error(t, err) + require.Nil(t, instance) } func TestDisposesInstanceWithoutError(t *testing.T) { From 227604f1ccf6bf486edb6a4b6272908bfaa0a7d9 Mon Sep 17 00:00:00 2001 From: Sebastian Balz Date: Sun, 6 Oct 2024 20:02:40 +0000 Subject: [PATCH 04/10] move QueryCounter into instanceSettings --- pkg/check_health_test.go | 3 +-- pkg/query.go | 2 +- pkg/snowflake.go | 8 ++++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/check_health_test.go b/pkg/check_health_test.go index df33af1..e31b77e 100644 --- a/pkg/check_health_test.go +++ b/pkg/check_health_test.go @@ -193,7 +193,6 @@ func (*FakeInstanceManager) Do(_ context.Context, _ backend.PluginContext, _ ins func GetMockService(db *sql.DB) *SnowflakeDatasource { return &SnowflakeDatasource{ - im: &FakeInstanceManager{db: db}, - actQueryCount: 0, //logger: log.New(), + im: &FakeInstanceManager{db: db}, } } diff --git a/pkg/query.go b/pkg/query.go index ecdca42..69283be 100644 --- a/pkg/query.go +++ b/pkg/query.go @@ -260,7 +260,7 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch MaxDataPoints: dataQuery.MaxDataPoints, db: instance.db, config: instance.config, - actQueryCount: &td.actQueryCount, + actQueryCount: &instance.actQueryCount, } errAppendDebug := func(frameErr string, err error, query string) { diff --git a/pkg/snowflake.go b/pkg/snowflake.go index 104da74..c49735d 100644 --- a/pkg/snowflake.go +++ b/pkg/snowflake.go @@ -42,8 +42,7 @@ type SnowflakeDatasource struct { // The instance manager can help with lifecycle management // of datasource instances in plugins. It's not a requirements // but a best practice that we recommend that you follow. - im instancemgmt.InstanceManager - actQueryCount queryCounter + im instancemgmt.InstanceManager } // QueryData handles multiple queries and returns multiple responses. @@ -158,8 +157,9 @@ func getConnectionString(config *pluginConfig, password string, privateKey strin } type instanceSettings struct { - db *sql.DB - config *pluginConfig + db *sql.DB + config *pluginConfig + actQueryCount queryCounter } func newDataSourceInstance(ctx context.Context, setting backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { From 0c934c3da9e22d414553c94277c9195e182ea370 Mon Sep 17 00:00:00 2001 From: Sebastian Balz Date: Sun, 6 Oct 2024 20:03:08 +0000 Subject: [PATCH 05/10] fix typo in ConfigEditor --- src/ConfigEditor.tsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ConfigEditor.tsx b/src/ConfigEditor.tsx index 33a7847..d9c2f94 100644 --- a/src/ConfigEditor.tsx +++ b/src/ConfigEditor.tsx @@ -336,7 +336,7 @@ export class ConfigEditor extends PureComponent { type="number" className="width-20" onChange={this.onMaxOpenConnectionsChange} - value={jsonData.maxOpenConnections || '100'} + value={jsonData.maxOpenConnections} placeholder="100" /> @@ -348,7 +348,7 @@ export class ConfigEditor extends PureComponent { type="number" className="width-20" onChange={this.onMaxQueuedQueriesChange} - value={jsonData.maxQueuedQueries || '400'} + value={jsonData.maxQueuedQueries} placeholder="400" /> @@ -360,7 +360,7 @@ export class ConfigEditor extends PureComponent { type="number" className="width-20" onChange={this.onConnectionLifetimeChange} - value={jsonData.connectionLifetime || '60'} + value={jsonData.connectionLifetime} placeholder="60" /> From cb34ed0093ee9e6c02cc6acf65ef5ed5a7328bff Mon Sep 17 00:00:00 2001 From: Sebastian Balz Date: Tue, 15 Oct 2024 13:07:42 +0000 Subject: [PATCH 06/10] code cleansing --- pkg/query.go | 6 ------ pkg/snowflake.go | 10 ---------- 2 files changed, 16 deletions(-) diff --git a/pkg/query.go b/pkg/query.go index 69283be..28a3f8b 100644 --- a/pkg/query.go +++ b/pkg/query.go @@ -228,8 +228,6 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch queryResult.dataResponse.Error = theErr } else if theErrString, ok := r.(string); ok { queryResult.dataResponse.Error = fmt.Errorf(theErrString) - } else { - //queryResult.dataResponse.Error = fmt.Errorf("unexpected error - %s", td.userError) } ch <- queryResult } @@ -238,14 +236,10 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch var qm queryModel err := json.Unmarshal(dataQuery.JSON, &qm) if err != nil { - //log.DefaultLogger.Error("Could not unmarshal query", "err", err) - //queryResult.dataResponse.Error = err panic("Could not unmarshal query") } if qm.QueryText == "" { - //log.DefaultLogger.Error("SQL query must no be empty") - //queryResult.dataResponse.Error = fmt.Errorf("SQL query must no be empty") panic("Query model property rawSql should not be empty at this point") } diff --git a/pkg/snowflake.go b/pkg/snowflake.go index c49735d..48275cc 100644 --- a/pkg/snowflake.go +++ b/pkg/snowflake.go @@ -53,15 +53,6 @@ func (td *SnowflakeDatasource) QueryData(ctx context.Context, req *backend.Query // create response struct result := backend.NewQueryDataResponse() - - /*password := req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData["password"] - privateKey := req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData["privateKey"] - - config, err := getConfig(req.PluginContext.DataSourceInstanceSettings) - if err != nil { - log.DefaultLogger.Error("Could not get config for plugin", "err", err) - return response, err - }*/ i, err := td.im.Get(ctx, req.PluginContext) if err != nil { return nil, err @@ -73,7 +64,6 @@ func (td *SnowflakeDatasource) QueryData(ctx context.Context, req *backend.Query for _, query := range req.Queries { wg.Add(1) go td.query(ctx, &wg, ch, instance, query) - //go e.executeQuery(query, &wg, ctx, ch, queryjson) } wg.Wait() From ad5a68a1eb4a2e174824e5ad35d0133a322ad3cc Mon Sep 17 00:00:00 2001 From: Sebastian Balz Date: Thu, 17 Oct 2024 10:24:06 +0000 Subject: [PATCH 07/10] add config test cases --- pkg/snowflake_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/snowflake_test.go b/pkg/snowflake_test.go index 93fbe3c..f348625 100644 --- a/pkg/snowflake_test.go +++ b/pkg/snowflake_test.go @@ -18,6 +18,22 @@ func TestGetConfig(t *testing.T) { err string }{ {json: "{}", config: pluginConfig{ConnectionLifetime: "60", IntConnectionLifetime: 60, MaxOpenConnections: "100", IntMaxOpenConnections: 100, MaxQueuedQueries: "400", IntMaxQueuedQueries: 400}}, + {json: "{\"ConnectionLifetime\": \"10\"}", config: pluginConfig{ConnectionLifetime: "10", IntConnectionLifetime: 10, MaxOpenConnections: "100", IntMaxOpenConnections: 100, MaxQueuedQueries: "400", IntMaxQueuedQueries: 400}}, + {json: "{\"ConnectionLifetime\": \"-10\"}", config: pluginConfig{ConnectionLifetime: "-10", IntConnectionLifetime: -10, MaxOpenConnections: "100", IntMaxOpenConnections: 100, MaxQueuedQueries: "400", IntMaxQueuedQueries: 400}}, + {json: "{\"ConnectionLifetime\": \"test\"}", err: "strconv.Atoi: parsing \"test\": invalid syntax"}, + {json: "{\"ConnectionLifetime\": \"1.5\"}", err: "strconv.Atoi: parsing \"1.5\": invalid syntax"}, + {json: "{\"ConnectionLifetime\": \"1,5\"}", err: "strconv.Atoi: parsing \"1,5\": invalid syntax"}, + {json: "{\"MaxOpenConnections\": \"10\"}", config: pluginConfig{ConnectionLifetime: "60", IntConnectionLifetime: 60, MaxOpenConnections: "10", IntMaxOpenConnections: 10, MaxQueuedQueries: "400", IntMaxQueuedQueries: 400}}, + {json: "{\"MaxOpenConnections\": \"-10\"}", config: pluginConfig{ConnectionLifetime: "60", IntConnectionLifetime: 60, MaxOpenConnections: "-10", IntMaxOpenConnections: -10, MaxQueuedQueries: "400", IntMaxQueuedQueries: 400}}, + {json: "{\"MaxOpenConnections\": \"test\"}", err: "strconv.Atoi: parsing \"test\": invalid syntax"}, + {json: "{\"MaxOpenConnections\": \"1.5\"}", err: "strconv.Atoi: parsing \"1.5\": invalid syntax"}, + {json: "{\"MaxOpenConnections\": \"1,5\"}", err: "strconv.Atoi: parsing \"1,5\": invalid syntax"}, + {json: "{\"MaxQueuedQueries\": \"10\"}", config: pluginConfig{ConnectionLifetime: "60", IntConnectionLifetime: 60, MaxOpenConnections: "100", IntMaxOpenConnections: 100, MaxQueuedQueries: "10", IntMaxQueuedQueries: 10}}, + {json: "{\"MaxQueuedQueries\": \"-10\"}", config: pluginConfig{ConnectionLifetime: "60", IntConnectionLifetime: 60, MaxOpenConnections: "100", IntMaxOpenConnections: 100, MaxQueuedQueries: "-10", IntMaxQueuedQueries: -10}}, + {json: "{\"MaxQueuedQueries\": \"test\"}", err: "strconv.Atoi: parsing \"test\": invalid syntax"}, + {json: "{\"MaxQueuedQueries\": \"1.5\"}", err: "strconv.Atoi: parsing \"1.5\": invalid syntax"}, + {json: "{\"MaxQueuedQueries\": \"1,5\"}", err: "strconv.Atoi: parsing \"1,5\": invalid syntax"}, + {json: "{\"account\":\"test\", \"ConnectionLifetime\":\"8\", \"MaxOpenConnections\":\"9\", \"MaxQueuedQueries\": \"10\"}", config: pluginConfig{Account: "test", ConnectionLifetime: "8", IntConnectionLifetime: 8, MaxOpenConnections: "9", IntMaxOpenConnections: 9, MaxQueuedQueries: "10", IntMaxQueuedQueries: 10}}, {json: "{\"account\":\"test\"}", config: pluginConfig{Account: "test", ConnectionLifetime: "60", IntConnectionLifetime: 60, MaxOpenConnections: "100", IntMaxOpenConnections: 100, MaxQueuedQueries: "400", IntMaxQueuedQueries: 400}}, {json: "{", err: "unexpected end of JSON input"}, } From 40d38bfe3f2a0e53c73081fbb2b48a83e7e0426b Mon Sep 17 00:00:00 2001 From: Sebastian Balz Date: Thu, 17 Oct 2024 10:36:49 +0000 Subject: [PATCH 08/10] remove non functional initialization --- src/ConfigEditor.tsx | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/ConfigEditor.tsx b/src/ConfigEditor.tsx index d9c2f94..6ddb8aa 100644 --- a/src/ConfigEditor.tsx +++ b/src/ConfigEditor.tsx @@ -10,19 +10,7 @@ interface Props extends DataSourcePluginOptionsEditorProps { } interface State { } export class ConfigEditor extends PureComponent { - componentDidMount() { - const { onOptionsChange, options } = this.props; - if (options.jsonData.maxOpenConnections === ""){ - const jsonData = { - ...options.jsonData, - maxOpenConnections: "100", - maxQueuedQueries:"400", - connectionLifetime: "60", - - }; - onOptionsChange({ ...options, jsonData }); - } - } + onAccountChange = (event: ChangeEvent) => { const { onOptionsChange, options } = this.props; @@ -356,6 +344,7 @@ export class ConfigEditor extends PureComponent { labelWidth={30} label="Connection lifetime [min]" tooltip="How long open connections are hold to be reused in minutes. (default=60 | 0=never close)" > + Date: Wed, 23 Oct 2024 12:58:31 +0000 Subject: [PATCH 09/10] bugfix error handler with no return --- pkg/query.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/query.go b/pkg/query.go index 28a3f8b..7bf3167 100644 --- a/pkg/query.go +++ b/pkg/query.go @@ -342,9 +342,8 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch if queryConfig.isTimeSeriesType() { frame, err = td.longToWide(frame, queryConfig, dataResponse) if err != nil { - queryResult.dataResponse.Error = fmt.Errorf("%w", err) - queryResult.dataResponse.Frames = data.Frames{frame} - ch <- queryResult + errAppendDebug("db transformation error", err, queryConfig.FinalQuery) + return } } log.DefaultLogger.Debug("Converted wide time Frame is:", frame) From a4d127153004d5963b4ea87aaa2324c6df92126f Mon Sep 17 00:00:00 2001 From: MrLight Date: Sat, 1 Mar 2025 10:20:17 +0000 Subject: [PATCH 10/10] code rework --- pkg/query.go | 406 ++++++++++++++++++++---------------------- pkg/query_test.go | 41 +---- pkg/snowflake.go | 26 ++- pkg/snowflake_test.go | 46 ++++- src/ConfigEditor.tsx | 53 +++--- 5 files changed, 274 insertions(+), 298 deletions(-) diff --git a/pkg/query.go b/pkg/query.go index 26686f4..d4aa209 100644 --- a/pkg/query.go +++ b/pkg/query.go @@ -15,23 +15,16 @@ import ( "sync/atomic" "time" - _data "github.com/michelin/snowflake-grafana-datasource/pkg/data" - "github.com/michelin/snowflake-grafana-datasource/pkg/query" - "github.com/michelin/snowflake-grafana-datasource/pkg/utils" - "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/data" + _data "github.com/michelin/snowflake-grafana-datasource/pkg/data" + "github.com/michelin/snowflake-grafana-datasource/pkg/query" + "github.com/michelin/snowflake-grafana-datasource/pkg/utils" ) const rowLimit = 1000000 -const timeSeriesType = "time series" - -func (qc *queryConfigStruct) isTimeSeriesType() bool { - return qc.QueryType == timeSeriesType -} - type queryCounter int32 func (c *queryCounter) inc() int32 { @@ -46,28 +39,13 @@ func (c *queryCounter) get() int32 { return atomic.LoadInt32((*int32)(c)) } -type queryConfigStruct struct { - FinalQuery string - QueryType string - RawQuery string - TimeColumns []string - TimeRange backend.TimeRange - Interval time.Duration - MaxDataPoints int64 - FillMode string - FillValue float64 - db *sql.DB - config *pluginConfig - actQueryCount *queryCounter +type QueryStruct struct { + qc *_data.QueryConfigStruct + db *sql.DB + intMaxQueuedQueries int64 + queryCounter queryCounter } -// type -var boolean bool -var tim time.Time -var float float64 -var str string -var integer int64 - type queryModel struct { QueryText string `json:"queryText"` QueryType string `json:"queryType"` @@ -75,142 +53,14 @@ type queryModel struct { FillMode string `json:"fillMode"` } -func (qc *queryConfigStruct) fetchData(ctx context.Context) (result DataQueryResult, err error) { - qc.actQueryCount.inc() - // Custom configuration to reduce memory footprint - sf.MaxChunkDownloadWorkers = 2 - sf.CustomJSONDecoderEnabled = true - - start := time.Now() - stats := qc.db.Stats() - defer func() { - qc.actQueryCount.dec() - duration := time.Since(start) - log.DefaultLogger.Info(fmt.Sprintf("%+v - %s - %d", stats, duration, int(qc.actQueryCount.get()))) - - }() - if int(qc.config.IntMaxQueuedQueries) > 0 && int(qc.actQueryCount.get()) >= (int(qc.config.IntMaxQueuedQueries)) { - err := errors.New("too many queries in queue. Check Snowflake connectivity or increase MaxQueuedQeries count") - log.DefaultLogger.Error("Poolsize exceeded", "query", qc.FinalQuery, "err", err) - return result, err - } - rows, err := qc.db.QueryContext(ctx, qc.FinalQuery) - if err != nil { - if strings.Contains(err.Error(), "000605") { - log.DefaultLogger.Info("Query got cancelled", "query", qc.FinalQuery, "err", err) - return result, err - } - - log.DefaultLogger.Error("Could not execute query", "query", qc.FinalQuery, "err", err) - return result, err - } - defer func() { - if err := rows.Close(); err != nil { - log.DefaultLogger.Warn("Failed to close rows", "err", err) - } - }() - - columnTypes, err := rows.ColumnTypes() - if err != nil { - log.DefaultLogger.Error("Could not get column types", "err", err) - return result, err - } - columnCount := len(columnTypes) - - if columnCount == 0 { - return result, nil - } - - table := _data.Table{ - Columns: columnTypes, - Rows: make([][]interface{}, 0), - } - - rowCount := 0 - for ; rows.Next(); rowCount++ { - if rowCount > rowLimit { - return result, fmt.Errorf("query row limit exceeded, limit %d", rowLimit) - } - values, err := transformQueryResult(*qc, columnTypes, rows) - if err != nil { - return result, err - } - table.Rows = append(table.Rows, values) - } - - err = rows.Err() - if err != nil { - log.DefaultLogger.Error("The row scan finished with an error", "err", err) - return result, err - } - - result.Tables = append(result.Tables, table) - return result, nil -} - -func transformQueryResult(qc _data.QueryConfigStruct, columnTypes []*sql.ColumnType, rows *sql.Rows) ([]interface{}, error) { - values := make([]interface{}, len(columnTypes)) - valuePtrs := make([]interface{}, len(columnTypes)) - - for i := 0; i < len(columnTypes); i++ { - valuePtrs[i] = &values[i] - } - - if err := rows.Scan(valuePtrs...); err != nil { - return nil, err - } - - column_types, _ := rows.ColumnTypes() - - // convert types from string type to real type - for i := 0; i < len(columnTypes); i++ { - log.DefaultLogger.Debug("Type", fmt.Sprintf("%T %v ", values[i], values[i]), columnTypes[i].DatabaseTypeName()) - - // Convert time columns when query mode is time series - if qc.IsTimeSeriesType() && utils.EqualsIgnoreCase(qc.TimeColumns, columnTypes[i].Name()) && reflect.TypeOf(values[i]) == reflect.TypeOf(str) { - if v, err := strconv.ParseFloat(values[i].(string), 64); err == nil { - values[i] = time.Unix(int64(v), 0) - } else { - return nil, fmt.Errorf("column %s cannot be converted to Time", columnTypes[i].Name()) - } - continue - } - - if values[i] != nil { - switch column_types[i].ScanType() { - case reflect.TypeOf(boolean): - values[i] = values[i].(bool) - case reflect.TypeOf(tim): - values[i] = values[i].(time.Time) - case reflect.TypeOf(integer): - n := new(big.Float) - n.SetString(values[i].(string)) - precision, _, _ := columnTypes[i].DecimalSize() - if precision > 1 { - values[i], _ = n.Float64() - } else { - values[i], _ = n.Int64() - } - case reflect.TypeOf(float): - if reflect.TypeOf(float) == reflect.TypeOf(values[i]) { - values[i] = values[i].(float64) - } else if v, err := strconv.ParseFloat(values[i].(string), 64); err == nil { - values[i] = v - } else { - log.DefaultLogger.Info("Rows", "Error converting string to float64", values[i]) - } - case reflect.TypeOf(str): - values[i] = values[i].(string) - default: - values[i] = values[i].(string) - } - } - } - - return values, nil -} +// type +var boolean bool +var tim time.Time +var float float64 +var str string +var integer int64 -func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch chan DBDataResponse, instance *instanceSettings, dataQuery backend.DataQuery) { +func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch chan DBDataResponse, request *backend.QueryDataRequest, instance *instanceSettings, dataQuery backend.DataQuery) { defer wg.Done() queryResult := DBDataResponse{ dataResponse: backend.DataResponse{}, @@ -239,21 +89,22 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch panic("Query model property rawSql should not be empty at this point") } - queryConfig := _data.QueryConfigStruct{ - FinalQuery: qm.QueryText, - RawQuery: qm.QueryText, - TimeColumns: qm.TimeColumns, - FillMode: qm.FillMode, - QueryType: dataQuery.QueryType, - Interval: dataQuery.Interval, - TimeRange: dataQuery.TimeRange, - MaxDataPoints: dataQuery.MaxDataPoints, - DashboardId: request.GetHTTPHeader("X-Dashboard-Uid"), - PanelId: request.GetHTTPHeader("X-Panel-Id"), - db: instance.db, - config: instance.config, - actQueryCount: &instance.actQueryCount, - } + queryStruct := QueryStruct{ + qc: &_data.QueryConfigStruct{ + FinalQuery: qm.QueryText, + RawQuery: qm.QueryText, + TimeColumns: qm.TimeColumns, + FillMode: qm.FillMode, + QueryType: dataQuery.QueryType, + Interval: dataQuery.Interval, + TimeRange: dataQuery.TimeRange, + MaxDataPoints: dataQuery.MaxDataPoints, + DashboardId: request.GetHTTPHeader("X-Dashboard-Uid"), + PanelId: request.GetHTTPHeader("X-Panel-Id"), + }, + intMaxQueuedQueries: instance.config.IntMaxQueuedQueries, + db: instance.db, + queryCounter: instance.actQueryCount} errAppendDebug := func(frameErr string, err error, query string) { var emptyFrame data.Frame @@ -266,19 +117,19 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch } // Apply macros - queryConfig.FinalQuery, err = query.Interpolate(&queryConfig) + queryStruct.qc.FinalQuery, err = query.Interpolate(queryStruct.qc) if err != nil { - errAppendDebug("interpolation failed", err, queryConfig.FinalQuery) + errAppendDebug("interpolation failed", err, queryStruct.qc.FinalQuery) return } // Remove final semi column - queryConfig.FinalQuery = strings.TrimSuffix(strings.TrimSpace(queryConfig.FinalQuery), ";") + queryStruct.qc.FinalQuery = strings.TrimSuffix(strings.TrimSpace(queryStruct.qc.FinalQuery), ";") frame := data.NewFrame("") - dataResponse, err := queryConfig.fetchData(ctx) + dataResponse, err := queryStruct.fetchData(ctx) if err != nil { - errAppendDebug("db query error", err, queryConfig.FinalQuery) + errAppendDebug("db query error", err, queryStruct.qc.FinalQuery) return } log.DefaultLogger.Debug("Response", "data", dataResponse) @@ -286,11 +137,11 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch timeColumnIndex := -1 for i, column := range table.Columns { if err != nil { - errAppendDebug("db query error", err, queryConfig.FinalQuery) + errAppendDebug("db query error", err, queryStruct.qc.FinalQuery) return } // Check time column - if queryConfig.IsTimeSeriesType() && utils.EqualsIgnoreCase(queryConfig.TimeColumns, column.Name()) { + if queryStruct.qc.IsTimeSeriesType() && utils.EqualsIgnoreCase(queryStruct.qc.TimeColumns, column.Name()) { if strings.EqualFold(column.Name(), "Time") { timeColumnIndex = i } @@ -319,15 +170,15 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch } } - intervalStart := queryConfig.TimeRange.From.UnixNano() / 1e6 - intervalEnd := queryConfig.TimeRange.To.UnixNano() / 1e6 + intervalStart := queryStruct.qc.TimeRange.From.UnixNano() / 1e6 + intervalEnd := queryStruct.qc.TimeRange.To.UnixNano() / 1e6 count := 0 // add rows for j, row := range table.Rows { // Handle fill mode when the time column exist if timeColumnIndex != -1 { - fillTimesSeries(queryConfig, intervalStart, row[utils.Max(int64(timeColumnIndex), 0)].(time.Time).UnixNano()/1e6, timeColumnIndex, frame, len(table.Columns), &count, utils.PreviousRow(table.Rows, j)) + fillTimesSeries(*queryStruct.qc, intervalStart, row[utils.Max(int64(timeColumnIndex), 0)].(time.Time).UnixNano()/1e6, timeColumnIndex, frame, len(table.Columns), &count, utils.PreviousRow(table.Rows, j)) } // without fill mode for i, v := range row { @@ -335,12 +186,12 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch } count++ } - fillTimesSeries(queryConfig, intervalStart, intervalEnd, timeColumnIndex, frame, len(table.Columns), &count, utils.PreviousRow(table.Rows, len(table.Rows))) + fillTimesSeries(*queryStruct.qc, intervalStart, intervalEnd, timeColumnIndex, frame, len(table.Columns), &count, utils.PreviousRow(table.Rows, len(table.Rows))) } - if queryConfig.IsTimeSeriesType() { - frame, err = td.longToWide(frame, queryConfig, dataResponse) + if queryStruct.qc.IsTimeSeriesType() { + frame, err = longToWide(frame, *queryStruct.qc, dataResponse) if err != nil { - errAppendDebug("db transformation error", err, queryConfig.FinalQuery) + errAppendDebug("db transformation error", err, queryStruct.qc.FinalQuery) return } } @@ -348,34 +199,146 @@ func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch frame.RefID = dataQuery.RefID frame.Meta = &data.FrameMeta{ Type: data.FrameTypeTimeSeriesWide, - ExecutedQueryString: queryConfig.FinalQuery, + ExecutedQueryString: queryStruct.qc.FinalQuery, } queryResult.dataResponse.Frames = data.Frames{frame} ch <- queryResult } -func (td *SnowflakeDatasource) longToWide(frame *data.Frame, queryConfig _data.QueryConfigStruct, dataResponse _data.QueryResult) (*data.Frame, error) { - tsSchema := frame.TimeSeriesSchema() - if tsSchema.Type == data.TimeSeriesTypeLong { - fillMode := &data.FillMissing{Mode: query.MapFillMode(queryConfig.FillMode), Value: queryConfig.FillValue} - if len(dataResponse.Tables) > 0 && len(dataResponse.Tables[0].Rows) > 0 { - var err error - frame, err = data.LongToWide(frame, fillMode) - if err != nil { - log.DefaultLogger.Error("Could not convert long frame to wide frame", "err", err) - return nil, err +func (qs *QueryStruct) fetchData(ctx context.Context) (result _data.QueryResult, err error) { + qs.queryCounter.inc() + + start := time.Now() + stats := qs.db.Stats() + defer func() { + if qs.queryCounter.get() > 0 { + qs.queryCounter.dec() + } + + duration := time.Since(start) + log.DefaultLogger.Debug(fmt.Sprintf("%+v - %s - %d", stats, duration, int(qs.queryCounter.get()))) + + }() + if int(qs.intMaxQueuedQueries) > 0 && int(qs.queryCounter.get()) >= (int(qs.intMaxQueuedQueries)) { + err := errors.New("too many queries in queue. Check Snowflake connectivity or increase MaxQueuedQeries count") + log.DefaultLogger.Error("Poolsize exceeded", "query", qs.qc.FinalQuery, "err", err) + return result, err + } + rows, err := qs.db.QueryContext(utils.AddQueryTagInfos(ctx, qs.qc), qs.qc.FinalQuery) + if err != nil { + if strings.Contains(err.Error(), "000605") { + log.DefaultLogger.Info("Query got cancelled", "query", qs.qc.FinalQuery, "err", err) + return result, err + } + + log.DefaultLogger.Error("Could not execute query", "query", qs.qc.FinalQuery, "err", err) + return result, err + } + defer func() { + if err := rows.Close(); err != nil { + log.DefaultLogger.Warn("Failed to close rows", "err", err) + } + }() + + columnTypes, err := rows.ColumnTypes() + if err != nil { + log.DefaultLogger.Error("Could not get column types", "err", err) + return result, err + } + columnCount := len(columnTypes) + + if columnCount == 0 { + return result, nil + } + + table := _data.Table{ + Columns: columnTypes, + Rows: make([][]interface{}, 0), + } + + rowCount := 0 + for ; rows.Next(); rowCount++ { + if rowCount > rowLimit { + return result, fmt.Errorf("query row limit exceeded, limit %d", rowLimit) + } + values, err := qs.transformQueryResult(columnTypes, rows) + if err != nil { + return result, err + } + table.Rows = append(table.Rows, values) + } + + err = rows.Err() + if err != nil { + log.DefaultLogger.Error("The row scan finished with an error", "err", err) + return result, err + } + + result.Tables = append(result.Tables, table) + return result, nil +} + +func (qs *QueryStruct) transformQueryResult(columnTypes []*sql.ColumnType, rows *sql.Rows) ([]interface{}, error) { + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + + for i := 0; i < len(columnTypes); i++ { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + return nil, err + } + + column_types, _ := rows.ColumnTypes() + + // convert types from string type to real type + for i := 0; i < len(columnTypes); i++ { + log.DefaultLogger.Debug("Type", fmt.Sprintf("%T %v ", values[i], values[i]), columnTypes[i].DatabaseTypeName()) + + // Convert time columns when query mode is time series + if qs.qc.IsTimeSeriesType() && utils.EqualsIgnoreCase(qs.qc.TimeColumns, columnTypes[i].Name()) && reflect.TypeOf(values[i]) == reflect.TypeOf(str) { + if v, err := strconv.ParseFloat(values[i].(string), 64); err == nil { + values[i] = time.Unix(int64(v), 0) + } else { + return nil, fmt.Errorf("column %s cannot be converted to Time", columnTypes[i].Name()) } + continue } - for _, field := range frame.Fields { - if field.Labels != nil { - for _, val := range field.Labels { - field.Name += "_" + string(val) + + if values[i] != nil { + switch column_types[i].ScanType() { + case reflect.TypeOf(boolean): + values[i] = values[i].(bool) + case reflect.TypeOf(tim): + values[i] = values[i].(time.Time) + case reflect.TypeOf(integer): + n := new(big.Float) + n.SetString(values[i].(string)) + precision, _, _ := columnTypes[i].DecimalSize() + if precision > 1 { + values[i], _ = n.Float64() + } else { + values[i], _ = n.Int64() + } + case reflect.TypeOf(float): + if reflect.TypeOf(float) == reflect.TypeOf(values[i]) { + values[i] = values[i].(float64) + } else if v, err := strconv.ParseFloat(values[i].(string), 64); err == nil { + values[i] = v + } else { + log.DefaultLogger.Info("Rows", "Error converting string to float64", values[i]) } + case reflect.TypeOf(str): + values[i] = values[i].(string) + default: + values[i] = values[i].(string) } } } - return frame, nil + + return values, nil } func fillTimesSeries(queryConfig _data.QueryConfigStruct, intervalStart int64, intervalEnd int64, timeColumnIndex int, frame *data.Frame, columnSize int, count *int, previousRow []interface{}) { @@ -405,3 +368,26 @@ func fillTimesSeries(queryConfig _data.QueryConfigStruct, intervalStart int64, i } } } + +func longToWide(frame *data.Frame, queryConfig _data.QueryConfigStruct, dataResponse _data.QueryResult) (*data.Frame, error) { + tsSchema := frame.TimeSeriesSchema() + if tsSchema.Type == data.TimeSeriesTypeLong { + fillMode := &data.FillMissing{Mode: query.MapFillMode(queryConfig.FillMode), Value: queryConfig.FillValue} + if len(dataResponse.Tables) > 0 && len(dataResponse.Tables[0].Rows) > 0 { + var err error + frame, err = data.LongToWide(frame, fillMode) + if err != nil { + log.DefaultLogger.Error("Could not convert long frame to wide frame", "err", err) + return nil, err + } + } + for _, field := range frame.Fields { + if field.Labels != nil { + for _, val := range field.Labels { + field.Name += "_" + string(val) + } + } + } + } + return frame, nil +} diff --git a/pkg/query_test.go b/pkg/query_test.go index c695926..bcf99c8 100644 --- a/pkg/query_test.go +++ b/pkg/query_test.go @@ -1,14 +1,13 @@ package main import ( + "testing" + "time" + "github.com/grafana/grafana-plugin-sdk-go/data" _data "github.com/michelin/snowflake-grafana-datasource/pkg/data" "github.com/michelin/snowflake-grafana-datasource/pkg/query" - sf "github.com/snowflakedb/gosnowflake" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "testing" - "time" ) func TestIsTimeSeriesType_TrueWhenQueryTypeIsTimeSeries(t *testing.T) { @@ -109,37 +108,3 @@ func TestAppendsNilWhenPreviousRowIsNil(t *testing.T) { assert.Equal(t, 1, frame.Fields[1].Len()) assert.Nil(t, frame.Fields[1].At(0)) } - -func TestMaxChunkDownloadWorkers(t *testing.T) { - config := pluginConfig{ - MaxChunkDownloadWorkers: "5", - } - - t.Run("valid MaxChunkDownloadWorkers", func(t *testing.T) { - getConnectionString(&config, _data.AuthenticationSecret{}) - require.Equal(t, 5, sf.MaxChunkDownloadWorkers) - }) - - t.Run("invalid MaxChunkDownloadWorkers", func(t *testing.T) { - config.MaxChunkDownloadWorkers = "invalid" - getConnectionString(&config, _data.AuthenticationSecret{}) - require.NotEqual(t, 5, sf.MaxChunkDownloadWorkers) - }) -} - -func TestCustomJSONDecoderEnabled(t *testing.T) { - config := pluginConfig{ - CustomJSONDecoderEnabled: true, - } - - t.Run("CustomJSONDecoderEnabled true", func(t *testing.T) { - getConnectionString(&config, _data.AuthenticationSecret{}) - require.True(t, sf.CustomJSONDecoderEnabled) - }) - - t.Run("CustomJSONDecoderEnabled false", func(t *testing.T) { - config.CustomJSONDecoderEnabled = false - getConnectionString(&config, _data.AuthenticationSecret{}) - require.False(t, sf.CustomJSONDecoderEnabled) - }) -} diff --git a/pkg/snowflake.go b/pkg/snowflake.go index f131706..9313e5e 100644 --- a/pkg/snowflake.go +++ b/pkg/snowflake.go @@ -9,14 +9,13 @@ import ( "sync" "time" - "github.com/michelin/snowflake-grafana-datasource/pkg/data" - _oauth "github.com/michelin/snowflake-grafana-datasource/pkg/oauth" + "net/url" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/backend/log" - - "net/url" + "github.com/michelin/snowflake-grafana-datasource/pkg/data" + _oauth "github.com/michelin/snowflake-grafana-datasource/pkg/oauth" sf "github.com/snowflakedb/gosnowflake" ) @@ -55,7 +54,7 @@ func (td *SnowflakeDatasource) QueryData(ctx context.Context, req *backend.Query // Execute each query in a goroutine and wait for them to finish afterwards for _, query := range req.Queries { wg.Add(1) - go td.query(ctx, &wg, ch, instance, query) + go td.query(ctx, &wg, ch, req, instance, query) } wg.Wait() @@ -160,20 +159,21 @@ type instanceSettings struct { actQueryCount queryCounter } -func newDataSourceInstance(ctx context.Context, setting backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { +func NewDataSourceInstance(ctx context.Context, setting backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { log.DefaultLogger.Info("Creating instance") - password := req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData["password"] - privateKey := req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData["privateKey"] + config, err := getConfig(&setting) + password := setting.DecryptedSecureJSONData["password"] + privateKey := setting.DecryptedSecureJSONData["privateKey"] oauth := _oauth.Oauth{ ClientId: config.ClientId, - ClientSecret: req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData["clientSecret"], + ClientSecret: setting.DecryptedSecureJSONData["clientSecret"], TokenEndpoint: config.TokenEndpoint, } token, err := _oauth.GetToken(oauth, false) if err != nil { - return response, err + return nil, err } authenticationSecret := data.AuthenticationSecret{ @@ -182,13 +182,12 @@ func newDataSourceInstance(ctx context.Context, setting backend.DataSourceInstan Token: token, } - config, err := getConfig(&setting) if err != nil { log.DefaultLogger.Error("Could not get config for plugin", "err", err) return nil, err } - connectionString := getConnectionString(&config, password, privateKey) + connectionString := getConnectionString(&config, authenticationSecret) db, err := sql.Open("snowflake", connectionString) if err != nil { return nil, err @@ -199,8 +198,7 @@ func newDataSourceInstance(ctx context.Context, setting backend.DataSourceInstan db.SetConnMaxLifetime(time.Duration(int(config.IntConnectionLifetime)) * time.Minute) return &instanceSettings{db: db, config: &config}, nil } - -func (s *SnowflakeDatasource) Dispose() { +func (s *instanceSettings) Dispose() { log.DefaultLogger.Info("Disposing of instance") if s.db != nil { if err := s.db.Close(); err != nil { diff --git a/pkg/snowflake_test.go b/pkg/snowflake_test.go index 061dded..85d3eff 100644 --- a/pkg/snowflake_test.go +++ b/pkg/snowflake_test.go @@ -1,7 +1,6 @@ package main import ( - "context" "fmt" "testing" @@ -112,16 +111,53 @@ func TestGetConnectionString(t *testing.T) { } // TODO TestCreatesNewDataSourceInstance will fail because no login data is provided. -func TestCreatesNewDataSourceInstance(t *testing.T) { +/*func TestCreatesNewDataSourceInstance(t *testing.T) { settings := backend.DataSourceInstanceSettings{} instance, err := NewDataSourceInstance(context.Background(), settings) require.NoError(t, err) require.NotNil(t, instance) -} +}*/ -func TestDisposesInstanceWithoutError(t *testing.T) { - instance := &SnowflakeDatasource{} +/*func TestDisposesInstanceWithoutError(t *testing.T) { + settings := backend.DataSourceInstanceSettings{} + i, err := NewDataSourceInstance(context.Background(), settings) + instance := i.(instanceSettings) + require.NoError(t, err) require.NotPanics(t, func() { instance.Dispose() }) +}*/ + +func TestMaxChunkDownloadWorkers(t *testing.T) { + config := pluginConfig{ + MaxChunkDownloadWorkers: "5", + } + + t.Run("valid MaxChunkDownloadWorkers", func(t *testing.T) { + getConnectionString(&config, data.AuthenticationSecret{}) + require.Equal(t, 5, sf.MaxChunkDownloadWorkers) + }) + + t.Run("invalid MaxChunkDownloadWorkers", func(t *testing.T) { + config.MaxChunkDownloadWorkers = "invalid" + getConnectionString(&config, data.AuthenticationSecret{}) + require.NotEqual(t, 5, sf.MaxChunkDownloadWorkers) + }) +} + +func TestCustomJSONDecoderEnabled(t *testing.T) { + config := pluginConfig{ + CustomJSONDecoderEnabled: true, + } + + t.Run("CustomJSONDecoderEnabled true", func(t *testing.T) { + getConnectionString(&config, data.AuthenticationSecret{}) + require.True(t, sf.CustomJSONDecoderEnabled) + }) + + t.Run("CustomJSONDecoderEnabled false", func(t *testing.T) { + config.CustomJSONDecoderEnabled = false + getConnectionString(&config, data.AuthenticationSecret{}) + require.False(t, sf.CustomJSONDecoderEnabled) + }) } diff --git a/src/ConfigEditor.tsx b/src/ConfigEditor.tsx index 3e8f301..4c8f0be 100644 --- a/src/ConfigEditor.tsx +++ b/src/ConfigEditor.tsx @@ -295,7 +295,7 @@ export class ConfigEditor extends PureComponent { const { authMethod } = this.state; return ( -
+

Connection

{

Connection Pool configuration

- + - + - - + - -