Skip to content

Commit 45b3d45

Browse files
authored
SNOW-1769605 Add timeout to cloud storage (#1272)
1 parent 4f89e5b commit 45b3d45

12 files changed

+462
-122
lines changed

azure_storage_client.go

+22-11
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
)
2424

2525
type snowflakeAzureClient struct {
26+
cfg *Config
2627
}
2728

2829
type azureLocation struct {
@@ -85,9 +86,11 @@ func (util *snowflakeAzureClient) getFileHeader(meta *fileMetadata, filename str
8586
if meta.mockAzureClient != nil {
8687
blobClient = meta.mockAzureClient
8788
}
88-
resp, err := blobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{
89-
AccessConditions: &blob.AccessConditions{},
90-
CPKInfo: &blob.CPKInfo{},
89+
resp, err := withCloudStorageTimeout(util.cfg, func(ctx context.Context) (blob.GetPropertiesResponse, error) {
90+
return blobClient.GetProperties(ctx, &blob.GetPropertiesOptions{
91+
AccessConditions: &blob.AccessConditions{},
92+
CPKInfo: &blob.CPKInfo{},
93+
})
9194
})
9295
if err != nil {
9396
var se *azcore.ResponseError
@@ -203,9 +206,11 @@ func (util *snowflakeAzureClient) uploadFile(
203206
if meta.realSrcStream != nil {
204207
uploadSrc = meta.realSrcStream
205208
}
206-
_, err = blobClient.UploadStream(context.Background(), uploadSrc, &azblob.UploadStreamOptions{
207-
BlockSize: int64(uploadSrc.Len()),
208-
Metadata: azureMeta,
209+
_, err = withCloudStorageTimeout(util.cfg, func(ctx context.Context) (azblob.UploadStreamResponse, error) {
210+
return blobClient.UploadStream(ctx, uploadSrc, &azblob.UploadStreamOptions{
211+
BlockSize: int64(uploadSrc.Len()),
212+
Metadata: azureMeta,
213+
})
209214
})
210215
} else {
211216
var f *os.File
@@ -228,7 +233,9 @@ func (util *snowflakeAzureClient) uploadFile(
228233
if meta.options.putAzureCallback != nil {
229234
blobOptions.Progress = meta.options.putAzureCallback.call
230235
}
231-
_, err = blobClient.UploadFile(context.Background(), f, blobOptions)
236+
_, err = withCloudStorageTimeout(util.cfg, func(ctx context.Context) (azblob.UploadFileResponse, error) {
237+
return blobClient.UploadFile(ctx, f, blobOptions)
238+
})
232239
}
233240
if err != nil {
234241
var se *azcore.ResponseError
@@ -279,7 +286,9 @@ func (util *snowflakeAzureClient) nativeDownloadFile(
279286
blobClient = meta.mockAzureClient
280287
}
281288
if meta.options.GetFileToStream {
282-
blobDownloadResponse, err := blobClient.DownloadStream(context.Background(), &azblob.DownloadStreamOptions{})
289+
blobDownloadResponse, err := withCloudStorageTimeout(util.cfg, func(ctx context.Context) (azblob.DownloadStreamResponse, error) {
290+
return blobClient.DownloadStream(ctx, &azblob.DownloadStreamOptions{})
291+
})
283292
if err != nil {
284293
return err
285294
}
@@ -295,9 +304,11 @@ func (util *snowflakeAzureClient) nativeDownloadFile(
295304
return err
296305
}
297306
defer f.Close()
298-
_, err = blobClient.DownloadFile(
299-
context.Background(), f, &azblob.DownloadFileOptions{
300-
Concurrency: uint16(maxConcurrency)})
307+
_, err = withCloudStorageTimeout(util.cfg, func(ctx context.Context) (any, error) {
308+
return blobClient.DownloadFile(
309+
ctx, f, &azblob.DownloadFileOptions{
310+
Concurrency: uint16(maxConcurrency)})
311+
})
301312
if err != nil {
302313
return err
303314
}

azure_storage_client_test.go

+48-3
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ func TestUploadFileWithAzureUploadFailedError(t *testing.T) {
177177
return azblob.UploadFileResponse{}, errors.New("unexpected error uploading file")
178178
},
179179
},
180+
sfa: &snowflakeFileTransferAgent{
181+
sc: &snowflakeConn{
182+
cfg: &Config{},
183+
},
184+
},
180185
}
181186

182187
uploadMeta.realSrcFileName = uploadMeta.srcFileName
@@ -230,6 +235,11 @@ func TestUploadStreamWithAzureUploadFailedError(t *testing.T) {
230235
return azblob.UploadStreamResponse{}, errors.New("unexpected error uploading file")
231236
},
232237
},
238+
sfa: &snowflakeFileTransferAgent{
239+
sc: &snowflakeConn{
240+
cfg: &Config{},
241+
},
242+
},
233243
}
234244

235245
uploadMeta.realSrcStream = uploadMeta.srcStream
@@ -291,6 +301,11 @@ func TestUploadFileWithAzureUploadTokenExpired(t *testing.T) {
291301
}
292302
},
293303
},
304+
sfa: &snowflakeFileTransferAgent{
305+
sc: &snowflakeConn{
306+
cfg: &Config{},
307+
},
308+
},
294309
}
295310

296311
uploadMeta.realSrcFileName = uploadMeta.srcFileName
@@ -362,6 +377,11 @@ func TestUploadFileWithAzureUploadNeedsRetry(t *testing.T) {
362377
}
363378
},
364379
},
380+
sfa: &snowflakeFileTransferAgent{
381+
sc: &snowflakeConn{
382+
cfg: &Config{},
383+
},
384+
},
365385
}
366386

367387
uploadMeta.realSrcFileName = uploadMeta.srcFileName
@@ -418,6 +438,11 @@ func TestDownloadOneFileToAzureFailed(t *testing.T) {
418438
return blob.GetPropertiesResponse{}, nil
419439
},
420440
},
441+
sfa: &snowflakeFileTransferAgent{
442+
sc: &snowflakeConn{
443+
cfg: &Config{},
444+
},
445+
},
421446
}
422447
err = new(remoteStorageUtil).downloadOneFile(&downloadMeta)
423448
if err == nil {
@@ -444,9 +469,14 @@ func TestGetFileHeaderErrorStatus(t *testing.T) {
444469
return blob.GetPropertiesResponse{}, errors.New("failed to retrieve headers")
445470
},
446471
},
472+
sfa: &snowflakeFileTransferAgent{
473+
sc: &snowflakeConn{
474+
cfg: &Config{},
475+
},
476+
},
447477
}
448478

449-
if header, err := new(snowflakeAzureClient).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
479+
if header, err := (&snowflakeAzureClient{cfg: &Config{}}).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
450480
t.Fatalf("expected null header, got: %v", header)
451481
}
452482
if meta.resStatus != errStatus {
@@ -477,9 +507,14 @@ func TestGetFileHeaderErrorStatus(t *testing.T) {
477507
}
478508
},
479509
},
510+
sfa: &snowflakeFileTransferAgent{
511+
sc: &snowflakeConn{
512+
cfg: &Config{},
513+
},
514+
},
480515
}
481516

482-
if header, err := new(snowflakeAzureClient).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
517+
if header, err := (&snowflakeAzureClient{cfg: &Config{}}).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
483518
t.Fatalf("expected null header, got: %v", header)
484519
}
485520
if meta.resStatus != notFoundFile {
@@ -505,7 +540,7 @@ func TestGetFileHeaderErrorStatus(t *testing.T) {
505540
},
506541
}
507542

508-
if header, err := new(snowflakeAzureClient).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
543+
if header, err := (&snowflakeAzureClient{cfg: &Config{}}).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
509544
t.Fatalf("expected null header, got: %v", header)
510545
}
511546
if meta.resStatus != renewToken {
@@ -540,6 +575,11 @@ func TestUploadFileToAzureClientCastFail(t *testing.T) {
540575
options: &SnowflakeFileTransferOptions{
541576
MultiPartThreshold: dataSizeThreshold,
542577
},
578+
sfa: &snowflakeFileTransferAgent{
579+
sc: &snowflakeConn{
580+
cfg: &Config{},
581+
},
582+
},
543583
}
544584

545585
uploadMeta.realSrcFileName = uploadMeta.srcFileName
@@ -573,6 +613,11 @@ func TestAzureGetHeaderClientCastFail(t *testing.T) {
573613
return blob.GetPropertiesResponse{}, nil
574614
},
575615
},
616+
sfa: &snowflakeFileTransferAgent{
617+
sc: &snowflakeConn{
618+
cfg: &Config{},
619+
},
620+
},
576621
}
577622

578623
_, err = new(snowflakeAzureClient).getFileHeader(&meta, "file.txt")

dsn.go

+13
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const (
2525
defaultRequestTimeout = 0 * time.Second // Timeout for retry for request EXCLUDING clientTimeout
2626
defaultJWTTimeout = 60 * time.Second
2727
defaultExternalBrowserTimeout = 120 * time.Second // Timeout for external browser login
28+
defaultCloudStorageTimeout = -1 // Timeout for calling cloud storage.
2829
defaultMaxRetryCount = 7 // specifies maximum number of subsequent retries
2930
defaultDomain = ".snowflakecomputing.com"
3031
cnDomain = ".snowflakecomputing.cn"
@@ -77,6 +78,7 @@ type Config struct {
7778
ClientTimeout time.Duration // Timeout for network round trip + read out http response
7879
JWTClientTimeout time.Duration // Timeout for network round trip + read out http response used when JWT token auth is taking place
7980
ExternalBrowserTimeout time.Duration // Timeout for external browser login
81+
CloudStorageTimeout time.Duration // Timeout for a single call to a cloud storage provider
8082
MaxRetryCount int // Specifies how many times non-periodic HTTP request can be retried
8183

8284
Application string // application name.
@@ -215,6 +217,9 @@ func DSN(cfg *Config) (dsn string, err error) {
215217
if cfg.ExternalBrowserTimeout != defaultExternalBrowserTimeout {
216218
params.Add("externalBrowserTimeout", strconv.FormatInt(int64(cfg.ExternalBrowserTimeout/time.Second), 10))
217219
}
220+
if cfg.CloudStorageTimeout != defaultCloudStorageTimeout {
221+
params.Add("cloudStorageTimeout", strconv.FormatInt(int64(cfg.CloudStorageTimeout/time.Second), 10))
222+
}
218223
if cfg.MaxRetryCount != defaultMaxRetryCount {
219224
params.Add("maxRetryCount", strconv.Itoa(cfg.MaxRetryCount))
220225
}
@@ -498,6 +503,9 @@ func fillMissingConfigParameters(cfg *Config) error {
498503
if cfg.ExternalBrowserTimeout == 0 {
499504
cfg.ExternalBrowserTimeout = defaultExternalBrowserTimeout
500505
}
506+
if cfg.CloudStorageTimeout == 0 {
507+
cfg.CloudStorageTimeout = defaultCloudStorageTimeout
508+
}
501509
if cfg.MaxRetryCount == 0 {
502510
cfg.MaxRetryCount = defaultMaxRetryCount
503511
}
@@ -714,6 +722,11 @@ func parseDSNParams(cfg *Config, params string) (err error) {
714722
if err != nil {
715723
return err
716724
}
725+
case "cloudStorageTimeout":
726+
cfg.CloudStorageTimeout, err = parseTimeout(value)
727+
if err != nil {
728+
return err
729+
}
717730
case "maxRetryCount":
718731
cfg.MaxRetryCount, err = strconv.Atoi(value)
719732
if err != nil {

0 commit comments

Comments
 (0)