diff --git a/src/cli/Commands.ts b/src/cli/Commands.ts index 1210923..5cf12ba 100644 --- a/src/cli/Commands.ts +++ b/src/cli/Commands.ts @@ -3,6 +3,7 @@ import { buildCommand } from './CommandInterface'; import { createBucket } from './create-bucket'; import { deleteBucket } from './delete-bucket'; import { downloadFile } from './download-file'; +import { downloadFileMultipart } from './download-file-multipart'; import { getDownloadLinks } from './get-download-links'; import { getFileInfo } from './get-filo-info'; import { renameFile } from './rename-file'; @@ -106,6 +107,15 @@ export const getDownloadLinksCommand = buildCommand({ getDownloadLinks(bucketId, fileIdsSeparatedByCommas.split(',')).finally(notifyProgramFinished('get-download-links')); }); +export const downloadFileMultipartCommand = buildCommand({ + version: '0.0.1', + command: 'download-file-multipart ', + description: 'Downloads a file by parts', + options: [], +}).action((fileId: string, fileSize: string, path: string) => { + downloadFileMultipart(fileId, parseInt(fileSize), path).finally(notifyProgramFinished('download-file-multipart')); +}); + // export const downloadFolderZippedCommand = buildCommand({ // version: '0.0.1', // name: 'download-folder-zip', diff --git a/src/cli/download-file-multipart.ts b/src/cli/download-file-multipart.ts new file mode 100644 index 0000000..ba07f4e --- /dev/null +++ b/src/cli/download-file-multipart.ts @@ -0,0 +1,52 @@ +import { createWriteStream } from 'fs'; +import { pipeline, Readable } from 'stream'; + +import { logger } from '../lib/utils/logger'; +import { getEnvironment } from './CommandInterface'; + +export async function downloadFileMultipart(fileId: string, fileSize: number, path: string) { + logger.info('Downloading file %s', fileId); + + const network = getEnvironment(); + const bucketId = process.env.BUCKET_ID; + + const destination = createWriteStream(path); + + try { + await new Promise((resolve, reject) => { + const state = network.downloadMultipartFile( + fileId, + fileSize, + bucketId as string, + { + progressCallback: (progress: number) => { + logger.info('Progress: %s %', (progress * 100).toFixed(2)); + }, + finishedCallback: (err: Error | null, downloadStream: Readable | null) => { + if (err) { + return reject(err); + } + + pipeline(downloadStream as Readable, destination, (err) => { + if (err) { + return reject(err); + } + resolve(null); + }); + }, + }, + ); + + process.on('SIGINT', () => { + network.downloadCancel(state); + }); + }); + logger.info('File downloaded on path %s', path); + + process.exit(0); + } catch (err) { + logger.error('Error downloading file %s', err.message); + + process.exit(1); + } +} diff --git a/src/cli/index.ts b/src/cli/index.ts index 660eefd..3bf038c 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -16,6 +16,7 @@ program.addCommand(commands.getFileInfoCommand); program.addCommand(commands.createBucketCommand); program.addCommand(commands.deleteBucketCommand); program.addCommand(commands.getDownloadLinksCommand); +program.addCommand(commands.downloadFileMultipartCommand); // program.addCommand(commands.downloadFolderZippedCommand); program.parse(process.argv); diff --git a/src/index.ts b/src/index.ts index 8821f34..ac850aa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -23,6 +23,7 @@ import { HashStream } from './lib/utils/streams'; import { downloadFileV2 } from './lib/core/download/downloadV2'; import { FileVersionOneError } from '@internxt/sdk/dist/network/download'; import { uploadFileMultipart, uploadFileV2 } from './lib/core/upload/uploadV2'; +import { downloadFileMultipart } from './lib/core/download/multipart'; type GetBucketsCallback = (err: Error | null, result: any) => void; @@ -334,6 +335,71 @@ export class Environment { return downloadState; }; + downloadMultipartFile( + fileId: string, + fileSize: number, + bucketId: string, + opts: DownloadOptions, + ) { + const abortController = new AbortController(); + const downloadState = new ActionState(ActionTypes.Download); + + downloadState.once(Events.Download.Abort, () => { + abortController.abort(); + }); + + if (!this.config.encryptionKey) { + opts.finishedCallback(Error(ENCRYPTION_KEY_NOT_PROVIDED), null); + + return downloadState; + } + + if (!bucketId) { + opts.finishedCallback(Error(BUCKET_ID_NOT_PROVIDED), null); + + return downloadState; + } + + if (!fileId) { + opts.finishedCallback(Error('File id not provided'), null); + + return downloadState; + } + + if (!this.config.bridgeUrl) { + opts.finishedCallback(Error('Missing bridge url'), null); + + return downloadState; + } + + const [downloadPromise, stream] = downloadFileMultipart( + fileId, + fileSize, + bucketId, + this.config.encryptionKey, + this.config.bridgeUrl, + { + user: this.config.bridgeUser, + pass: this.config.bridgePass + }, + opts.progressCallback, + () => { + opts.finishedCallback(null, stream); + }, + abortController.signal, + ); + + downloadPromise.catch((err) => { + if (err instanceof FileVersionOneError) { + opts.finishedCallback(new Error('Invalid method for v1 files'), null) + } else { + opts.finishedCallback(err, null); + } + }); + + return downloadState; + } + downloadCancel(state: ActionState): void { state.stop(); } diff --git a/src/lib/core/download/multipart.ts b/src/lib/core/download/multipart.ts new file mode 100644 index 0000000..9189053 --- /dev/null +++ b/src/lib/core/download/multipart.ts @@ -0,0 +1,146 @@ +import { request } from 'undici'; +import { createDecipheriv, randomBytes } from 'crypto'; +import { Writable, Readable, PassThrough } from 'stream' +import { pipeline } from 'stream/promises' +import { validateMnemonic } from 'bip39'; +import { ALGORITHMS, DecryptFileFunction, DownloadFileFunction, Network } from '@internxt/sdk/dist/network'; +import { downloadFile, FileVersionOneError } from '@internxt/sdk/dist/network/download'; + +import { Events as ProgressEvents, HashStream, ProgressNotifier } from '../../utils/streams'; +import { DownloadProgressCallback } from '.'; +import { GenerateFileKey, sha256 } from '../../utils/crypto'; +import Errors from './errors'; + +async function downloadPartStream( + downloadUrl: string, + from: number, + to: number, + signal?: AbortSignal +): Promise { + const { statusCode, body } = await request(downloadUrl, { + signal, + method: 'GET', + headers: { + Range: `bytes=${from}-${to}`, + } + }); + + if (statusCode !== 206) { + throw new Error(`Error al descargar el chunk: ${statusCode}`); + } + + return body; +} + +export function downloadFileMultipart( + fileId: string, + fileSize: number, + bucketId: string, + mnemonic: string, + bridgeUrl: string, + creds: { pass: string, user: string }, + notifyProgress: DownloadProgressCallback, + onV2Confirmed: () => void, + signal?: AbortSignal +): [Promise, PassThrough] { + const partLength = 50 * 1024 * 1024; + const outStream = new PassThrough(); + const network = Network.client(bridgeUrl, { + clientName: 'inxt-js', + clientVersion: '1.0' + }, { + bridgeUser: creds.user, + userId: sha256(Buffer.from(creds.pass)).toString('hex') + }); + + let downloadUrl = '' + + const ranges: { start: number, end: number }[] = []; + + for (let start = 0; start < fileSize; start += partLength) { + const end = Math.min(start + partLength - 1, fileSize - 1); + ranges.push({ start, end }); + } + + + const downloadFileStep: DownloadFileFunction = async (downloadables) => { + onV2Confirmed(); + + for (const downloadable of downloadables.sort((dA, dB) => dA.index - dB.index)) { + downloadUrl = downloadable.url + } + }; + + const decryptFileStep: DecryptFileFunction = async (algorithm, key, iv, fileSize) => { + if (algorithm !== ALGORITHMS.AES256CTR.type) { + throw Errors.downloadUnknownAlgorithmError; + } + + const decipher = createDecipheriv('aes-256-ctr', key as Buffer, iv as Buffer); + const progress = new ProgressNotifier(fileSize, 2000, { emitClose: false }); + + progress.on(ProgressEvents.Progress, (progress: number) => { + notifyProgress(progress, null, null); + }); + + const hasher = new HashStream(); + + const pipelinePromise = pipeline( + hasher, + decipher, + progress, + outStream, + signal ? { signal } : undefined + ); + + for (const range of ranges) { + const partStream = await downloadPartStream(downloadUrl, range.start, range.end, signal); + for await (const chunk of partStream) { + if (!hasher.write(chunk)) { + await new Promise((resolve) => hasher.once('drain', resolve)); + } + } + } + + hasher.end(); + + await pipelinePromise; + + // TODO: Enforce this one + // const calculatedHash = hasher.getHash().toString('hex'); + // const expectedHash = fileEncryptedSlice.hash; + + // if (calculatedHash !== expectedHash) { + // throw Errors.downloadHashMismatchError; + // } + + await new Promise((res) => progress.end(res)); + }; + + const downloadPromise = downloadFile( + fileId, + bucketId, + mnemonic, + network, + { + validateMnemonic: (mnemonic) => { + return validateMnemonic(mnemonic); + }, + algorithm: ALGORITHMS.AES256CTR, + randomBytes, + generateFileKey: (mnemonic, bucketId, index) => { + return GenerateFileKey(mnemonic, bucketId, index as Buffer | string); + } + }, + Buffer.from, + downloadFileStep, + decryptFileStep + ).catch((err) => { + if (err instanceof FileVersionOneError) { + throw err; + } + outStream.emit('error', err); + }); + + return [downloadPromise, outStream]; +}