Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
test-enc
__debug*
__debug*
go-pxar
demo.pxar
demo.pcat1
68 changes: 67 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ I wanna begin by giving a special thanks to [https://github.com/tizbac/proxmoxba
- Create PXAR archives to disk
- Support for single-file, multi-file, multi-rootdir archives (via a virtual top-level directory in the archive)
- Support for files, directories, and symlinks
- **Asynchronous archive creation with concurrent file processing**
- Verified 1:1 against the official pxar cli-client

- Create catalog (pcat1) files to disk
Expand All @@ -21,7 +22,7 @@ I wanna begin by giving a special thanks to [https://github.com/tizbac/proxmoxba
- [ ] Misc. Todo
- [X] Convert all paths to abspath in noderef instead of adding relative paths as-is, that way we can determine if a symlink target is in-tree as well.
- [ ] Verify that link targets are in rootpath, possibly modify to convert to relative paths so they work no matter where an archive is unpacked
- [ ] Concurrent Uploads/PXAR Encoding
- [X] Concurrent Uploads/PXAR Encoding
- [ ] Add a `--debug` flag to the CLI to enable debug logging
- [ ] PXAR Archives
- [ ] PXAR Creation
Expand Down Expand Up @@ -55,6 +56,71 @@ I wanna begin by giving a special thanks to [https://github.com/tizbac/proxmoxba
- [ ] Windows Support


## Usage

### Basic Archive Creation

```go
// Create a new PXAR archive
pa := PBSArchive{
Filename: "backup.pxar",
}

// Add folders/files to archive
pa.AddFolder("./my-data")

// Create archive synchronously
buf := bytes.NewBuffer([]byte{})
err := pa.ToBuffer(buf)
if err != nil {
panic(err)
}
```

### Asynchronous Archive Creation

For improved performance with large archives or many files, you can enable asynchronous processing:

```go
// Create a new PXAR archive with async configuration
pa := PBSArchive{
Filename: "backup.pxar",
AsyncMode: true, // Enable async processing
AsyncWorkers: 4, // Use 4 concurrent workers (optional, defaults to CPU count)
}

// Add folders/files to archive
pa.AddFolder("./my-data")

// Create archive asynchronously
buf := bytes.NewBuffer([]byte{})
err := pa.ToBufferAsync(buf)
if err != nil {
panic(err)
}

// Or stream to channel asynchronously
ch := make(chan []byte, 100)
go func() {
for data := range ch {
// Process archive data chunks
processData(data)
}
}()

err = pa.ToChannelAsync(ch)
if err != nil {
panic(err)
}
close(ch)
```

The async implementation provides:
- **Concurrent file processing**: Multiple files are read and processed simultaneously
- **Ordered output**: entries are emitted in the exact sequence the PXAR format requires, even though files are processed concurrently
- **Identical results**: Async and sync methods produce byte-for-byte identical archives
- **Configurable workers**: Control the level of concurrency based on your system

## Format Overview
The PXAR format is a tar-like archive format used by Proxmox Backup Server (PBS) to store backups. It is a custom format that is not compatible with the standard tar format.

Expand Down
75 changes: 75 additions & 0 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"fmt"
"runtime"
"syscall"
"time"

Expand All @@ -20,6 +21,12 @@ type PBSArchiveInterface interface {
// Write to buffer
ToBuffer(buf *bytes.Buffer) error

// Write to buffer asynchronously (concurrent file processing)
ToBufferAsync(buf *bytes.Buffer) error

// Write to channel asynchronously (concurrent file processing)
ToChannelAsync(ch chan []byte) error

// Create catalogue
WriteCatalogue(buf *bytes.Buffer) error
}
Expand All @@ -30,6 +37,12 @@ type PBSArchive struct {

// Filename of the resulting archive
Filename string

// Enable asynchronous archive creation (concurrent file processing)
AsyncMode bool

// Number of workers for async processing (default: number of CPUs)
AsyncWorkers int
}

// Add a top-level folder to the archive
Expand Down Expand Up @@ -178,3 +191,65 @@ func (pa *PBSArchive) WriteCatalogue(buf *bytes.Buffer) error {

return nil
}

// getWorkerCount reports how many workers async processing should use:
// the explicitly configured AsyncWorkers when positive, otherwise the
// number of logical CPUs on this machine.
func (pa *PBSArchive) getWorkerCount() int {
	if n := pa.AsyncWorkers; n > 0 {
		return n
	}
	return runtime.NumCPU()
}

// ToBufferAsync writes the pxar archive into buf, encoding files
// concurrently with pa.getWorkerCount() workers. Returns an error when
// the archive has no trees or when payload writing fails.
func (pa *PBSArchive) ToBufferAsync(buf *bytes.Buffer) error {
	if len(pa.Trees) == 0 {
		return fmt.Errorf("no items to write")
	}

	// Resolve the (possibly virtual) root node of the archive.
	root, err := pa.GetParentNode()
	if err != nil {
		return err
	}

	root.IsRoot = true
	root.Name = pa.Filename + ".didx"

	// Delegate the concurrent encoding; pos tracks the archive offset.
	var pos uint64
	if _, err = root.WritePayloadAsync(buf, &pos, pa.getWorkerCount()); err != nil {
		return err
	}

	fmt.Printf("Write buffer (async) finished on pos %d with len %d\r\n", pos, buf.Len())

	return nil
}

// ToChannelAsync streams the pxar archive into ch, encoding files
// concurrently with pa.getWorkerCount() workers. Returns an error when
// the archive has no trees or when payload writing fails. The caller
// owns ch and is responsible for closing it after this returns.
func (pa *PBSArchive) ToChannelAsync(ch chan []byte) error {
	if len(pa.Trees) == 0 {
		return fmt.Errorf("no items to write")
	}

	// Resolve the (possibly virtual) root node of the archive.
	root, err := pa.GetParentNode()
	if err != nil {
		return err
	}

	root.IsRoot = true
	root.Name = pa.Filename + ".didx"

	// Delegate the concurrent encoding; pos tracks the archive offset.
	var pos uint64
	if _, err = root.WritePayloadChannelAsync(ch, &pos, pa.getWorkerCount()); err != nil {
		return err
	}

	fmt.Printf("Write channel (async) finished on pos %d\r\n", pos)

	return nil
}
174 changes: 174 additions & 0 deletions async_extended_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package main

import (
"bytes"
"fmt"
"os"
"testing"
"time"
)

// BenchmarkSyncVsAsync measures archive creation over the same input
// folder through the synchronous and asynchronous code paths so the two
// can be compared with benchstat.
func BenchmarkSyncVsAsync(b *testing.B) {
	archive := PBSArchive{
		Filename: "benchmark.pxar",
	}
	archive.AddFolder("./test-enc")

	b.Run("Sync", func(b *testing.B) {
		for n := 0; n < b.N; n++ {
			var out bytes.Buffer
			if err := archive.ToBuffer(&out); err != nil {
				b.Fatalf("Failed to create sync archive: %v", err)
			}
		}
	})

	b.Run("Async", func(b *testing.B) {
		for n := 0; n < b.N; n++ {
			var out bytes.Buffer
			if err := archive.ToBufferAsync(&out); err != nil {
				b.Fatalf("Failed to create async archive: %v", err)
			}
		}
	})
}

// TestAsyncConfiguration exercises archive creation across several
// sync/async worker configurations and checks the error outcome of each.
func TestAsyncConfiguration(t *testing.T) {
	cases := []struct {
		name    string
		async   bool
		workers int
		wantErr bool
	}{
		{"DefaultSync", false, 0, false},
		{"AsyncDefault", true, 0, false},
		{"Async1Worker", true, 1, false},
		{"Async4Workers", true, 4, false},
		{"Async8Workers", true, 8, false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			pa := PBSArchive{
				Filename:     "test_config.pxar",
				AsyncMode:    tc.async,
				AsyncWorkers: tc.workers,
			}
			pa.AddFolder("./test-enc")

			// Pick the code path under test based on the configuration.
			var buf bytes.Buffer
			var err error
			if tc.async {
				err = pa.ToBufferAsync(&buf)
			} else {
				err = pa.ToBuffer(&buf)
			}

			switch {
			case tc.wantErr && err == nil:
				t.Error("Expected error but got none")
			case !tc.wantErr && err != nil:
				t.Errorf("Unexpected error: %v", err)
			}

			if err == nil {
				t.Logf("Archive created successfully with %d bytes", buf.Len())
			}
		})
	}
}

// TestAsyncConsistency builds the same archive repeatedly through the
// async path and verifies every run yields byte-identical output.
func TestAsyncConsistency(t *testing.T) {
	pa := PBSArchive{
		Filename:     "consistency.pxar",
		AsyncWorkers: 2,
	}
	pa.AddFolder("./test-enc")

	// Produce several archives from the same input.
	const runs = 5
	archives := make([][]byte, 0, runs)
	for i := 0; i < runs; i++ {
		var buf bytes.Buffer
		if err := pa.ToBufferAsync(&buf); err != nil {
			t.Fatalf("Failed to create async archive %d: %v", i, err)
		}
		archives = append(archives, buf.Bytes())
	}

	// Every archive must match the first one exactly.
	reference := archives[0]
	for idx := 1; idx < len(archives); idx++ {
		if !bytes.Equal(reference, archives[idx]) {
			t.Errorf("Archive %d differs from reference. Reference len: %d, Archive len: %d",
				idx, len(reference), len(archives[idx]))
		}
	}

	t.Logf("All 5 async archives are identical (%d bytes)", len(reference))
}

// TestAsyncWithLargeFiles builds an archive over a directory of ten
// 1 MiB files through both the sync and async encoders, verifies the
// outputs are byte-for-byte identical, and logs the timings.
func TestAsyncWithLargeFiles(t *testing.T) {
	// t.TempDir gives a per-test unique directory that the framework
	// removes automatically, so concurrent or aborted runs cannot
	// collide on a fixed /tmp path or leak fixtures on Fatal.
	tempDir := t.TempDir()

	// 1 MiB of deterministic, non-trivial content.
	testData := make([]byte, 1024*1024)
	for i := range testData {
		testData[i] = byte(i % 256)
	}

	for i := 0; i < 10; i++ {
		filename := fmt.Sprintf("%s/largefile%d.dat", tempDir, i)
		if err := os.WriteFile(filename, testData, 0644); err != nil {
			t.Fatalf("Failed to create test file %s: %v", filename, err)
		}
	}

	pa := PBSArchive{
		Filename:     "large_test.pxar",
		AsyncWorkers: 4,
	}
	pa.AddFolder(tempDir)

	// Synchronous pass.
	syncStart := time.Now()
	syncBuf := bytes.NewBuffer([]byte{})
	if err := pa.ToBuffer(syncBuf); err != nil {
		t.Fatalf("Failed to create sync archive: %v", err)
	}
	syncDuration := time.Since(syncStart)

	// Asynchronous pass over the same tree.
	asyncStart := time.Now()
	asyncBuf := bytes.NewBuffer([]byte{})
	if err := pa.ToBufferAsync(asyncBuf); err != nil {
		t.Fatalf("Failed to create async archive: %v", err)
	}
	asyncDuration := time.Since(asyncStart)

	// Both encoders must produce identical archives.
	if !bytes.Equal(syncBuf.Bytes(), asyncBuf.Bytes()) {
		t.Errorf("Sync and async archives differ. Sync: %d bytes, Async: %d bytes",
			syncBuf.Len(), asyncBuf.Len())
	}

	t.Logf("Large file test completed successfully")
	t.Logf("Sync duration: %v, Async duration: %v", syncDuration, asyncDuration)
	t.Logf("Archive size: %d bytes", syncBuf.Len())
}
Loading