Skip to content

Commit 777b678

Browse files
authored
Merge pull request #5 from codex-storage/feat/add-context-cancellation
feat: context cancellation
2 parents 277f856 + e024905 commit 777b678

File tree

15 files changed

+315
-229
lines changed

15 files changed

+315
-229
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,7 @@ nimcache
1919

2020
# Test files
2121
codex/testdata/hello.downloaded.txt
22-
codex/testdata/hello.downloaded.writer.txt
22+
codex/testdata/hello.downloaded.writer.txt
23+
24+
# Bin
25+
codex-go

.vscode/settings.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
{
22
"go.toolsEnvVars": {
3+
"CGO_ENABLED": "1",
34
"CGO_CFLAGS": "-I${workspaceFolder}/vendor/nim-codex/library",
4-
"CGO_LDFLAGS": "-L${workspaceFolder}/vendor/nim-codex/build -Wl,-rpath,${workspaceFolder}/vendor/nim-codex/build",
5+
"CGO_LDFLAGS": "-L${workspaceFolder}/vendor/nim-codex/build -lcodex -Wl,-rpath,${workspaceFolder}/vendor/nim-codex/build",
56
"LD_LIBRARY_PATH": "${workspaceFolder}/vendor/nim-codex/build:${env:LD_LIBRARY_PATH}"
6-
}
7+
},
8+
"go.testTimeout": "2m"
79
}

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ build:
2929

3030
test:
3131
@echo "Running tests..."
32-
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" go test ./...
32+
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" GOTESTFLAGS="-timeout=2m" go test ./...
3333

3434
clean:
3535
@echo "Cleaning up..."

README.md

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ Now the module is ready for use in your project.
9898

9999
The release process is defined [here](./RELEASE.md).
100100

101-
## Usage
101+
## API
102102

103103
### Init
104104

@@ -171,11 +171,10 @@ buf := bytes.NewBuffer([]byte("Hello World!"))
171171
onProgress := func(read, total int, percent float64, err error) {
172172
// Do something with the data
173173
}
174-
cid, err := codex.UploadReader(UploadOptions{filepath: "hello.txt", onProgress: onProgress}, buf)
174+
ctx := context.Background()
175+
cid, err := codex.UploadReader(ctx, UploadOptions{filepath: "hello.txt", onProgress: onProgress}, buf)
175176
```
176177

177-
Caveat: once started, the upload cannot be cancelled.
178-
179178
#### file
180179

181180
The `file` strategy allows you to upload a file on Codex using the path.
@@ -189,11 +188,10 @@ The `UploadFile` returns the cid of the content uploaded.
189188
onProgress := func(read, total int, percent float64, err error) {
190189
// Do something with the data
191190
}
192-
cid, err := codex.UploadFile(UploadOptions{filepath: "./testdata/hello.txt", onProgress: onProgress})
191+
ctx := context.Background()
192+
cid, err := codex.UploadFile(ctx, UploadOptions{filepath: "./testdata/hello.txt", onProgress: onProgress})
193193
```
194194

195-
Caveat: once started, the upload cannot be cancelled.
196-
197195
#### chunks
198196

199197
The `chunks` strategy allows you to manage the upload by yourself. It requires more code
@@ -246,11 +244,10 @@ opt := DownloadStreamOptions{
246244
// Handle progress
247245
},
248246
}
249-
err := codex.DownloadStream(cid, opt)
247+
ctx := context.Background()
248+
err := codex.DownloadStream(ctx, cid, opt)
250249
```
251250

252-
Caveat: once started, the download cannot be cancelled.
253-
254251
#### chunks
255252

256253
The `chunks` strategy allows to manage the download by yourself. It requires more code
@@ -310,4 +307,10 @@ peerId := "..."
310307
record, err := node.CodexPeerDebug(peerId)
311308
```
312309

313-
`CodexPeerDebug` is only available if you built with `-d:codex_enable_api_debug_peers=true` flag.
310+
`CodexPeerDebug` is only available if you built with `-d:codex_enable_api_debug_peers=true` flag.
311+
312+
### Context and cancellation
313+
314+
Go contexts are exposed only on the long-running operations as `UploadReader`, `UploadFile`, and `DownloadFile`. If the
315+
context is cancelled, those methods cancel the active upload or download. Short lived API calls don’t take a context
316+
because they usually finish before a cancellation signal could matter.

codex/codex.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ const (
9292

9393
type Config struct {
9494
// Default: INFO
95-
LogLevel LogLevel `json:"log-level,omitempty"`
95+
LogLevel string `json:"log-level,omitempty"`
9696

9797
// Specifies what kind of logs should be written to stdout
9898
// Default: auto
@@ -280,8 +280,12 @@ func (node CodexNode) Destroy() error {
280280
return bridge.callError("cGoCodexDestroy")
281281
}
282282

283-
_, err = bridge.wait()
284-
return err
283+
// We don't wait for the bridge here.
284+
// The destroy function does not call the worker thread,
285+
// it destroys the context directly and return the return
286+
// value synchronously.
287+
288+
return nil
285289
}
286290

287291
// Version returns the version of the Codex node.

codex/codex_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ package codex
33
import "testing"
44

55
func TestCodexVersion(t *testing.T) {
6-
node := newCodexNode(t, withNoStart())
6+
config := defaultConfigHelper(t)
7+
node, err := New(config)
8+
if err != nil {
9+
t.Fatalf("Failed to create Codex node: %v", err)
10+
}
711

812
version, err := node.Version()
913
if err != nil {
@@ -17,7 +21,11 @@ func TestCodexVersion(t *testing.T) {
1721
}
1822

1923
func TestCodexRevision(t *testing.T) {
20-
node := newCodexNode(t, withNoStart())
24+
config := defaultConfigHelper(t)
25+
node, err := New(config)
26+
if err != nil {
27+
t.Fatalf("Failed to create Codex node: %v", err)
28+
}
2129

2230
revision, err := node.Revision()
2331
if err != nil {

codex/debug_test.go

Lines changed: 23 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -32,45 +32,31 @@ func TestUpdateLogLevel(t *testing.T) {
3232
}
3333
defer os.Remove(tmpFile.Name())
3434

35-
node, err := New(Config{
36-
LogFile: tmpFile.Name(),
37-
MetricsEnabled: false,
35+
node := newCodexNode(t, Config{
36+
LogLevel: "INFO",
37+
LogFile: tmpFile.Name(),
38+
LogFormat: LogFormatNoColors,
3839
})
39-
if err != nil {
40-
t.Fatalf("Failed to create Codex node: %v", err)
41-
}
42-
43-
t.Cleanup(func() {
44-
if err := node.Stop(); err != nil {
45-
t.Logf("cleanup codex: %v", err)
46-
}
47-
48-
if err := node.Destroy(); err != nil {
49-
t.Logf("cleanup codex: %v", err)
50-
}
51-
})
52-
53-
if err := node.Start(); err != nil {
54-
t.Fatalf("Failed to start Codex node: %v", err)
55-
}
5640

5741
content, err := os.ReadFile(tmpFile.Name())
42+
5843
if err != nil {
5944
t.Fatalf("Failed to read log file: %v", err)
6045
}
61-
if !strings.Contains(string(content), "Started codex node") {
62-
t.Errorf("Log file does not contain 'Started codex node' %s", string(content))
63-
}
64-
65-
if err := node.Stop(); err != nil {
66-
t.Fatalf("Failed to stop Codex node: %v", err)
46+
if !strings.Contains(string(content), "INF") {
47+
t.Errorf("Log file does not contain INFO statement %s", string(content))
6748
}
6849

6950
err = node.UpdateLogLevel("ERROR")
7051
if err != nil {
7152
t.Fatalf("UpdateLogLevel call failed: %v", err)
7253
}
7354

55+
if err := node.Stop(); err != nil {
56+
t.Fatalf("Failed to stop Codex node: %v", err)
57+
}
58+
59+
// Clear the file
7460
if err := os.WriteFile(tmpFile.Name(), []byte{}, 0644); err != nil {
7561
t.Fatalf("Failed to clear log file: %v", err)
7662
}
@@ -85,58 +71,18 @@ func TestUpdateLogLevel(t *testing.T) {
8571
t.Fatalf("Failed to read log file: %v", err)
8672
}
8773

88-
if strings.Contains(string(content), "Starting discovery node") {
89-
t.Errorf("Log file contains 'Starting discovery node'")
74+
if strings.Contains(string(content), "INF") {
75+
t.Errorf("Log file contains INFO statement after log level update: %s", string(content))
9076
}
9177
}
9278

9379
func TestCodexPeerDebug(t *testing.T) {
9480
var bootstrap, node1, node2 *CodexNode
9581
var err error
9682

97-
t.Cleanup(func() {
98-
if bootstrap != nil {
99-
if err := bootstrap.Stop(); err != nil {
100-
t.Logf("cleanup bootstrap: %v", err)
101-
}
102-
103-
if err := bootstrap.Destroy(); err != nil {
104-
t.Logf("cleanup bootstrap: %v", err)
105-
}
106-
}
107-
if node1 != nil {
108-
if err := node1.Stop(); err != nil {
109-
t.Logf("cleanup node1: %v", err)
110-
}
111-
112-
if err := node1.Destroy(); err != nil {
113-
t.Logf("cleanup node1: %v", err)
114-
}
115-
}
116-
if node2 != nil {
117-
if err := node2.Stop(); err != nil {
118-
t.Logf("cleanup node2: %v", err)
119-
}
120-
121-
if err := node2.Destroy(); err != nil {
122-
t.Logf("cleanup node2: %v", err)
123-
}
124-
}
125-
})
126-
127-
bootstrap, err = New(Config{
128-
DataDir: t.TempDir(),
129-
LogFormat: LogFormatNoColors,
130-
MetricsEnabled: false,
131-
DiscoveryPort: 8092,
83+
bootstrap = newCodexNode(t, Config{
84+
DiscoveryPort: 8092,
13285
})
133-
if err != nil {
134-
t.Fatalf("Failed to create bootstrap: %v", err)
135-
}
136-
137-
if err := bootstrap.Start(); err != nil {
138-
t.Fatalf("Failed to start bootstrap: %v", err)
139-
}
14086

14187
spr, err := bootstrap.Spr()
14288
if err != nil {
@@ -145,35 +91,15 @@ func TestCodexPeerDebug(t *testing.T) {
14591

14692
bootstrapNodes := []string{spr}
14793

148-
node1, err = New(Config{
149-
DataDir: t.TempDir(),
150-
LogFormat: LogFormatNoColors,
151-
MetricsEnabled: false,
94+
node1 = newCodexNode(t, Config{
15295
DiscoveryPort: 8090,
15396
BootstrapNodes: bootstrapNodes,
15497
})
155-
if err != nil {
156-
t.Fatalf("Failed to create codex: %v", err)
157-
}
15898

159-
if err := node1.Start(); err != nil {
160-
t.Fatalf("Failed to start codex: %v", err)
161-
}
162-
163-
node2, err = New(Config{
164-
DataDir: t.TempDir(),
165-
LogFormat: LogFormatNoColors,
166-
MetricsEnabled: false,
99+
node2 = newCodexNode(t, Config{
167100
DiscoveryPort: 8091,
168101
BootstrapNodes: bootstrapNodes,
169102
})
170-
if err != nil {
171-
t.Fatalf("Failed to create codex2: %v", err)
172-
}
173-
174-
if err := node2.Start(); err != nil {
175-
t.Fatalf("Failed to start codex2: %v", err)
176-
}
177103

178104
peerId, err := node2.PeerId()
179105
if err != nil {
@@ -186,9 +112,14 @@ func TestCodexPeerDebug(t *testing.T) {
186112
if err == nil {
187113
break
188114
}
115+
189116
time.Sleep(1 * time.Second)
190117
}
191118

119+
if err != nil {
120+
t.Fatalf("CodexPeerDebug call failed: %v", err)
121+
}
122+
192123
if record.PeerId == "" {
193124
t.Fatalf("CodexPeerDebug call failed: %v", err)
194125
}

codex/download.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ package codex
2626
*/
2727
import "C"
2828
import (
29+
"context"
2930
"encoding/json"
31+
"fmt"
3032
"io"
3133
"unsafe"
3234
)
@@ -145,7 +147,7 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) {
145147
// If options.writer is set, the data will be written into that writer.
146148
// The options filepath and writer are not mutually exclusive, i.e you can write
147149
// in different places in a same call.
148-
func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) error {
150+
func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error {
149151
bridge := newBridgeCtx()
150152
defer bridge.free()
151153

@@ -189,6 +191,16 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions)
189191
var cCid = C.CString(cid)
190192
defer C.free(unsafe.Pointer(cCid))
191193

194+
err := node.DownloadInit(cid, DownloadInitOptions{
195+
ChunkSize: options.ChunkSize,
196+
Local: options.Local,
197+
})
198+
if err != nil {
199+
return err
200+
}
201+
202+
defer node.DownloadCancel(cid)
203+
192204
var cFilepath = C.CString(options.Filepath)
193205
defer C.free(unsafe.Pointer(cFilepath))
194206

@@ -198,8 +210,39 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions)
198210
return bridge.callError("cGoCodexDownloadLocal")
199211
}
200212

201-
_, err := bridge.wait()
202-
return err
213+
// Create a done channel to signal the goroutine to stop
214+
// when the download is complete and avoid goroutine leaks.
215+
done := make(chan struct{})
216+
defer close(done)
217+
218+
channelError := make(chan error, 1)
219+
go func() {
220+
select {
221+
case <-ctx.Done():
222+
channelError <- node.DownloadCancel(cid)
223+
case <-done:
224+
// Nothing to do, download finished
225+
}
226+
}()
227+
228+
_, err = bridge.wait()
229+
230+
// Extract the potential cancellation error
231+
var cancelError error
232+
select {
233+
case cancelError = <-channelError:
234+
default:
235+
}
236+
237+
if err != nil {
238+
if cancelError != nil {
239+
return fmt.Errorf("download canceled: %v, but failed to cancel download session: %v", ctx.Err(), cancelError)
240+
}
241+
242+
return err
243+
}
244+
245+
return cancelError
203246
}
204247

205248
// DownloadInit initializes the download process for a specific CID.

0 commit comments

Comments
 (0)