Skip to content

Commit 18f7d87

Browse files
authored
SNOW-1854657 Fallback to rows parser if JSON returned while in Arrow batches mode (#1291)
1 parent 72a121f commit 18f7d87

12 files changed

+124
-86
lines changed

async.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (sr *snowflakeRestful) getAsync(
119119
rows.errChannel <- err
120120
return err
121121
}
122-
rows.format = respd.Data.QueryResultFormat
122+
rows.format = resultFormat(respd.Data.QueryResultFormat)
123123
rows.errChannel <- nil // mark query status complete
124124
}
125125
} else {

chunk_downloader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (scd *snowflakeChunkDownloader) nextResultSet() error {
9393
}
9494

9595
func (scd *snowflakeChunkDownloader) start() error {
96-
if usesArrowBatches(scd.ctx) {
96+
if usesArrowBatches(scd.ctx) && scd.getQueryResultFormat() == arrowFormat {
9797
return scd.startArrowBatches()
9898
}
9999
scd.CurrentChunkSize = len(scd.RowSet.JSON) // cache the size

chunk_downloader_test.go

-66
Original file line numberDiff line numberDiff line change
@@ -72,69 +72,3 @@ func TestWithArrowBatchesWhenQueryReturnsNoRowsAndReadingArrowBatches(t *testing
7272
assertEmptyE(t, batches)
7373
})
7474
}
75-
76-
func TestWithArrowBatchesWhenQueryReturnsSomeRowsInGivenFormatUsingNativeGoSQLInterface(t *testing.T) {
77-
for _, tc := range []struct {
78-
useJSON bool
79-
desc string
80-
}{
81-
{
82-
useJSON: true,
83-
desc: "json",
84-
},
85-
{
86-
useJSON: false,
87-
desc: "arrow",
88-
},
89-
} {
90-
t.Run(tc.desc, func(t *testing.T) {
91-
runDBTest(t, func(dbt *DBTest) {
92-
if tc.useJSON {
93-
dbt.mustExec(forceJSON)
94-
}
95-
var rows driver.Rows
96-
var err error
97-
err = dbt.conn.Raw(func(x interface{}) error {
98-
rows, err = x.(driver.QueryerContext).QueryContext(WithArrowBatches(context.Background()), "SELECT 1", nil)
99-
return err
100-
})
101-
assertNilF(t, err)
102-
defer func() {
103-
assertNilF(t, rows.Close())
104-
}()
105-
values := make([]driver.Value, 1)
106-
assertNotNilE(t, rows.Next(values)) // we deliberately check that there is an error, because we are in arrow batches mode
107-
assertEqualE(t, values[0], nil)
108-
})
109-
})
110-
}
111-
}
112-
113-
func TestWithArrowBatchesWhenQueryReturnsSomeRowsInGivenFormat(t *testing.T) {
114-
for _, tc := range []struct {
115-
useJSON bool
116-
desc string
117-
}{
118-
{
119-
useJSON: true,
120-
desc: "json",
121-
},
122-
{
123-
useJSON: false,
124-
desc: "arrow",
125-
},
126-
} {
127-
t.Run(tc.desc, func(t *testing.T) {
128-
runDBTest(t, func(dbt *DBTest) {
129-
if tc.useJSON {
130-
dbt.mustExec(forceJSON)
131-
}
132-
rows := dbt.mustQueryContext(WithArrowBatches(context.Background()), "SELECT 1")
133-
defer func() {
134-
assertNilF(t, rows.Close())
135-
}()
136-
assertFalseF(t, rows.Next())
137-
})
138-
})
139-
}
140-
}

chunk_test.go

+7-9
Original file line numberDiff line numberDiff line change
@@ -560,15 +560,13 @@ func testWithArrowBatchesButReturningJSON(t *testing.T, async bool) {
560560
_, err := rows.(SnowflakeRows).GetArrowBatches()
561561
assertNotNilF(t, err)
562562
var se *SnowflakeError
563-
errors.As(err, &se)
564-
assertEqualE(t, se.Message, errJSONResponseInArrowBatchesMode)
565-
566-
ctx = WithRequestID(context.Background(), requestID)
567-
rows2 := sct.mustQueryContext(ctx, "SELECT 'hello'", nil)
568-
defer rows2.Close()
569-
scanValues := make([]driver.Value, 1)
570-
assertNilF(t, rows2.Next(scanValues))
571-
assertEqualE(t, scanValues[0], "hello")
563+
assertTrueE(t, errors.As(err, &se))
564+
assertEqualE(t, se.Message, errMsgNonArrowResponseInArrowBatches)
565+
assertEqualE(t, se.Number, ErrNonArrowResponseInArrowBatches)
566+
567+
v := make([]driver.Value, 1)
568+
assertNilE(t, rows.Next(v))
569+
assertEqualE(t, v[0], "hello")
572570
})
573571
}
574572

cmd/arrow/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
arrow_batches
2+
transform_batches_to_rows/transform_batches_to_rows
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
include ../../../gosnowflake.mak
2+
CMD_TARGET=transform_batches_to_rows
3+
4+
## Install
5+
install: cinstall
6+
7+
## Run
8+
run: crun
9+
10+
## Lint
11+
lint: clint
12+
13+
## Format source codes
14+
fmt: cfmt
15+
16+
.PHONY: install run lint fmt
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"database/sql/driver"
7+
"errors"
8+
"flag"
9+
sf "github.com/snowflakedb/gosnowflake"
10+
"io"
11+
"log"
12+
)
13+
14+
func main() {
15+
if !flag.Parsed() {
16+
flag.Parse()
17+
}
18+
19+
cfg, err := sf.GetConfigFromEnv([]*sf.ConfigParam{
20+
{Name: "Account", EnvName: "SNOWFLAKE_TEST_ACCOUNT", FailOnMissing: true},
21+
{Name: "User", EnvName: "SNOWFLAKE_TEST_USER", FailOnMissing: true},
22+
{Name: "Password", EnvName: "SNOWFLAKE_TEST_PASSWORD", FailOnMissing: true},
23+
{Name: "Host", EnvName: "SNOWFLAKE_TEST_HOST", FailOnMissing: false},
24+
{Name: "Port", EnvName: "SNOWFLAKE_TEST_PORT", FailOnMissing: false},
25+
{Name: "Protocol", EnvName: "SNOWFLAKE_TEST_PROTOCOL", FailOnMissing: false},
26+
})
27+
if err != nil {
28+
log.Fatalf("failed to create Config, err: %v", err)
29+
}
30+
31+
connector := sf.NewConnector(sf.SnowflakeDriver{}, *cfg)
32+
db := sql.OpenDB(connector)
33+
defer db.Close()
34+
35+
conn, err := db.Conn(context.Background())
36+
if err != nil {
37+
log.Fatalf("cannot create a connection. %v", err)
38+
}
39+
defer conn.Close()
40+
41+
_, err = conn.ExecContext(context.Background(), "ALTER SESSION SET GO_QUERY_RESULT_FORMAT = json")
42+
if err != nil {
43+
log.Fatalf("cannot force JSON as result format. %v", err)
44+
}
45+
46+
var rows driver.Rows
47+
err = conn.Raw(func(x any) error {
48+
rows, err = x.(driver.QueryerContext).QueryContext(sf.WithArrowBatches(context.Background()), "SELECT 1, 'hello' UNION SELECT 2, 'hi' UNION SELECT 3, 'howdy'", nil)
49+
return err
50+
})
51+
if err != nil {
52+
log.Fatalf("cannot run a query. %v", err)
53+
}
54+
defer rows.Close()
55+
56+
_, err = rows.(sf.SnowflakeRows).GetArrowBatches()
57+
var se *sf.SnowflakeError
58+
if !errors.As(err, &se) || se.Number != sf.ErrNonArrowResponseInArrowBatches {
59+
log.Fatalf("expected to fail while retrieving arrow batches")
60+
}
61+
62+
res := make([]driver.Value, 2)
63+
for {
64+
err = rows.Next(res)
65+
if err == io.EOF {
66+
break
67+
}
68+
println(res[0].(string), res[1].(string))
69+
}
70+
}

connection.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ func (sc *snowflakeConn) queryContextInternal(
435435
rows.sc = sc
436436
rows.queryID = data.Data.QueryID
437437
rows.ctx = ctx
438-
rows.format = data.Data.QueryResultFormat
438+
rows.format = resultFormat(data.Data.QueryResultFormat)
439439

440440
if isMultiStmt(&data.Data) {
441441
// handleMultiQuery is responsible to fill rows with childResults

doc.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ the underlying data has already been loaded, and downloads it if not.
702702
703703
Limitations:
704704
705-
1. For some queries Snowflake may decide to return data in JSON format (examples: `SHOW PARAMETERS` or `ls @stage`). You cannot use JSON with Arrow batches context.
705+
1. For some queries Snowflake may decide to return data in JSON format (examples: `SHOW PARAMETERS` or `ls @stage`). You cannot use JSON with the Arrow batches context. See the alternative described below.
706706
2. Snowflake handles timestamps in a range which is broader than available space in Arrow timestamp type. Because of that special treatment should be used (see below).
707707
3. When using numbers, Snowflake chooses the smallest type that covers all values in a batch. So even when your column is NUMBER(38, 0), if all values are 8bits, array.Int8 is used.
708708
@@ -741,6 +741,17 @@ When using NUMBERs with non-zero scale, the value is returned as an integer type
741741
Example: when we have a 123.45 value that comes from NUMBER(9, 4), it will be represented as 1234500 with scale equal to 4. It is the client's responsibility to interpret it correctly.
742742
Also - see limitations section above.
743743
744+
How to handle JSON responses in Arrow batches:
745+
746+
Due to technical limitations, the Snowflake backend may return JSON even if the client expects Arrow.
747+
In that case Arrow batches are not available and an error with code ErrNonArrowResponseInArrowBatches is returned.
748+
The response is parsed into regular rows instead.
749+
You can read the rows in the way shown in the transform_batches_to_rows.go example.
750+
This has a significant limitation, though: it is a very low-level API (the Go driver API), so no type conversions are performed.
751+
All values are returned as strings.
752+
An alternative approach is to rerun the query without enabling Arrow batches, using the general Go SQL API instead of the driver API.
753+
This can be optimized by using `WithRequestID`, so that the backend returns the results from the cache.
754+
744755
# Binding Parameters
745756
746757
Binding allows a SQL statement to use a value that is stored in a Golang variable.

dsn.go

+1
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,7 @@ func GetConfigFromEnv(properties []*ConfigParam) (*Config, error) {
974974
Region: region,
975975
Passcode: passcode,
976976
Application: application,
977+
Params: map[string]*string{},
977978
}
978979
return cfg, nil
979980
}

errors.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ const (
163163

164164
// ErrFailedToGetChunk is an error code for the case where it failed to get chunk of result set
165165
ErrFailedToGetChunk = 262000
166+
// ErrNonArrowResponseInArrowBatches is an error code for the case where Arrow batches mode is enabled, but the response is not Arrow-based
167+
ErrNonArrowResponseInArrowBatches = 262001
166168

167169
/* transaction*/
168170

@@ -308,7 +310,7 @@ const (
308310
errMsgFailedToParseTomlFile = "failed to parse toml file. the params %v occurred error with value %v"
309311
errMsgFailedToFindDSNInTomlFile = "failed to find DSN in toml file."
310312
errMsgInvalidPermissionToTomlFile = "file permissions different than read/write for user. Your Permission: %v"
311-
errJSONResponseInArrowBatchesMode = "arrow batches enabled, but the response is not Arrow based"
313+
errMsgNonArrowResponseInArrowBatches = "arrow batches enabled, but the response is not Arrow based"
312314
)
313315

314316
// Returned if a DSN doesn't include the account parameter.
@@ -374,3 +376,11 @@ func errNullValueInMap() *SnowflakeError {
374376
Message: errMsgNullValueInMap,
375377
}
376378
}
379+
380+
func errNonArrowResponseForArrowBatches(queryID string) *SnowflakeError {
381+
return &SnowflakeError{
382+
QueryID: queryID,
383+
Number: ErrNonArrowResponseInArrowBatches,
384+
Message: errMsgNonArrowResponseInArrowBatches,
385+
}
386+
}

rows.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type snowflakeRows struct {
4646
errChannel chan error
4747
location *time.Location
4848
ctx context.Context
49-
format string
49+
format resultFormat
5050
}
5151

5252
func (rows *snowflakeRows) getLocation() *time.Location {
@@ -169,11 +169,8 @@ func (rows *snowflakeRows) GetArrowBatches() ([]*ArrowBatch, error) {
169169
return nil, err
170170
}
171171

172-
if rows.format != "arrow" {
173-
return nil, (&SnowflakeError{
174-
QueryID: rows.queryID,
175-
Message: errJSONResponseInArrowBatchesMode,
176-
}).exceptionTelemetry(rows.sc)
172+
if rows.format != arrowFormat {
173+
return nil, errNonArrowResponseForArrowBatches(rows.queryID).exceptionTelemetry(rows.sc)
177174
}
178175

179176
return rows.ChunkDownloader.getArrowBatches(), nil

0 commit comments

Comments
 (0)