@@ -124,16 +124,16 @@ func (scd *snowflakeChunkDownloader) start() error {
124
124
// start downloading chunks if exists
125
125
chunkMetaLen := len (scd .ChunkMetas )
126
126
if chunkMetaLen > 0 {
127
- logger .Debugf ("MaxChunkDownloadWorkers: %v" , MaxChunkDownloadWorkers )
128
- logger .Debugf ("chunks: %v, total bytes: %d" , chunkMetaLen , scd .totalUncompressedSize ())
127
+ logger .WithContext ( scd . ctx ). Debugf ("MaxChunkDownloadWorkers: %v" , MaxChunkDownloadWorkers )
128
+ logger .WithContext ( scd . ctx ). Debugf ("chunks: %v, total bytes: %d" , chunkMetaLen , scd .totalUncompressedSize ())
129
129
scd .ChunksMutex = & sync.Mutex {}
130
130
scd .DoneDownloadCond = sync .NewCond (scd .ChunksMutex )
131
131
scd .Chunks = make (map [int ][]chunkRowType )
132
132
scd .ChunksChan = make (chan int , chunkMetaLen )
133
133
scd .ChunksError = make (chan * chunkError , MaxChunkDownloadWorkers )
134
134
for i := 0 ; i < chunkMetaLen ; i ++ {
135
135
chunk := scd .ChunkMetas [i ]
136
- logger .Debugf ("add chunk to channel ChunksChan: %v, URL: %v, RowCount: %v, UncompressedSize: %v, ChunkResultFormat: %v" ,
136
+ logger .WithContext ( scd . ctx ). Debugf ("add chunk to channel ChunksChan: %v, URL: %v, RowCount: %v, UncompressedSize: %v, ChunkResultFormat: %v" ,
137
137
i + 1 , chunk .URL , chunk .RowCount , chunk .UncompressedSize , scd .QueryResultFormat )
138
138
scd .ChunksChan <- i
139
139
}
@@ -147,11 +147,11 @@ func (scd *snowflakeChunkDownloader) start() error {
147
147
func (scd * snowflakeChunkDownloader ) schedule () {
148
148
select {
149
149
case nextIdx := <- scd .ChunksChan :
150
- logger .Infof ("schedule chunk: %v" , nextIdx + 1 )
150
+ logger .WithContext ( scd . ctx ). Infof ("schedule chunk: %v" , nextIdx + 1 )
151
151
go scd .FuncDownload (scd .ctx , scd , nextIdx )
152
152
default :
153
153
// no more download
154
- logger .Info ("no more download" )
154
+ logger .WithContext ( scd . ctx ). Info ("no more download" )
155
155
}
156
156
}
157
157
@@ -164,15 +164,15 @@ func (scd *snowflakeChunkDownloader) checkErrorRetry() (err error) {
164
164
// add the index to the chunks channel so that the download will be retried.
165
165
go scd .FuncDownload (scd .ctx , scd , errc .Index )
166
166
scd .ChunksErrorCounter ++
167
- logger .Warningf ("chunk idx: %v, err: %v. retrying (%v/%v)..." ,
167
+ logger .WithContext ( scd . ctx ). Warningf ("chunk idx: %v, err: %v. retrying (%v/%v)..." ,
168
168
errc .Index , errc .Error , scd .ChunksErrorCounter , maxChunkDownloaderErrorCounter )
169
169
} else {
170
170
scd .ChunksFinalErrors = append (scd .ChunksFinalErrors , errc )
171
- logger .Warningf ("chunk idx: %v, err: %v. no further retry" , errc .Index , errc .Error )
171
+ logger .WithContext ( scd . ctx ). Warningf ("chunk idx: %v, err: %v. no further retry" , errc .Index , errc .Error )
172
172
return errc .Error
173
173
}
174
174
default :
175
- logger .Info ("no error is detected." )
175
+ logger .WithContext ( scd . ctx ). Info ("no error is detected." )
176
176
}
177
177
return nil
178
178
}
@@ -195,7 +195,7 @@ func (scd *snowflakeChunkDownloader) next() (chunkRowType, error) {
195
195
}
196
196
197
197
for scd .Chunks [scd .CurrentChunkIndex ] == nil {
198
- logger .Debugf ("waiting for chunk idx: %v/%v" ,
198
+ logger .WithContext ( scd . ctx ). Debugf ("waiting for chunk idx: %v/%v" ,
199
199
scd .CurrentChunkIndex + 1 , len (scd .ChunkMetas ))
200
200
201
201
if err := scd .checkErrorRetry (); err != nil {
@@ -207,7 +207,7 @@ func (scd *snowflakeChunkDownloader) next() (chunkRowType, error) {
207
207
// 1) one chunk download finishes or 2) an error occurs.
208
208
scd .DoneDownloadCond .Wait ()
209
209
}
210
- logger .Debugf ("ready: chunk %v" , scd .CurrentChunkIndex + 1 )
210
+ logger .WithContext ( scd . ctx ). Debugf ("ready: chunk %v" , scd .CurrentChunkIndex + 1 )
211
211
scd .CurrentChunk = scd .Chunks [scd .CurrentChunkIndex ]
212
212
scd .ChunksMutex .Unlock ()
213
213
scd .CurrentChunkSize = len (scd .CurrentChunk )
@@ -216,7 +216,7 @@ func (scd *snowflakeChunkDownloader) next() (chunkRowType, error) {
216
216
scd .schedule ()
217
217
}
218
218
219
- logger .Debugf ("no more data" )
219
+ logger .WithContext ( scd . ctx ). Debugf ("no more data" )
220
220
if len (scd .ChunkMetas ) > 0 {
221
221
close (scd .ChunksError )
222
222
close (scd .ChunksChan )
@@ -342,11 +342,11 @@ func (r *largeResultSetReader) Read(p []byte) (n int, err error) {
342
342
}
343
343
344
344
func downloadChunk (ctx context.Context , scd * snowflakeChunkDownloader , idx int ) {
345
- logger .Infof ("download start chunk: %v" , idx + 1 )
345
+ logger .WithContext ( ctx ). Infof ("download start chunk: %v" , idx + 1 )
346
346
defer scd .DoneDownloadCond .Broadcast ()
347
347
348
348
if err := scd .FuncDownloadHelper (ctx , scd , idx ); err != nil {
349
- logger .Errorf (
349
+ logger .WithContext ( ctx ). Errorf (
350
350
"failed to extract HTTP response body. URL: %v, err: %v" , scd .ChunkMetas [idx ].URL , err )
351
351
scd .ChunksError <- & chunkError {Index : idx , Error : err }
352
352
} else if scd .ctx .Err () == context .Canceled || scd .ctx .Err () == context .DeadlineExceeded {
@@ -357,9 +357,9 @@ func downloadChunk(ctx context.Context, scd *snowflakeChunkDownloader, idx int)
357
357
func downloadChunkHelper (ctx context.Context , scd * snowflakeChunkDownloader , idx int ) error {
358
358
headers := make (map [string ]string )
359
359
if len (scd .ChunkHeader ) > 0 {
360
- logger .Debug ("chunk header is provided." )
360
+ logger .WithContext ( ctx ). Debug ("chunk header is provided." )
361
361
for k , v := range scd .ChunkHeader {
362
- logger .Debugf ("adding header: %v, value: %v" , k , v )
362
+ logger .WithContext ( ctx ). Debugf ("adding header: %v, value: %v" , k , v )
363
363
364
364
headers [k ] = v
365
365
}
@@ -374,14 +374,14 @@ func downloadChunkHelper(ctx context.Context, scd *snowflakeChunkDownloader, idx
374
374
}
375
375
bufStream := bufio .NewReader (resp .Body )
376
376
defer resp .Body .Close ()
377
- logger .Debugf ("response returned chunk: %v for URL: %v" , idx + 1 , scd .ChunkMetas [idx ].URL )
377
+ logger .WithContext ( ctx ). Debugf ("response returned chunk: %v for URL: %v" , idx + 1 , scd .ChunkMetas [idx ].URL )
378
378
if resp .StatusCode != http .StatusOK {
379
379
b , err := io .ReadAll (bufStream )
380
380
if err != nil {
381
381
return err
382
382
}
383
- logger .Infof ("HTTP: %v, URL: %v, Body: %v" , resp .StatusCode , scd .ChunkMetas [idx ].URL , b )
384
- logger .Infof ("Header: %v" , resp .Header )
383
+ logger .WithContext ( ctx ). Infof ("HTTP: %v, URL: %v, Body: %v" , resp .StatusCode , scd .ChunkMetas [idx ].URL , b )
384
+ logger .WithContext ( ctx ). Infof ("Header: %v" , resp .Header )
385
385
return & SnowflakeError {
386
386
Number : ErrFailedToGetChunk ,
387
387
SQLState : SQLStateConnectionFailure ,
@@ -463,7 +463,7 @@ func decodeChunk(ctx context.Context, scd *snowflakeChunkDownloader, idx int, bu
463
463
return err
464
464
}
465
465
}
466
- logger .Debugf (
466
+ logger .WithContext ( scd . ctx ). Debugf (
467
467
"decoded %d rows w/ %d bytes in %s (chunk %v)" ,
468
468
scd .ChunkMetas [idx ].RowCount ,
469
469
scd .ChunkMetas [idx ].UncompressedSize ,
0 commit comments