1 change: 1 addition & 0 deletions .gitignore
@@ -12,3 +12,4 @@ nydus-static/
.goreleaser.yml
metadata.db
tests/texture/zran/233c72f2b6b698c07021c4da367cfe2dff4f049efbaa885ca0ff760ea297865a
vendor
35 changes: 35 additions & 0 deletions contrib/nydusify/cmd/nydusify.go
@@ -1458,6 +1458,32 @@ func main() {
Usage: "Skip verifying server certs for HTTPS target registry",
EnvVars: []string{"TARGET_INSECURE"},
},

&cli.StringFlag{
Name: "backend-type",
Value: "",
Usage: "Type of storage backend for blobs, possible values: 'oss', 's3'",
EnvVars: []string{"BACKEND_TYPE"},
},
&cli.StringFlag{
Name: "backend-config",
Value: "",
Usage: "Json configuration string for storage backend",
EnvVars: []string{"BACKEND_CONFIG"},
},
&cli.PathFlag{
Name: "backend-config-file",
Value: "",
TakesFile: true,
Usage: "Json configuration file for storage backend",
EnvVars: []string{"BACKEND_CONFIG_FILE"},
},
&cli.BoolFlag{
Name: "backend-force-push",
Value: false,
Usage: "Force to push Nydus blobs even if they already exist in storage backend",
EnvVars: []string{"BACKEND_FORCE_PUSH"},
},

&cli.IntFlag{
Name: "maximum-times",
Required: false,
@@ -1494,6 +1520,11 @@ func main() {
return withPaths, withoutPaths
}

backendType, backendConfig, err := getBackendConfig(c, "", false)
if err != nil {
return err
}

withPaths, withoutPaths := parsePaths(c.StringSlice("with-path"))
opt := committer.Opt{
WorkDir: c.String("work-dir"),
@@ -1507,6 +1538,10 @@ func main() {
MaximumTimes: c.Int("maximum-times"),
WithPaths: withPaths,
WithoutPaths: withoutPaths,

BackendType: backendType,
BackendConfig: backendConfig,
BackendForcePush: c.Bool("backend-force-push"),
}
cm, err := committer.NewCommitter(opt)
if err != nil {
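The new flags feed getBackendConfig, whose body is not shown in this hunk. As a rough, hypothetical sketch of what that resolution step typically looks like (assuming urfave/cli v2, which the flag definitions above use), the inline --backend-config JSON would take precedence and --backend-config-file would be read as a fallback; resolveBackendConfig below is illustrative, not the repository's actual helper.

package example

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

// resolveBackendConfig is a hypothetical stand-in for getBackendConfig: it reads
// --backend-type and resolves the backend JSON from either --backend-config or
// --backend-config-file.
func resolveBackendConfig(c *cli.Context) (string, string, error) {
	backendType := c.String("backend-type")
	backendConfig := c.String("backend-config")
	if backendConfig == "" {
		if file := c.Path("backend-config-file"); file != "" {
			data, err := os.ReadFile(file)
			if err != nil {
				return "", "", fmt.Errorf("read backend config file: %w", err)
			}
			backendConfig = string(data)
		}
	}
	if backendType != "" && backendConfig == "" {
		return "", "", fmt.Errorf("--backend-config or --backend-config-file is required when --backend-type is set")
	}
	return backendType, backendConfig, nil
}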
30 changes: 22 additions & 8 deletions contrib/nydusify/pkg/backend/s3.go
@@ -37,16 +37,20 @@ type S3Backend struct {
bucketName string
endpointWithScheme string
client *s3.Client
numUploadThreads int
partSize int
}

type S3Config struct {
AccessKeyID string `json:"access_key_id,omitempty"`
AccessKeySecret string `json:"access_key_secret,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
Scheme string `json:"scheme,omitempty"`
BucketName string `json:"bucket_name,omitempty"`
Region string `json:"region,omitempty"`
ObjectPrefix string `json:"object_prefix,omitempty"`
AccessKeyID string `json:"access_key_id,omitempty"`
AccessKeySecret string `json:"access_key_secret,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
Scheme string `json:"scheme,omitempty"`
BucketName string `json:"bucket_name,omitempty"`
Region string `json:"region,omitempty"`
ObjectPrefix string `json:"object_prefix,omitempty"`
NumUploadThreads *int `json:"num_upload_threads,omitempty"`
PartSize *int `json:"part_size,omitempty"`
}

func newS3Backend(rawConfig []byte) (*S3Backend, error) {
@@ -71,6 +75,13 @@ func newS3Backend(rawConfig []byte) (*S3Backend, error) {
return nil, errors.Wrap(err, "load default AWS config")
}

if cfg.NumUploadThreads == nil {
cfg.NumUploadThreads = aws.Int(5)
}
if cfg.PartSize == nil {
cfg.PartSize = aws.Int(multipartChunkSize)
}

client := s3.NewFromConfig(s3AWSConfig, func(o *s3.Options) {
o.BaseEndpoint = &endpointWithScheme
o.Region = cfg.Region
@@ -86,6 +97,8 @@ func newS3Backend(rawConfig []byte) (*S3Backend, error) {
bucketName: cfg.BucketName,
endpointWithScheme: endpointWithScheme,
client: client,
numUploadThreads: *cfg.NumUploadThreads,
partSize: *cfg.PartSize,
}, nil
}

@@ -113,7 +126,8 @@ func (b *S3Backend) Upload(ctx context.Context, blobID, blobPath string, size in
defer blobFile.Close()

uploader := manager.NewUploader(b.client, func(u *manager.Uploader) {
u.PartSize = multipartChunkSize
u.PartSize = int64(b.partSize)
u.Concurrency = b.numUploadThreads
})
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: aws.String(b.bucketName),
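For reference, a --backend-config for the "s3" backend that exercises the new knobs might look like the sketch below. Field names are taken from the json tags on S3Config above; the values are placeholders, and num_upload_threads / part_size stay optional, defaulting to 5 and multipartChunkSize when omitted.

package example

// sampleS3BackendConfig is an illustrative backend-config JSON for the S3
// backend; field names follow the json tags on S3Config, values are placeholders.
const sampleS3BackendConfig = `{
  "endpoint": "s3.us-east-1.amazonaws.com",
  "scheme": "https",
  "region": "us-east-1",
  "bucket_name": "my-nydus-blobs",
  "object_prefix": "nydus/",
  "access_key_id": "EXAMPLE_ACCESS_KEY",
  "access_key_secret": "EXAMPLE_SECRET",
  "num_upload_threads": 8,
  "part_size": 134217728
}`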
128 changes: 101 additions & 27 deletions contrib/nydusify/pkg/committer/commiter.go
@@ -32,6 +32,7 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/backend"
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/committer/diff"
parserPkg "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/parser"
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/provider"
@@ -55,6 +56,10 @@ type Opt struct {

WithPaths []string
WithoutPaths []string

BackendType string
BackendConfig string
BackendForcePush bool
}

type Committer struct {
@@ -124,7 +129,7 @@ func (cm *Committer) Commit(ctx context.Context, opt Opt) error {
for idx, layer := range image.Manifest.Layers {
if layer.MediaType == utils.MediaTypeNydusBlob {
name := fmt.Sprintf("blob-mount-%d", idx)
if _, err := cm.pushBlob(ctx, name, layer.Digest, originalSourceRef, targetRef, opt.TargetInsecure, image); err != nil {
if _, err := cm.pushBlob(ctx, name, layer.Digest, originalSourceRef, targetRef, opt.TargetInsecure, image, opt); err != nil {
return errors.Wrap(err, "push lower blob")
}
}
@@ -146,7 +151,7 @@ func (cm *Committer) Commit(ctx context.Context, opt Opt) error {
}
logrus.Infof("pushing blob for upper")
start := time.Now()
upperBlobDesc, err := cm.pushBlob(ctx, "blob-upper", *upperBlobDigest, originalSourceRef, targetRef, opt.TargetInsecure, image)
upperBlobDesc, err := cm.pushBlob(ctx, "blob-upper", *upperBlobDigest, originalSourceRef, targetRef, opt.TargetInsecure, image, opt)
if err != nil {
return errors.Wrap(err, "push upper blob")
}
@@ -173,7 +178,7 @@ func (cm *Committer) Commit(ctx context.Context, opt Opt) error {
}
logrus.Infof("pushing blob for mount")
start := time.Now()
mountBlobDesc, err := cm.pushBlob(ctx, name, *mountBlobDigest, originalSourceRef, targetRef, opt.TargetInsecure, image)
mountBlobDesc, err := cm.pushBlob(ctx, name, *mountBlobDigest, originalSourceRef, targetRef, opt.TargetInsecure, image, opt)
if err != nil {
return errors.Wrap(err, "push mount blob")
}
@@ -211,7 +216,7 @@ func (cm *Committer) Commit(ctx context.Context, opt Opt) error {
}
logrus.Infof("pushing blob for appended mount")
start := time.Now()
mountBlobDesc, err := cm.pushBlob(ctx, name, *mountBlobDigest, originalSourceRef, targetRef, opt.TargetInsecure, image)
mountBlobDesc, err := cm.pushBlob(ctx, name, *mountBlobDigest, originalSourceRef, targetRef, opt.TargetInsecure, image, opt)
if err != nil {
return errors.Wrap(err, "push appended mount blob")
}
@@ -249,7 +254,7 @@ func (cm *Committer) Commit(ctx context.Context, opt Opt) error {
}

logrus.Infof("pushing committed image to %s", targetRef)
if err := cm.pushManifest(ctx, *image, *bootstrapDiffID, targetRef, "bootstrap-merged.tar", opt.FsVersion, upperBlob, mountBlobs, opt.TargetInsecure); err != nil {
if err := cm.pushManifest(ctx, *image, *bootstrapDiffID, targetRef, "bootstrap-merged.tar", opt.FsVersion, upperBlob, mountBlobs, opt.TargetInsecure, opt); err != nil {
return errors.Wrap(err, "push manifest")
}

@@ -368,13 +373,20 @@ func getDistributionSourceLabel(sourceRef string) (string, string) {
return labelKey, labelValue
}

// pushBlob pushes a blob to the target registry
func (cm *Committer) pushBlob(ctx context.Context, blobName string, blobDigest digest.Digest, sourceRef string, targetRef string, insecure bool, image *parserPkg.Image) (*ocispec.Descriptor, error) {
// pushBlob pushes a blob to the target registry or backend storage
func (cm *Committer) pushBlob(ctx context.Context, blobName string, blobDigest digest.Digest, sourceRef string, targetRef string, insecure bool, image *parserPkg.Image, opt Opt) (*ocispec.Descriptor, error) {
logrus.Infof("pushing blob: %s, digest: %s", blobName, blobDigest)

targetRemoter, err := provider.DefaultRemote(targetRef, insecure)
if err != nil {
return nil, errors.Wrap(err, "create target remote")
// Check if backend storage is configured for blobs
var blobBackend backend.Backend
var err error

if opt.BackendType != "" && strings.TrimSpace(opt.BackendConfig) != "" {
blobBackend, err = createBlobBackend(opt.BackendType, opt.BackendConfig)
if err != nil {
return nil, errors.Wrap(err, "create blob backend")
}
logrus.Debugf("using backend storage for blobs: %s", opt.BackendType)
}

// Check if this is a lower blob (starts with "blob-mount-" but not in workDir)
@@ -486,16 +498,60 @@ func (cm *Committer) pushBlob(ctx context.Context, blobName string, blobDigest d

logrus.Debugf("pushing blob: digest=%s, size=%d", blobDesc.Digest, blobDesc.Size)

if err := targetRemoter.Push(ctx, blobDesc, true, reader); err != nil {
if utils.RetryWithHTTP(err) {
targetRemoter.MaybeWithHTTP(err)
logrus.Debugf("retrying push with HTTP")
if err := targetRemoter.Push(ctx, blobDesc, true, reader); err != nil {
return nil, errors.Wrap(err, "push blob with HTTP")
// Push to backend storage if configured, otherwise use registry
if blobBackend != nil {
Contributor review comment: It seems the previous interface definition enforcing the use of paths wasn't very reasonable, which led to this workaround. It caused an extra disk write, resulting in additional overhead. Perhaps in the future, we could consider revising this backend's Upload interface to accept a reader instead.

// For backend storage, we need to save the blob to a file first (for local blobs, reuse existing path)
var finalBlobPath string
if isLowerBlob {
// For lower blobs, we need to create a temporary file
tempFile, err := os.CreateTemp(cm.workDir, fmt.Sprintf("temp-blob-%s-*", blobDigest.Encoded()[:12]))
if err != nil {
return nil, errors.Wrap(err, "create temp file for lower blob")
}
finalBlobPath = tempFile.Name()
defer os.Remove(finalBlobPath)

// Copy the reader to the temp file
if _, err := io.Copy(tempFile, reader); err != nil {
tempFile.Close()
return nil, errors.Wrap(err, "copy lower blob to temp file")
}
tempFile.Close()
} else {
return nil, errors.Wrap(err, "push blob")
finalBlobPath = blobPath
}

// Upload to backend storage (use hex digest without sha256: prefix)
_, err := blobBackend.Upload(ctx, blobDigest.Hex(), finalBlobPath, blobDesc.Size, opt.BackendForcePush)
if err != nil {
return nil, errors.Wrap(err, "upload blob to backend storage")
}

// Finalize the backend upload
if err := blobBackend.Finalize(false); err != nil {
return nil, errors.Wrap(err, "finalize blob backend upload")
}

logrus.Infof("successfully pushed blob to backend storage: %s", blobDigest)
} else {
// Use registry storage
targetRemoter, err := provider.DefaultRemote(targetRef, insecure)
if err != nil {
return nil, errors.Wrap(err, "create target remote")
}

if err := targetRemoter.Push(ctx, blobDesc, true, reader); err != nil {
if utils.RetryWithHTTP(err) {
targetRemoter.MaybeWithHTTP(err)
logrus.Debugf("retrying push with HTTP")
if err := targetRemoter.Push(ctx, blobDesc, true, reader); err != nil {
return nil, errors.Wrap(err, "push blob with HTTP")
}
} else {
return nil, errors.Wrap(err, "push blob")
}
}
logrus.Infof("successfully pushed blob to registry: %s", blobDigest)
}

if closeErr != nil {
@@ -541,7 +597,8 @@ func (cm *Committer) syncFilesystem(ctx context.Context, containerID string) err

stderr, err := config.ExecuteContext(ctx, io.Discard, "sync")
if err != nil {
return errors.Wrap(err, fmt.Sprintf("execute sync in container namespace: %s", strings.TrimSpace(stderr)))
// Warn only, as sync can be unavailable in some container environments (e.g. gVisor)
logrus.Warnf("execute sync in container namespace: %s", strings.TrimSpace(stderr))
}

// Also sync the host filesystem to ensure overlay changes are written
@@ -554,7 +611,7 @@ }
}

func (cm *Committer) pushManifest(
ctx context.Context, nydusImage parserPkg.Image, bootstrapDiffID digest.Digest, targetRef, bootstrapName, fsversion string, upperBlob *Blob, mountBlobs []Blob, insecure bool,
ctx context.Context, nydusImage parserPkg.Image, bootstrapDiffID digest.Digest, targetRef, bootstrapName, fsversion string, upperBlob *Blob, mountBlobs []Blob, insecure bool, opt Opt,
) error {
lowerBlobLayers := []ocispec.Descriptor{}
for idx := range nydusImage.Manifest.Layers {
@@ -571,11 +628,15 @@ func (cm *Committer) pushManifest(
for idx := range lowerBlobLayers {
config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, lowerBlobLayers[idx].Digest)
}
for idx := range mountBlobs {
mountBlob := mountBlobs[idx]
config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, mountBlob.Desc.Digest)

// When a storage backend (e.g. S3) is used, skip adding the new blob DiffIDs since those blobs won't be in the manifest
if opt.BackendType == "" {
for idx := range mountBlobs {
mountBlob := mountBlobs[idx]
config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, mountBlob.Desc.Digest)
}
config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, upperBlob.Desc.Digest)
}
config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, upperBlob.Desc.Digest)
config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, bootstrapDiffID)

configBytes, configDesc, err := cm.makeDesc(config, nydusImage.Manifest.Config)
@@ -657,11 +718,16 @@ func (cm *Committer) pushManifest(

// Push image manifest
layers := lowerBlobLayers
for idx := range mountBlobs {
mountBlob := mountBlobs[idx]
layers = append(layers, mountBlob.Desc)

// When a storage backend (e.g. S3) is used, skip adding the new blobs to the manifest since they're
// fetched from the backend and Nydus only needs the bootstrap layer to function
if opt.BackendType == "" {
for idx := range mountBlobs {
mountBlob := mountBlobs[idx]
layers = append(layers, mountBlob.Desc)
}
layers = append(layers, upperBlob.Desc)
}
layers = append(layers, upperBlob.Desc)
layers = append(layers, bootstrapDesc)

nydusImage.Manifest.Config = *configDesc
@@ -851,6 +917,14 @@ func ValidateRef(ref string) (string, error) {
return named.String(), nil
}

// createBlobBackend creates a backend instance for blob storage if backend configuration is provided
func createBlobBackend(backendType, backendConfig string) (backend.Backend, error) {
if backendType == "" || strings.TrimSpace(backendConfig) == "" {
return nil, nil
}
return backend.NewBackend(backendType, []byte(backendConfig), nil)
}

type outputJSON struct {
FsVersion string `json:"fs_version"`
Compressor string `json:"compressor"`
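Regarding the review comment above about the extra disk write in pushBlob: if the backend interface were revised as suggested, a reader-based Upload might look roughly like the sketch below. This is purely illustrative; StreamingBackend is a hypothetical interface, not the current backend.Backend.

package example

import (
	"context"
	"io"

	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// StreamingBackend is a hypothetical variant of the blob backend whose Upload
// consumes an io.Reader, so callers like pushBlob could stream lower blobs
// directly instead of spooling them to a temporary file first.
type StreamingBackend interface {
	// Upload streams the blob content identified by blobID to the backend.
	Upload(ctx context.Context, blobID string, reader io.Reader, size int64, forcePush bool) (*ocispec.Descriptor, error)
	// Finalize completes or cancels any pending backend work.
	Finalize(cancel bool) error
}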
13 changes: 10 additions & 3 deletions contrib/nydusify/pkg/committer/manager.go
@@ -115,9 +115,16 @@ func (m *Manager) Inspect(ctx context.Context, containerID string) (*InspectResu
if err != nil {
return nil, errors.Wrapf(err, "get snapshot mount")
}
// snapshot Mount Options[0] "workdir=$workdir", Options[1] "upperdir=$upperdir", Options[2] "lowerdir=$lowerdir".
lowerDirs = strings.TrimPrefix(mount[0].Options[2], "lowerdir=")
upperDir = strings.TrimPrefix(mount[0].Options[1], "upperdir=")

// Parse overlay mount options properly - they can be in any order
Collaborator review comment: Nice work.

for _, option := range mount[0].Options {
if strings.HasPrefix(option, "lowerdir=") {
lowerDirs = strings.TrimPrefix(option, "lowerdir=")
} else if strings.HasPrefix(option, "upperdir=") {
upperDir = strings.TrimPrefix(option, "upperdir=")
}
// Skip workdir and other options as they're not needed
}

return &InspectResult{
LowerDirs: lowerDirs,
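As a small usage illustration of the order-independent parsing above, the loop extracts lowerdir and upperdir from the overlay mount options whatever order containerd emits them in, ignoring workdir and the rest; paths in the example are made up.

package example

import "strings"

// splitOverlayDirs mirrors the parsing loop above: it scans overlay mount
// options in whatever order they appear and pulls out lowerdir and upperdir.
func splitOverlayDirs(options []string) (lowerDirs, upperDir string) {
	for _, option := range options {
		if strings.HasPrefix(option, "lowerdir=") {
			lowerDirs = strings.TrimPrefix(option, "lowerdir=")
		} else if strings.HasPrefix(option, "upperdir=") {
			upperDir = strings.TrimPrefix(option, "upperdir=")
		}
	}
	return lowerDirs, upperDir
}

// Example input/output (hypothetical paths):
//   options   = []string{"index=off", "workdir=/snap/1/work", "upperdir=/snap/1/fs", "lowerdir=/snap/0/fs"}
//   lowerDirs = "/snap/0/fs", upperDir = "/snap/1/fs"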
4 changes: 4 additions & 0 deletions contrib/nydusify/pkg/converter/converter.go
@@ -143,6 +143,10 @@ func Convert(ctx context.Context, opt Opt) error {
pvd.UsePlainHTTP()
}

if opt.WithPlainHTTP {
pvd.UsePlainHTTP()
}

cvt, err := converter.New(
converter.WithProvider(pvd),
converter.WithDriver("nydus", getConfig(opt)),