Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/commands/upload-file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,18 @@ export default class UploadFile extends Command {
reporter: this.log.bind(this),
})) ?? user.rootFolderId;

const timings = {
networkUpload: 0,
driveUpload: 0,
thumbnailUpload: 0,
};

// 1. Prepare the network
const networkFacade = await CLIUtils.prepareNetwork({ loginUserDetails: user, jsonFlag: flags['json'] });

// 2. Upload file to the Network
const readStream = createReadStream(filePath);
const timer = CLIUtils.timer();
const networkUploadTimer = CLIUtils.timer();
const progressBar = CLIUtils.progress(
{
format: 'Uploading file [{bar}] {percentage}%',
Expand Down Expand Up @@ -107,8 +113,10 @@ export default class UploadFile extends Command {
process.exit(1);
});
});
timings.networkUpload = networkUploadTimer.stop();

// 3. Create the file in Drive
const driveUploadTimer = CLIUtils.timer();
const createdDriveFile = await DriveFileService.instance.createFile({
plainName: fileInfo.name,
type: fileType,
Expand All @@ -120,7 +128,9 @@ export default class UploadFile extends Command {
creationTime: stats.birthtime?.toISOString(),
modificationTime: stats.mtime?.toISOString(),
});
timings.driveUpload = driveUploadTimer.stop();

const thumbnailTimer = CLIUtils.timer();
try {
if (isThumbnailable && bufferStream) {
const thumbnailBuffer = bufferStream.getBuffer();
Expand All @@ -138,14 +148,24 @@ export default class UploadFile extends Command {
} catch (error) {
ErrorUtils.report(error, { command: this.id });
}
timings.thumbnailUpload = thumbnailTimer.stop();

progressBar?.update(100);
progressBar?.stop();

const uploadTime = timer.stop();
const totalTime = Object.values(timings).reduce((sum, time) => sum + time, 0);
const throughputMBps = CLIUtils.calculateThroughputMBps(stats.size, timings.networkUpload);

this.log('\n');
this.log(
`[PUT] Timing breakdown:\n
Network upload: ${CLIUtils.formatDuration(timings.networkUpload)} (${throughputMBps.toFixed(2)} MB/s)\n
Drive upload: ${CLIUtils.formatDuration(timings.driveUpload)}\n
Thumbnail: ${CLIUtils.formatDuration(timings.thumbnailUpload)}\n`,
);
this.log('\n');
// eslint-disable-next-line max-len
const message = `File uploaded in ${uploadTime}ms, view it at ${ConfigService.instance.get('DRIVE_WEB_URL')}/file/${createdDriveFile.uuid}`;
const message = `File uploaded successfully in ${CLIUtils.formatDuration(totalTime)}, view it at ${ConfigService.instance.get('DRIVE_WEB_URL')}/file/${createdDriveFile.uuid}`;
CLIUtils.success(this.log.bind(this), message);
return {
success: true,
Expand Down
2 changes: 1 addition & 1 deletion src/services/network/upload/upload-facade.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class UploadFacade {
// This aims to prevent this issue: https://inxt.atlassian.net/browse/PB-1446
await AsyncUtils.sleep(500);

const totalBytes = await UploadFileService.instance.uploadFilesInChunks({
const totalBytes = await UploadFileService.instance.uploadFilesConcurrently({
network,
filesToUpload: scanResult.files,
folderMap,
Expand Down
46 changes: 35 additions & 11 deletions src/services/network/upload/upload-file.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
DELAYS_MS,
MAX_CONCURRENT_UPLOADS,
MAX_RETRIES,
UploadFilesInBatchesParams,
UploadFilesConcurrentlyParams,
UploadFileWithRetryParams,
} from './upload.types';
import { DriveFileService } from '../../drive/drive-file.service';
Expand All @@ -12,26 +12,27 @@ import { isAlreadyExistsError } from '../../../utils/errors.utils';
import { stat } from 'node:fs/promises';
import { EncryptionVersion } from '@internxt/sdk/dist/drive/storage/types';
import { createFileStreamWithBuffer, tryUploadThumbnail } from '../../../utils/thumbnail.utils';
import { CLIUtils } from '../../../utils/cli.utils';

export class UploadFileService {
static readonly instance = new UploadFileService();

async uploadFilesInChunks({
async uploadFilesConcurrently({
network,
filesToUpload,
folderMap,
bucket,
destinationFolderUuid,
currentProgress,
emitProgress,
}: UploadFilesInBatchesParams): Promise<number> {
}: UploadFilesConcurrentlyParams): Promise<number> {
let bytesUploaded = 0;

const chunks = this.chunkArray(filesToUpload, MAX_CONCURRENT_UPLOADS);
const concurrentFiles = this.concurrencyArray(filesToUpload, MAX_CONCURRENT_UPLOADS);

for (const chunk of chunks) {
for (const fileArray of concurrentFiles) {
await Promise.allSettled(
chunk.map(async (file) => {
fileArray.map(async (file) => {
const parentPath = dirname(file.relativePath);
const parentFolderUuid =
parentPath === '.' || parentPath === '' ? destinationFolderUuid : folderMap.get(parentPath);
Expand Down Expand Up @@ -78,6 +79,13 @@ export class UploadFileService {
fileType,
});

const timings = {
networkUpload: 0,
driveUpload: 0,
thumbnailUpload: 0,
};

const uploadTimer = CLIUtils.timer();
const fileId = await new Promise<string>((resolve, reject) => {
network.uploadFile(
fileStream,
Expand All @@ -92,7 +100,9 @@ export class UploadFileService {
() => {},
);
});
timings.networkUpload = uploadTimer.stop();

const driveTimer = CLIUtils.timer();
const createdDriveFile = await DriveFileService.instance.createFile({
plainName: file.name,
type: fileType,
Expand All @@ -104,7 +114,9 @@ export class UploadFileService {
creationTime: stats.birthtime?.toISOString(),
modificationTime: stats.mtime?.toISOString(),
});
timings.driveUpload = driveTimer.stop();

const thumbnailTimer = CLIUtils.timer();
if (bufferStream) {
void tryUploadThumbnail({
bufferStream,
Expand All @@ -114,6 +126,18 @@ export class UploadFileService {
networkFacade: network,
});
}
timings.thumbnailUpload = thumbnailTimer.stop();

const totalTime = Object.values(timings).reduce((sum, time) => sum + time, 0);
const throughputMBps = CLIUtils.calculateThroughputMBps(stats.size, timings.networkUpload);
logger.info(`Uploaded '${file.name}' (${CLIUtils.formatBytesToString(stats.size)})`);
logger.info(
`Timing breakdown:\n
Network upload: ${CLIUtils.formatDuration(timings.networkUpload)} (${throughputMBps.toFixed(2)} MB/s)\n
Drive upload: ${CLIUtils.formatDuration(timings.driveUpload)}\n
Thumbnail: ${CLIUtils.formatDuration(timings.thumbnailUpload)}\n
Total: ${CLIUtils.formatDuration(totalTime)}\n`,
);

return createdDriveFile.fileId;
} catch (error: unknown) {
Expand All @@ -136,11 +160,11 @@ export class UploadFileService {
}
return null;
}
private chunkArray<T>(array: T[], chunkSize: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
private concurrencyArray<T>(array: T[], arraySize: number): T[][] {
const arrays: T[][] = [];
for (let i = 0; i < array.length; i += arraySize) {
arrays.push(array.slice(i, i + arraySize));
}
return chunks;
return arrays;
}
}
4 changes: 2 additions & 2 deletions src/services/network/upload/upload.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export interface CreateFolderWithRetryParams {
parentFolderUuid: string;
}

export interface UploadFilesInBatchesParams {
export interface UploadFilesConcurrentlyParams {
network: NetworkFacade;
filesToUpload: FileSystemNode[];
folderMap: Map<string, string>;
Expand All @@ -49,6 +49,6 @@ export interface UploadFileWithRetryParams {
bucket: string;
parentFolderUuid: string;
}
export const MAX_CONCURRENT_UPLOADS = 5;
export const MAX_CONCURRENT_UPLOADS = 10;
export const DELAYS_MS = [500, 1000, 2000];
export const MAX_RETRIES = 2;
37 changes: 37 additions & 0 deletions src/utils/cli.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,43 @@ export class CLIUtils {
};
};

static readonly formatDuration = (milliseconds: number): string => {
  // Render a duration as HH:MM:SS.mmm; non-positive inputs clamp to zero.
  if (milliseconds <= 0) {
    return '00:00:00.000';
  }
  const wholeSeconds = Math.floor(milliseconds / 1000);
  // Hours, minutes, seconds — each zero-padded to two digits.
  const clockParts = [
    Math.floor(wholeSeconds / 3600),
    Math.floor(wholeSeconds / 60) % 60,
    wholeSeconds % 60,
  ].map((unit) => unit.toString().padStart(2, '0'));
  const millisPart = Math.floor(milliseconds % 1000)
    .toString()
    .padStart(3, '0');
  return `${clockParts.join(':')}.${millisPart}`;
};

static readonly formatBytesToString = (bytes: number): string => {
  // Human-readable size using binary (1024-based) units.
  // Non-positive sizes render as '0.00 KB' to keep log output uniform.
  if (bytes <= 0) {
    return '0.00 KB';
  }
  const kb = bytes / 1024;
  if (kb < 1024) {
    return `${kb.toFixed(2)} KB`;
  }
  const mb = kb / 1024;
  if (mb < 1024) {
    return `${mb.toFixed(2)} MB`;
  }
  // Sizes >= 1 GiB previously printed as thousands of MB (e.g. '5120.00 MB');
  // a GB tier keeps large-upload logs readable. Outputs below 1 GiB are unchanged.
  const gb = mb / 1024;
  return `${gb.toFixed(2)} GB`;
};

static readonly calculateThroughputMBps = (bytes: number, milliseconds: number): number => {
  // Transfer rate in mebibytes per second; reports 0 instead of dividing
  // by zero (or producing a meaningless value for non-positive inputs).
  if (bytes <= 0 || milliseconds <= 0) {
    return 0;
  }
  const elapsedSeconds = milliseconds / 1000;
  const sizeMegabytes = bytes / (1024 * 1024);
  return sizeMegabytes / elapsedSeconds;
};

static readonly catchError = ({
error,
logReporter,
Expand Down
33 changes: 27 additions & 6 deletions src/webdav/handlers/PUT.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,15 @@ export class PUTRequestHandler implements WebDavMethodHandler {
throw new NotFoundError('Folders cannot be created with PUT. Use MKCOL instead.');
}
webdavLogger.info(`[PUT] Request received for file at ${resource.url}`);
webdavLogger.info(`[PUT] Uploading '${resource.name}' to '${resource.parentPath}'`);
webdavLogger.info(
`[PUT] Uploading '${resource.name}' (${CLIUtils.formatBytesToString(contentLength)}) to '${resource.parentPath}'`,
);

const timings = {
networkUpload: 0,
driveUpload: 0,
thumbnailUpload: 0,
};

const parentDriveFolderItem =
(await this.dependencies.webDavFolderService.getDriveFolderItemFromPath(resource.parentPath)) ??
Expand All @@ -69,8 +77,6 @@ export class PUTRequestHandler implements WebDavMethodHandler {

const fileType = resource.path.ext.replace('.', '');

const timer = CLIUtils.timer();

let bufferStream: BufferStream | undefined;
let fileStream: Readable = req;
const isThumbnailable = isFileThumbnailable(fileType);
Expand All @@ -88,6 +94,7 @@ export class PUTRequestHandler implements WebDavMethodHandler {
}
};

const networkUploadTimer = CLIUtils.timer();
const fileId = await new Promise((resolve: (fileId: string) => void, reject) => {
const state = this.dependencies.networkFacade.uploadFile(
fileStream,
Expand All @@ -111,9 +118,11 @@ export class PUTRequestHandler implements WebDavMethodHandler {
});
});
uploaded = true;
timings.networkUpload = networkUploadTimer.stop();

webdavLogger.info('[PUT] ✅ File uploaded to network');

const driveTimer = CLIUtils.timer();
const file = await DriveFileService.instance.createFile({
plainName: resource.path.name,
type: fileType,
Expand All @@ -123,7 +132,9 @@ export class PUTRequestHandler implements WebDavMethodHandler {
bucket: user.bucket,
encryptVersion: EncryptionVersion.Aes03,
});
timings.driveUpload = driveTimer.stop();

const thumbnailTimer = CLIUtils.timer();
try {
if (isThumbnailable && bufferStream) {
const thumbnailBuffer = bufferStream.getBuffer();
Expand All @@ -141,15 +152,25 @@ export class PUTRequestHandler implements WebDavMethodHandler {
} catch (error) {
webdavLogger.info(`[PUT] ❌ File thumbnail upload failed ${(error as Error).message}`);
}
timings.thumbnailUpload = thumbnailTimer.stop();

const uploadTime = timer.stop();
webdavLogger.info(`[PUT] ✅ File uploaded in ${uploadTime}ms to Internxt Drive`);
const totalTime = Object.values(timings).reduce((sum, time) => sum + time, 0);
const throughputMBps = CLIUtils.calculateThroughputMBps(contentLength, timings.networkUpload);

webdavLogger.info(`[PUT] ✅ File uploaded in ${CLIUtils.formatDuration(totalTime)} to Internxt Drive`);

webdavLogger.info(
`[PUT] Timing breakdown:\n
Network upload: ${CLIUtils.formatDuration(timings.networkUpload)} (${throughputMBps.toFixed(2)} MB/s)\n
Drive upload: ${CLIUtils.formatDuration(timings.driveUpload)}\n
Thumbnail: ${CLIUtils.formatDuration(timings.thumbnailUpload)}\n`,
);

// Wait for backend search index to propagate (same as folder creation delay in PB-1446)
await AsyncUtils.sleep(500);

webdavLogger.info(
`[PUT] [RESPONSE-201] ${resource.url} - Returning 201 Created after ${uploadTime}ms (+ 500ms propagation delay)`,
`[PUT] [RESPONSE-201] ${resource.url} - Returning 201 Created after ${CLIUtils.formatDuration(totalTime)}`,
);

res.status(201).send();
Expand Down
14 changes: 7 additions & 7 deletions test/services/network/upload/upload-facade.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ vi.mock('../../../../src/services/network/upload/upload-folder.service', () => (
vi.mock('../../../../src/services/network/upload/upload-file.service', () => ({
UploadFileService: {
instance: {
uploadFilesInChunks: vi.fn(),
uploadFilesConcurrently: vi.fn(),
},
},
}));
Expand Down Expand Up @@ -84,7 +84,7 @@ describe('UploadFacade', () => {
totalBytes: 500,
});
vi.mocked(UploadFolderService.instance.createFolders).mockResolvedValue(folderMap);
vi.mocked(UploadFileService.instance.uploadFilesInChunks).mockResolvedValue(500);
vi.mocked(UploadFileService.instance.uploadFilesConcurrently).mockResolvedValue(500);
vi.mocked(CLIUtils.timer).mockReturnValue({
stop: vi.fn().mockReturnValue(1000),
});
Expand Down Expand Up @@ -116,7 +116,7 @@ describe('UploadFacade', () => {
).rejects.toThrow('Failed to create folders, cannot upload files');

expect(UploadFolderService.instance.createFolders).toHaveBeenCalled();
expect(UploadFileService.instance.uploadFilesInChunks).not.toHaveBeenCalled();
expect(UploadFileService.instance.uploadFilesConcurrently).not.toHaveBeenCalled();
});

it('should properly handle the upload of folder and the creation of file and return proper result', async () => {
Expand All @@ -133,7 +133,7 @@ describe('UploadFacade', () => {
expect(result.rootFolderId).toBe('folder-uuid-123');
expect(result.uploadTimeMs).toBe(1000);
expect(UploadFolderService.instance.createFolders).toHaveBeenCalled();
expect(UploadFileService.instance.uploadFilesInChunks).toHaveBeenCalled();
expect(UploadFileService.instance.uploadFilesConcurrently).toHaveBeenCalled();
expect(logger.info).toHaveBeenCalledWith(`Scanned folder ${localPath}: found 2 items, total size 500 bytes.`);
});

Expand All @@ -148,7 +148,7 @@ describe('UploadFacade', () => {
},
);

vi.mocked(UploadFileService.instance.uploadFilesInChunks).mockImplementation(
vi.mocked(UploadFileService.instance.uploadFilesConcurrently).mockImplementation(
async ({ currentProgress, emitProgress }) => {
currentProgress.itemsUploaded = 2;
currentProgress.bytesUploaded = 500;
Expand All @@ -174,7 +174,7 @@ describe('UploadFacade', () => {
vi.useFakeTimers();

vi.mocked(UploadFolderService.instance.createFolders).mockResolvedValue(folderMap);
vi.mocked(UploadFileService.instance.uploadFilesInChunks).mockResolvedValue(100);
vi.mocked(UploadFileService.instance.uploadFilesConcurrently).mockResolvedValue(100);

const uploadPromise = sut.uploadFolder({
localPath,
Expand All @@ -190,7 +190,7 @@ describe('UploadFacade', () => {
expect(AsyncUtils.sleep).toHaveBeenCalledWith(500);
expect(AsyncUtils.sleep).toHaveBeenCalledTimes(1);
expect(UploadFolderService.instance.createFolders).toHaveBeenCalled();
expect(UploadFileService.instance.uploadFilesInChunks).toHaveBeenCalled();
expect(UploadFileService.instance.uploadFilesConcurrently).toHaveBeenCalled();

vi.useRealTimers();
});
Expand Down
Loading