Skip to content

Commit 7e85e80

Browse files
committed
fix: fixing up @matrixai/workers usage
1 parent 030c048 commit 7e85e80

10 files changed

+129
-111
lines changed

scripts/prebuild.mjs

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import url from 'node:url';
77
import process from 'node:process';
88
import childProcess from 'node:child_process';
99
import semver from 'semver';
10-
import packageJSON from '../package.json' assert { type: "json" };
10+
import packageJSON from '../package.json' assert { type: 'json' };
1111

1212
const projectPath = path.dirname(
1313
path.dirname(url.fileURLToPath(import.meta.url)),

src/DB.ts

+31-22
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ import type {
1313
DBClearOptions,
1414
DBCountOptions,
1515
} from './types.js';
16-
import type { RocksDBDatabase, RocksDBDatabaseOptions } from './native/index.js';
16+
import type {
17+
RocksDBDatabase,
18+
RocksDBDatabaseOptions,
19+
} from './native/index.js';
1720
import nodeFs from 'node:fs';
18-
import { Transfer } from 'threads';
1921
import Logger from '@matrixai/logger';
2022
import { withF, withG } from '@matrixai/resources';
2123
import {
@@ -652,18 +654,20 @@ class DB {
652654
// Slice-copy for transferring to worker threads
653655
const key = utils.toArrayBuffer(this.crypto.key);
654656
const plainText = utils.toArrayBuffer(plainTextBuf);
655-
cipherText = await this.workerManager.call(async (w) => {
656-
return await w.encrypt(
657-
Transfer(key),
658-
// @ts-ignore: threads.js types are wrong
659-
Transfer(plainText),
660-
);
661-
});
657+
const result = await this.workerManager.methods.encrypt(
658+
{ key, plainText },
659+
[key, plainText],
660+
);
661+
cipherText = result.data;
662662
} else {
663-
cipherText = await this.crypto.ops.encrypt(
664-
this.crypto.key,
665-
plainTextBuf,
663+
const result = await this.crypto.ops.encrypt(
664+
{
665+
key: this.crypto.key,
666+
plainText: plainTextBuf,
667+
},
668+
[this.crypto.key, plainTextBuf],
666669
);
670+
cipherText = result.data;
667671
}
668672
return utils.fromArrayBuffer(cipherText);
669673
}
@@ -689,18 +693,23 @@ class DB {
689693
// Slice-copy for transferring to worker threads
690694
const key = utils.toArrayBuffer(this.crypto.key);
691695
const cipherText = utils.toArrayBuffer(cipherTextBuf);
692-
decrypted = await this.workerManager.call(async (w) => {
693-
return await w.decrypt(
694-
Transfer(key),
695-
// @ts-ignore: threads.js types are wrong
696-
Transfer(cipherText),
697-
);
698-
});
696+
const result = await this.workerManager.methods.decrypt(
697+
{
698+
key: this.crypto.key,
699+
cipherText: cipherTextBuf,
700+
},
701+
[this.crypto.key, cipherTextBuf],
702+
);
703+
decrypted = result.data;
699704
} else {
700-
decrypted = await this.crypto.ops.decrypt(
701-
this.crypto.key,
702-
cipherTextBuf,
705+
const result = await this.crypto.ops.decrypt(
706+
{
707+
key: this.crypto.key,
708+
cipherText: cipherTextBuf,
709+
},
710+
[this.crypto.key, cipherTextBuf],
703711
);
712+
decrypted = result.data;
704713
}
705714
if (decrypted == null) {
706715
throw new errors.ErrorDBDecrypt();

src/types.ts

+15-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type fs from 'node:fs';
2-
import type { WorkerManagerInterface } from '@matrixai/workers';
2+
import type { WorkerManager } from '@matrixai/workers';
33
import type {
44
RocksDBDatabaseOptions,
55
RocksDBIteratorOptions,
@@ -50,19 +50,24 @@ interface FileSystem {
5050
};
5151
}
5252

53-
/**
54-
* Crypto utility object
55-
* Remember ever Node Buffer is an ArrayBuffer
56-
*/
5753
type Crypto = {
58-
encrypt(key: ArrayBuffer, plainText: ArrayBuffer): Promise<ArrayBuffer>;
54+
encrypt(
55+
data: { key: ArrayBuffer; plainText: ArrayBuffer },
56+
transferList: [ArrayBuffer, ArrayBuffer],
57+
): Promise<{ data: ArrayBuffer; transferList: [ArrayBuffer] }>;
5958
decrypt(
60-
key: ArrayBuffer,
61-
cipherText: ArrayBuffer,
62-
): Promise<ArrayBuffer | undefined>;
59+
data: {
60+
key: ArrayBuffer;
61+
cipherText: ArrayBuffer;
62+
},
63+
transferList: [ArrayBuffer, ArrayBuffer],
64+
): Promise<
65+
| { data: ArrayBuffer; transferList: [ArrayBuffer] }
66+
| { data: undefined; transferList: [] }
67+
>;
6368
};
6469

65-
type DBWorkerManagerInterface = WorkerManagerInterface<Crypto>;
70+
type DBWorkerManagerInterface = WorkerManager<Crypto>;
6671

6772
/**
6873
* Path to a key

tests/DB.test.ts

+18-23
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,28 @@
11
import type DBTransaction from '#DBTransaction.js';
22
import type { KeyPath } from '#types.js';
33
import type { ResourceRelease } from '@matrixai/resources';
4-
import type { DBWorkerModule } from './workers/dbWorkerModule.js';
4+
import type { DBWorker } from './workers/dbWorker.js';
55
import os from 'node:os';
66
import path from 'node:path';
77
import fs from 'node:fs';
88
import nodeCrypto from 'node:crypto';
9+
import { Worker } from 'node:worker_threads';
910
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
10-
// import { WorkerManager } from '@matrixai/workers';
11-
import matrixaiWorkers from '@matrixai/workers';
11+
import { WorkerManager } from '@matrixai/workers';
1212
import { withF } from '@matrixai/resources';
13-
import { spawn, Worker } from 'threads';
13+
import * as testsUtils from './utils.js';
14+
import dbWorker from './workers/dbWorker.js';
1415
import DB from '#DB.js';
1516
import * as errors from '#errors.js';
1617
import * as utils from '#utils.js';
17-
import * as testsUtils from './utils.js';
18-
19-
const { WorkerManager } = matrixaiWorkers;
2018

2119
describe(DB.name, () => {
2220
const logger = new Logger(`${DB.name} Test`, LogLevel.WARN, [
2321
new StreamHandler(),
2422
]);
2523
const crypto = {
2624
key: testsUtils.generateKeySync(256),
27-
ops: {
28-
encrypt: testsUtils.encrypt,
29-
decrypt: testsUtils.decrypt,
30-
},
25+
ops: dbWorker,
3126
};
3227
let dataDir: string;
3328
beforeEach(async () => {
@@ -346,12 +341,12 @@ describe(DB.name, () => {
346341
test('parallelized get and put and del', async () => {
347342
const dbPath = `${dataDir}/db`;
348343
const db = await DB.createDB({ dbPath, crypto, logger });
349-
const workerManager =
350-
await WorkerManager.createWorkerManager<DBWorkerModule>({
351-
workerFactory: () => spawn(new Worker('./workers/dbWorker')),
352-
cores: 1,
353-
logger,
354-
});
344+
const workerManager = await WorkerManager.createWorkerManager<DBWorker>({
345+
workerFactory: () => new Worker('./workers/dbWorker'),
346+
cores: 1,
347+
manifest: dbWorker,
348+
logger,
349+
});
355350
db.setWorkerManager(workerManager);
356351
await db.start();
357352
await db.put('a', 'value0');
@@ -408,12 +403,12 @@ describe(DB.name, () => {
408403
test('parallelized batch put and del', async () => {
409404
const dbPath = `${dataDir}/db`;
410405
const db = await DB.createDB({ dbPath, crypto, logger });
411-
const workerManager =
412-
await WorkerManager.createWorkerManager<DBWorkerModule>({
413-
workerFactory: () => spawn(new Worker('./workers/dbWorker')),
414-
cores: 4,
415-
logger,
416-
});
406+
const workerManager = await WorkerManager.createWorkerManager<DBWorker>({
407+
workerFactory: () => new Worker('./workers/dbWorker'),
408+
cores: 4,
409+
manifest: dbWorker,
410+
logger,
411+
});
417412
db.setWorkerManager(workerManager);
418413
await db.start();
419414
await db.batch([

tests/DBIterator.test.ts

+7-9
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,24 @@
1-
import type { KeyPath } from '@/types';
1+
import type { KeyPath } from '#types.js';
22
import os from 'os';
33
import path from 'path';
44
import fs from 'fs';
55
import nodeCrypto from 'crypto';
66
import nodeUtil from 'util';
77
import lexi from 'lexicographic-integer';
88
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
9-
import DB from '@/DB';
10-
import DBIterator from '@/DBIterator';
11-
import rocksdbP from '@/native/rocksdbP';
12-
import * as testsUtils from './utils';
9+
import * as testsUtils from './utils.js';
10+
import dbWorker from './workers/dbWorker.js';
11+
import DB from '#DB.js';
12+
import DBIterator from '#DBIterator.js';
13+
import rocksdbP from '#native/rocksdbP.js';
1314

1415
describe(DBIterator.name, () => {
1516
const logger = new Logger(`${DBIterator.name} test`, LogLevel.WARN, [
1617
new StreamHandler(),
1718
]);
1819
const crypto = {
1920
key: testsUtils.generateKeySync(256),
20-
ops: {
21-
encrypt: testsUtils.encrypt,
22-
decrypt: testsUtils.decrypt,
23-
},
21+
ops: dbWorker,
2422
};
2523
let dataDir: string;
2624
let db: DB;

tests/DBTransaction.test.ts

+7-9
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
1-
import type { KeyPath } from '@/types';
1+
import type { KeyPath } from '#types.js';
22
import os from 'os';
33
import path from 'path';
44
import fs from 'fs';
55
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
66
import { withF } from '@matrixai/resources';
77
import { Barrier, errors as locksErrors } from '@matrixai/async-locks';
8-
import DB from '@/DB';
9-
import DBTransaction from '@/DBTransaction';
10-
import * as errors from '@/errors';
11-
import * as testsUtils from './utils';
8+
import * as testsUtils from './utils.js';
9+
import dbWorker from './workers/dbWorker.js';
10+
import DB from '#DB.js';
11+
import DBTransaction from '#DBTransaction.js';
12+
import * as errors from '#errors.js';
1213

1314
describe(DBTransaction.name, () => {
1415
const logger = new Logger(`${DBTransaction.name} test`, LogLevel.WARN, [
1516
new StreamHandler(),
1617
]);
1718
const crypto = {
1819
key: testsUtils.generateKeySync(256),
19-
ops: {
20-
encrypt: testsUtils.encrypt,
21-
decrypt: testsUtils.decrypt,
22-
},
20+
ops: dbWorker,
2321
};
2422
let dataDir: string;
2523
let db: DB;

tests/rocksdb/rocksdbP.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import type { RocksDBDatabase } from '@/native/types';
1+
import type { RocksDBDatabase } from '#native/types.js';
22
import os from 'os';
33
import path from 'path';
44
import fs from 'fs';
55
import { Barrier } from '@matrixai/async-locks';
6-
import rocksdbP from '@/native/rocksdbP';
6+
import rocksdbP from '#native/rocksdbP.js';
77

88
describe('rocksdbP', () => {
99
let dataDir: string;

tests/utils.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { KeyPath } from '#types.js';
22
import nodeCrypto from 'node:crypto';
3-
import * as utils from '#utils.js';
43
import * as testUtils from './utils.js';
4+
import * as utils from '#utils.js';
55

66
describe('utils', () => {
77
const keyPaths: Array<KeyPath> = [

tests/workers/dbWorker.ts

+47-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,50 @@
1-
import type { DBWorkerModule } from './dbWorkerModule.js';
2-
import { expose } from 'threads/worker';
3-
import dbWorker from './dbWorkerModule.js';
1+
import type { Crypto } from '#types.js';
2+
import type { WorkerManifest } from '@matrixai/workers';
3+
import { expose } from '@matrixai/workers';
4+
import * as utils from '../utils.js';
5+
6+
const dbWorker: Crypto = {
7+
async encrypt(
8+
{
9+
key,
10+
plainText,
11+
}: {
12+
key: ArrayBuffer;
13+
plainText: ArrayBuffer;
14+
},
15+
// eslint-disable-line @typescript-eslint/no-unused-vars
16+
transferList: [ArrayBuffer, ArrayBuffer],
17+
): Promise<{ data: ArrayBuffer; transferList: [ArrayBuffer] }> {
18+
const cipherText = await utils.encrypt(key, plainText);
19+
return { data: cipherText, transferList: [cipherText] };
20+
},
21+
async decrypt(
22+
{
23+
key,
24+
cipherText,
25+
}: {
26+
key: ArrayBuffer;
27+
cipherText: ArrayBuffer;
28+
},
29+
// eslint-disable-line @typescript-eslint/no-unused-vars
30+
transferList: [ArrayBuffer, ArrayBuffer],
31+
): Promise<
32+
| { data: ArrayBuffer; transferList: [ArrayBuffer] }
33+
| { data: undefined; transferList: [] }
34+
> {
35+
const plainText = await utils.decrypt(key, cipherText);
36+
if (plainText != null) {
37+
return { data: plainText, transferList: [plainText] };
38+
} else {
39+
return { data: undefined, transferList: [] };
40+
}
41+
},
42+
} satisfies WorkerManifest;
443

544
expose(dbWorker);
645

7-
export type { DBWorkerModule };
46+
type DBWorker = typeof dbWorker;
47+
48+
export type { DBWorker };
49+
50+
export default dbWorker;

tests/workers/dbWorkerModule.ts

-30
This file was deleted.

0 commit comments

Comments
 (0)