Skip to content

Commit 80f0722

Browse files
committed
feature:support live-restore
Signed-off-by: ningmingxiao <[email protected]>
1 parent dfb811b commit 80f0722

File tree

6 files changed

+191
-71
lines changed

6 files changed

+191
-71
lines changed

cmd/nerdctl/container/container_run_restart_linux_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ func TestRunRestart(t *testing.T) {
5353
"--name", testContainerName,
5454
"-p", fmt.Sprintf("127.0.0.1:%d:80", hostPort),
5555
testutil.NginxAlpineImage).AssertOK()
56-
56+
inspectedContainer := base.InspectContainer(testContainerName)
57+
pid := inspectedContainer.State.Pid
5758
check := func(httpGetRetry int) error {
5859
resp, err := nettestutil.HTTPGet(fmt.Sprintf("http://127.0.0.1:%d", hostPort), httpGetRetry, false)
5960
if err != nil {
@@ -87,6 +88,9 @@ func TestRunRestart(t *testing.T) {
8788
}
8889
time.Sleep(sleep)
8990
}
91+
inspectedContainer = base.InspectContainer(testContainerName)
92+
assert.Equal(t, inspectedContainer.State.Status, "running")
93+
assert.Equal(t, inspectedContainer.State.Pid, pid)
9094
base.DumpDaemonLogs(10)
9195
t.Fatalf("the container does not seem to be restarted")
9296
}

pkg/logging/logging.go

Lines changed: 14 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"os"
2727
"path/filepath"
2828
"sort"
29-
"strings"
3029
"sync"
3130
"time"
3231

@@ -165,49 +164,9 @@ func WaitForLogger(dataStore, ns, id string) error {
165164
})
166165
}
167166

168-
func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
169-
client, err := containerd.New(strings.TrimPrefix(address, "unix://"), containerd.WithDefaultNamespace(config.Namespace))
170-
if err != nil {
171-
return nil, err
172-
}
173-
con, err := client.LoadContainer(ctx, config.ID)
174-
if err != nil {
175-
return nil, err
176-
}
177-
178-
task, err := con.Task(ctx, nil)
179-
if err == nil {
180-
return task.Wait(ctx)
181-
}
182-
if !errdefs.IsNotFound(err) {
183-
return nil, err
184-
}
185-
186-
// If task was not found, it's possible that the container runtime is still being created.
187-
// Retry every 100ms.
188-
ticker := time.NewTicker(100 * time.Millisecond)
189-
defer ticker.Stop()
190-
191-
for {
192-
select {
193-
case <-ctx.Done():
194-
return nil, errors.New("timed out waiting for container task to start")
195-
case <-ticker.C:
196-
task, err = con.Task(ctx, nil)
197-
if err != nil {
198-
if errdefs.IsNotFound(err) {
199-
continue
200-
}
201-
return nil, err
202-
}
203-
return task.Wait(ctx)
204-
}
205-
}
206-
}
207-
208167
type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error)
209168

210-
func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error {
169+
func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error {
211170
if err := driver.PreProcess(ctx, dataStore, config); err != nil {
212171
return err
213172
}
@@ -220,6 +179,15 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres
220179
if err != nil {
221180
return err
222181
}
182+
stdoutChan, err := waitIOClose(config.Stdout)
183+
if err != nil {
184+
return err
185+
}
186+
stderrChan, err := waitIOClose(config.Stderr)
187+
if err != nil {
188+
return err
189+
}
190+
223191
go func() {
224192
<-ctx.Done() // delivered on SIGTERM
225193
stdoutR.Cancel()
@@ -230,9 +198,7 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres
230198
pipeStdoutR, pipeStdoutW := io.Pipe()
231199
pipeStderrR, pipeStderrW := io.Pipe()
232200
copyStream := func(reader io.Reader, writer *io.PipeWriter) {
233-
// copy using a buffer of size 32K
234-
buf := make([]byte, 32<<10)
235-
_, err := io.CopyBuffer(writer, reader, buf)
201+
_, err := io.Copy(writer, reader)
236202
if err != nil {
237203
log.G(ctx).Errorf("failed to copy stream: %s", err)
238204
}
@@ -273,13 +239,8 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres
273239
// close pipeStdoutW and pipeStderrW upon container exit
274240
defer pipeStdoutW.Close()
275241
defer pipeStderrW.Close()
276-
277-
exitCh, err := getContainerWait(ctx, address, config)
278-
if err != nil {
279-
log.G(ctx).Errorf("failed to get container task wait channel: %v", err)
280-
return
281-
}
282-
<-exitCh
242+
<-stdoutChan
243+
<-stderrChan
283244
}()
284245
wg.Wait()
285246
return driver.PostProcess()
@@ -314,7 +275,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
314275
return err
315276
}
316277
// getContainerWait is extracted as parameter to allow mocking in tests.
317-
return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, getContainerWait, config)
278+
return loggingProcessAdapter(ctx, driver, dataStore, config)
318279
})
319280
} else if !errors.Is(err, os.ErrNotExist) {
320281
// the file does not exist if the container was created with nerdctl < 0.20

pkg/logging/logging_bsd.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
//go:build darwin || freebsd || netbsd || openbsd || dragonfly
2+
// +build darwin freebsd netbsd openbsd dragonfly
3+
4+
/*
5+
Copyright The containerd Authors.
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
*/
19+
20+
package logging
21+
22+
import (
23+
"fmt"
24+
"io"
25+
26+
"github.com/muesli/cancelreader"
27+
"golang.org/x/sys/unix"
28+
)
29+
30+
func waitIOClose(reader io.Reader) (chan struct{}, error) {
31+
closeIO := make(chan struct{})
32+
file, ok := reader.(cancelreader.File)
33+
if !ok {
34+
return nil, fmt.Errorf("reader is not an cancelreader.File")
35+
}
36+
37+
kq, err := unix.Kqueue()
38+
if err != nil {
39+
return nil, fmt.Errorf("create kqueue: %w", err)
40+
}
41+
kev := unix.Kevent_t{
42+
Ident: uint64(file.Fd()),
43+
Filter: unix.EVFILT_READ,
44+
Flags: unix.EV_ADD | unix.EV_ENABLE,
45+
}
46+
47+
events := make([]unix.Kevent_t, 1)
48+
_, err = unix.Kevent(kq, []unix.Kevent_t{kev}, events, nil)
49+
if err != nil {
50+
return nil, err
51+
}
52+
go func() {
53+
for {
54+
n, err := unix.Kevent(kq, nil, events, nil)
55+
if err != nil {
56+
continue
57+
}
58+
for i := 0; i < n; i++ {
59+
if events[i].Flags&unix.EV_EOF != 0 {
60+
close(closeIO)
61+
break
62+
}
63+
}
64+
}
65+
}()
66+
return closeIO, nil
67+
}

pkg/logging/logging_linux.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
//go:build linux
2+
// +build linux
3+
4+
/*
5+
Copyright The containerd Authors.
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
*/
19+
20+
package logging
21+
22+
import (
23+
"fmt"
24+
"io"
25+
"os"
26+
27+
"golang.org/x/sys/unix"
28+
)
29+
30+
func waitIOClose(reader io.Reader) (chan struct{}, error) {
31+
closeIO := make(chan struct{})
32+
epfd, err := unix.EpollCreate1(0)
33+
if err != nil {
34+
return nil, err
35+
}
36+
file, ok := reader.(*os.File)
37+
if !ok {
38+
return nil, fmt.Errorf("reader is not an cancelreader.File")
39+
}
40+
fd := file.Fd()
41+
event := unix.EpollEvent{
42+
Events: unix.EPOLLHUP,
43+
Fd: int32(fd),
44+
}
45+
if err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, int(fd), &event); err != nil {
46+
return nil, err
47+
}
48+
events := make([]unix.EpollEvent, 1)
49+
go func() {
50+
for {
51+
n, err := unix.EpollWait(epfd, events, -1)
52+
if err != nil {
53+
continue
54+
}
55+
for i := 0; i < n; i++ {
56+
if events[i].Events&unix.EPOLLHUP != 0 {
57+
close(closeIO)
58+
return
59+
}
60+
}
61+
}
62+
}()
63+
return closeIO, nil
64+
}

pkg/logging/logging_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@ package logging
1818

1919
import (
2020
"bufio"
21-
"bytes"
2221
"context"
2322
"math/rand"
23+
"os"
2424
"strings"
2525
"testing"
2626
"time"
2727

28-
containerd "github.com/containerd/containerd/v2/client"
2928
"github.com/containerd/containerd/v2/core/runtime/v2/logging"
3029
)
3130

@@ -68,37 +67,38 @@ func TestLoggingProcessAdapter(t *testing.T) {
6867

6968
// Prepare mock driver and logging config
7069
driver := &MockDriver{}
71-
stdoutBuffer := bytes.NewBufferString(normalString)
72-
stderrBuffer := bytes.NewBufferString(hugeString)
70+
stdoutReader, stdoutWriter, _ := os.Pipe()
71+
stderrReader, stderrWriter, _ := os.Pipe()
7372
config := &logging.Config{
74-
Stdout: stdoutBuffer,
75-
Stderr: stderrBuffer,
73+
Stdout: stdoutReader,
74+
Stderr: stderrReader,
7675
}
77-
7876
// Execute the logging process adapter
7977
ctx, cancel := context.WithCancel(context.Background())
8078
defer cancel()
8179

82-
var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
83-
exitChan := make(chan containerd.ExitStatus, 1)
80+
go func() {
81+
stdoutWriter.Write([]byte(normalString))
82+
}()
83+
go func() {
84+
stderrWriter.Write([]byte(hugeString))
85+
}()
86+
87+
go func() {
8488
time.Sleep(50 * time.Millisecond)
85-
exitChan <- containerd.ExitStatus{}
86-
return exitChan, nil
87-
}
89+
stdoutWriter.Close()
90+
stderrWriter.Close()
91+
}()
8892

89-
err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config)
93+
err := loggingProcessAdapter(ctx, driver, "testDataStore", config)
9094
if err != nil {
9195
t.Fatal(err)
9296
}
9397

94-
// let bufio read the buffer
95-
time.Sleep(50 * time.Millisecond)
96-
9798
// Verify that the driver methods were called
9899
if !driver.processed {
99100
t.Fatal("process should be processed")
100101
}
101-
102102
// Verify that the driver received the expected data
103103
stdout := strings.Join(driver.receivedStdout, "\n")
104104
stderr := strings.Join(driver.receivedStderr, "\n")

pkg/logging/logging_windows.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package logging
18+
19+
import "io"
20+
21+
// TODO: support windows
22+
func waitIOClose(reader io.Reader) (chan struct{}, error) {
23+
return nil, nil
24+
}

0 commit comments

Comments
 (0)