Skip to content

Commit f20a464

Browse files
SNOW-1006312: Cancel context not propagated to snowflakeFileTransferAgent on PUT/GET command (#1108)
Added context to snowflakeFileTransferAgent to support cancel for file transfer process
1 parent f434413 commit f20a464

6 files changed

+66
-3
lines changed

connection.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,21 @@ func (sc *snowflakeConn) exec(
169169
}
170170

171171
// handle PUT/GET commands
172+
fileTransferChan := make(chan error, 1)
172173
if isFileTransfer(query) {
173-
data, err = sc.processFileTransfer(ctx, data, query, isInternal)
174-
if err != nil {
175-
return nil, err
174+
go func() {
175+
data, err = sc.processFileTransfer(ctx, data, query, isInternal)
176+
fileTransferChan <- err
177+
}()
178+
179+
select {
180+
case <-ctx.Done():
181+
logger.WithContext(ctx).Info("File transfer has been cancelled")
182+
return nil, ctx.Err()
183+
case err := <-fileTransferChan:
184+
if err != nil {
185+
return nil, err
186+
}
176187
}
177188
}
178189

connection_util.go

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func (sc *snowflakeConn) processFileTransfer(
8989
isInternal bool) (
9090
*execResponse, error) {
9191
sfa := snowflakeFileTransferAgent{
92+
ctx: ctx,
9293
sc: sc,
9394
data: &data.Data,
9495
command: query,

file_transfer_agent.go

+1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type SnowflakeFileTransferOptions struct {
106106
}
107107

108108
type snowflakeFileTransferAgent struct {
109+
ctx context.Context
109110
sc *snowflakeConn
110111
data *execResponseData
111112
command string

file_transfer_agent_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func TestGetBucketAccelerateConfiguration(t *testing.T) {
3030
}
3131
runSnowflakeConnTest(t, func(sct *SCTest) {
3232
sfa := &snowflakeFileTransferAgent{
33+
ctx: context.Background(),
3334
sc: sct.sc,
3435
commandType: uploadCommand,
3536
srcFiles: make([]string, 0),
@@ -81,6 +82,7 @@ func TestUnitDownloadWithInvalidLocalPath(t *testing.T) {
8182
func TestUnitGetLocalFilePathFromCommand(t *testing.T) {
8283
runSnowflakeConnTest(t, func(sct *SCTest) {
8384
sfa := &snowflakeFileTransferAgent{
85+
ctx: context.Background(),
8486
sc: sct.sc,
8587
commandType: uploadCommand,
8688
srcFiles: make([]string, 0),
@@ -110,6 +112,7 @@ func TestUnitGetLocalFilePathFromCommand(t *testing.T) {
110112
func TestUnitProcessFileCompressionType(t *testing.T) {
111113
runSnowflakeConnTest(t, func(sct *SCTest) {
112114
sfa := &snowflakeFileTransferAgent{
115+
ctx: context.Background(),
113116
sc: sct.sc,
114117
commandType: uploadCommand,
115118
srcFiles: make([]string, 0),
@@ -156,6 +159,7 @@ func TestUnitProcessFileCompressionType(t *testing.T) {
156159
func TestParseCommandWithInvalidStageLocation(t *testing.T) {
157160
runSnowflakeConnTest(t, func(sct *SCTest) {
158161
sfa := &snowflakeFileTransferAgent{
162+
ctx: context.Background(),
159163
sc: sct.sc,
160164
commandType: uploadCommand,
161165
srcFiles: make([]string, 0),
@@ -190,6 +194,7 @@ func TestParseCommandEncryptionMaterialMismatchError(t *testing.T) {
190194
}
191195

192196
sfa := &snowflakeFileTransferAgent{
197+
ctx: context.Background(),
193198
sc: sct.sc,
194199
commandType: uploadCommand,
195200
srcFiles: make([]string, 0),
@@ -226,6 +231,7 @@ func TestParseCommandInvalidStorageClientException(t *testing.T) {
226231
}
227232

228233
sfa := &snowflakeFileTransferAgent{
234+
ctx: context.Background(),
229235
sc: sct.sc,
230236
commandType: uploadCommand,
231237
srcFiles: make([]string, 0),
@@ -253,6 +259,7 @@ func TestParseCommandInvalidStorageClientException(t *testing.T) {
253259
func TestInitFileMetadataError(t *testing.T) {
254260
runSnowflakeConnTest(t, func(sct *SCTest) {
255261
sfa := &snowflakeFileTransferAgent{
262+
ctx: context.Background(),
256263
sc: sct.sc,
257264
commandType: uploadCommand,
258265
srcFiles: []string{"fileDoesNotExist.txt"},
@@ -352,6 +359,7 @@ func TestUpdateMetadataWithPresignedUrl(t *testing.T) {
352359

353360
sct.sc.rest.FuncPostQuery = presignedURLMock
354361
sfa := &snowflakeFileTransferAgent{
362+
ctx: context.Background(),
355363
sc: sct.sc,
356364
commandType: uploadCommand,
357365
command: "put file:///tmp/test_data/data1.txt @~",
@@ -400,6 +408,7 @@ func TestUpdateMetadataWithPresignedUrlForDownload(t *testing.T) {
400408
}
401409

402410
sfa := &snowflakeFileTransferAgent{
411+
ctx: context.Background(),
403412
sc: sct.sc,
404413
commandType: downloadCommand,
405414
command: "get @~/data1.txt.gz file:///tmp/testData",
@@ -421,6 +430,7 @@ func TestUpdateMetadataWithPresignedUrlForDownload(t *testing.T) {
421430
func TestUpdateMetadataWithPresignedUrlError(t *testing.T) {
422431
runSnowflakeConnTest(t, func(sct *SCTest) {
423432
sfa := &snowflakeFileTransferAgent{
433+
ctx: context.Background(),
424434
sc: sct.sc,
425435
command: "get @~/data1.txt.gz file:///tmp/testData",
426436
stageLocationType: gcsClient,
@@ -486,6 +496,7 @@ func TestUploadWhenFilesystemReadOnlyError(t *testing.T) {
486496
}
487497

488498
sfa := &snowflakeFileTransferAgent{
499+
ctx: context.Background(),
489500
sc: &snowflakeConn{
490501
cfg: &Config{},
491502
},
@@ -585,6 +596,7 @@ func TestCustomTmpDirPath(t *testing.T) {
585596
}
586597

587598
sfa := snowflakeFileTransferAgent{
599+
ctx: context.Background(),
588600
sc: &snowflakeConn{
589601
cfg: &Config{
590602
TmpDirPath: tmpDir,
@@ -646,6 +658,7 @@ func TestReadonlyTmpDirPathShouldFail(t *testing.T) {
646658
}
647659

648660
sfa := snowflakeFileTransferAgent{
661+
ctx: context.Background(),
649662
sc: &snowflakeConn{
650663
cfg: &Config{
651664
TmpDirPath: tmpDir,
@@ -720,6 +733,7 @@ func testUploadDownloadOneFile(t *testing.T, isStream bool) {
720733
}
721734

722735
sfa := snowflakeFileTransferAgent{
736+
ctx: context.Background(),
723737
sc: &snowflakeConn{
724738
cfg: &Config{
725739
TmpDirPath: tmpDir,

put_get_test.go

+35
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ import (
1111
"math/rand"
1212
"os"
1313
"os/user"
14+
"path"
1415
"path/filepath"
1516
"strconv"
1617
"strings"
1718
"testing"
19+
"time"
1820
)
1921

2022
const createStageStmt = "CREATE OR REPLACE STAGE %v URL = '%v' CREDENTIALS = (%v)"
@@ -48,6 +50,7 @@ func TestPutError(t *testing.T) {
4850
}
4951

5052
fta := &snowflakeFileTransferAgent{
53+
ctx: context.Background(),
5154
data: data,
5255
options: &SnowflakeFileTransferOptions{
5356
RaisePutGetError: false,
@@ -64,6 +67,7 @@ func TestPutError(t *testing.T) {
6467
}
6568

6669
fta = &snowflakeFileTransferAgent{
70+
ctx: context.Background(),
6771
data: data,
6872
options: &SnowflakeFileTransferOptions{
6973
RaisePutGetError: true,
@@ -829,3 +833,34 @@ func TestPutGetMaxLOBSize(t *testing.T) {
829833
}
830834
})
831835
}
836+
837+
func TestPutCancel(t *testing.T) {
838+
sourceDir, err := os.Getwd()
839+
assertNilF(t, err)
840+
testData := path.Join(sourceDir, "/test_data/largefile.txt")
841+
842+
runDBTest(t, func(dbt *DBTest) {
843+
c := make(chan error)
844+
ctx, cancel := context.WithCancel(context.Background())
845+
go func() {
846+
// attempt to upload a large file, but it should be canceled in 3 seconds
847+
_, err := dbt.conn.ExecContext(
848+
ctx,
849+
fmt.Sprintf("put 'file://%v' @~/test_put_cancel overwrite=true",
850+
strings.ReplaceAll(testData, "\\", "/")))
851+
if err != nil {
852+
c <- err
853+
return
854+
}
855+
c <- nil
856+
}()
857+
// cancel after 3 seconds
858+
time.Sleep(3 * time.Second)
859+
fmt.Println("Canceled")
860+
cancel()
861+
ret := <-c
862+
assertNotNilF(t, ret)
863+
assertStringContainsF(t, ret.Error(), "context canceled", "failed to cancel.")
864+
close(c)
865+
})
866+
}

telemetry_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func TestTelemetrySQLException(t *testing.T) {
5353
flushSize: defaultFlushSize,
5454
}
5555
sfa := &snowflakeFileTransferAgent{
56+
ctx: context.Background(),
5657
sc: sct.sc,
5758
commandType: uploadCommand,
5859
srcFiles: make([]string, 0),

0 commit comments

Comments
 (0)