@@ -26,7 +26,6 @@ import (
2626 "os"
2727 "path/filepath"
2828 "sort"
29- "strings"
3029 "sync"
3130 "time"
3231
@@ -165,49 +164,9 @@ func WaitForLogger(dataStore, ns, id string) error {
165164 })
166165}
167166
168- func getContainerWait (ctx context.Context , address string , config * logging.Config ) (<- chan containerd.ExitStatus , error ) {
169- client , err := containerd .New (strings .TrimPrefix (address , "unix://" ), containerd .WithDefaultNamespace (config .Namespace ))
170- if err != nil {
171- return nil , err
172- }
173- con , err := client .LoadContainer (ctx , config .ID )
174- if err != nil {
175- return nil , err
176- }
177-
178- task , err := con .Task (ctx , nil )
179- if err == nil {
180- return task .Wait (ctx )
181- }
182- if ! errdefs .IsNotFound (err ) {
183- return nil , err
184- }
185-
186- // If task was not found, it's possible that the container runtime is still being created.
187- // Retry every 100ms.
188- ticker := time .NewTicker (100 * time .Millisecond )
189- defer ticker .Stop ()
190-
191- for {
192- select {
193- case <- ctx .Done ():
194- return nil , errors .New ("timed out waiting for container task to start" )
195- case <- ticker .C :
196- task , err = con .Task (ctx , nil )
197- if err != nil {
198- if errdefs .IsNotFound (err ) {
199- continue
200- }
201- return nil , err
202- }
203- return task .Wait (ctx )
204- }
205- }
206- }
207-
208167type ContainerWaitFunc func (ctx context.Context , address string , config * logging.Config ) (<- chan containerd.ExitStatus , error )
209168
210- func loggingProcessAdapter (ctx context.Context , driver Driver , dataStore , address string , getContainerWait ContainerWaitFunc , config * logging.Config ) error {
169+ func loggingProcessAdapter (ctx context.Context , driver Driver , dataStore string , config * logging.Config ) error {
211170 if err := driver .PreProcess (ctx , dataStore , config ); err != nil {
212171 return err
213172 }
@@ -220,6 +179,15 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres
220179 if err != nil {
221180 return err
222181 }
182+ stdoutChan , err := waitIOClose (config .Stdout )
183+ if err != nil {
184+ return err
185+ }
186+ stderrChan , err := waitIOClose (config .Stderr )
187+ if err != nil {
188+ return err
189+ }
190+
223191 go func () {
224192 <- ctx .Done () // delivered on SIGTERM
225193 stdoutR .Cancel ()
@@ -230,9 +198,7 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres
230198 pipeStdoutR , pipeStdoutW := io .Pipe ()
231199 pipeStderrR , pipeStderrW := io .Pipe ()
232200 copyStream := func (reader io.Reader , writer * io.PipeWriter ) {
233- // copy using a buffer of size 32K
234- buf := make ([]byte , 32 << 10 )
235- _ , err := io .CopyBuffer (writer , reader , buf )
201+ _ , err := io .Copy (writer , reader )
236202 if err != nil {
237203 log .G (ctx ).Errorf ("failed to copy stream: %s" , err )
238204 }
@@ -273,13 +239,8 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres
273239 // close pipeStdoutW and pipeStderrW upon container exit
274240 defer pipeStdoutW .Close ()
275241 defer pipeStderrW .Close ()
276-
277- exitCh , err := getContainerWait (ctx , address , config )
278- if err != nil {
279- log .G (ctx ).Errorf ("failed to get container task wait channel: %v" , err )
280- return
281- }
282- <- exitCh
242+ <- stdoutChan
243+ <- stderrChan
283244 }()
284245 wg .Wait ()
285246 return driver .PostProcess ()
@@ -314,7 +275,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
314275 return err
315276 }
316277 // getContainerWait is extracted as parameter to allow mocking in tests.
317- return loggingProcessAdapter (ctx , driver , dataStore , logConfig . Address , getContainerWait , config )
278+ return loggingProcessAdapter (ctx , driver , dataStore , config )
318279 })
319280 } else if ! errors .Is (err , os .ErrNotExist ) {
320281 // the file does not exist if the container was created with nerdctl < 0.20
0 commit comments