Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions bin/cli/commands/empty-users-from-csv.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { PrepareFunctionReturnType } from '../init';
import { emptyUsersFromCsv } from '../tasks/empty-users-from-csv.task';
import { CommandId } from './id';

export default {
id: CommandId.EmptyUsersFromCsv,
version: '0.0.1',
fn: async (
{ repo: { usersRepository }, usecase: { bucketsUsecase, bucketEntriesUsecase } }: PrepareFunctionReturnType,
csvPath: string,
concurrency: number,
): Promise<void> => {
await emptyUsersFromCsv(csvPath, usersRepository, bucketsUsecase, bucketEntriesUsecase, concurrency);
},
};
1 change: 1 addition & 0 deletions bin/cli/commands/id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export enum CommandId {
EmptyBuckets = 'empty-buckets',
CleanStalledFrames = 'clean-stalled-frames',
CleanStalledBucketEntries = 'clean-stalled-bucket-entries',
EmptyUsersFromCsv = 'empty-users-from-csv',
}
13 changes: 13 additions & 0 deletions bin/cli/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { default as emptyBucket } from "./empty-bucket.command";
import { default as emptyBuckets } from "./empty-buckets.command";
import { default as cleanStalledFrames } from "./clean-stalled-frames.command";
import { default as cleanStalledBucketEntries } from "./clean-stalled-bucket-entries.command";
import { default as emptyUsersFromCsv } from "./empty-users-from-csv.command";

export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ({
[destroyUserBuckets.id]: buildCommand({
Expand Down Expand Up @@ -57,4 +58,16 @@ export default (resources: PrepareFunctionReturnType, onFinish: () => void) => (
await cleanStalledBucketEntries.fn(resources);
onFinish();
}),

[emptyUsersFromCsv.id]: buildCommand({
version: emptyUsersFromCsv.version,
command: `${emptyUsersFromCsv.id} <csv_path>`,
description: 'Empties buckets for all users listed in a CSV file (one email per line)',
options: [
{ flags: '-c, --concurrency <number>', description: 'Number of users to process in parallel', defaultValue: '3' },
],
}).action(async (csvPath, { concurrency }) => {
await emptyUsersFromCsv.fn(resources, csvPath, parseInt(concurrency, 10));
onFinish();
}),
});
32 changes: 9 additions & 23 deletions bin/cli/init.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import NetworkMessageQueue from '../../lib/server/queues/networkQueue';
import { connectToDatabase } from "../utils/database";

import { MongoDBUsersRepository } from '../../lib/core/users/MongoDBUsersRepository'
Expand Down Expand Up @@ -32,6 +31,7 @@ import { ContactsRepository } from '../../lib/core/contacts/Repository';
import { MongoDB } from '../delete-objects/temp-shard.model';
import { DatabaseFramesReader, DatabaseBucketEntriesReaderWithoutBucket } from '../delete-objects/ObjectStorage';
import { FileStateRepository } from '../../lib/core/fileState/Repository';
import * as bullQueueModule from '../../lib/core/queue/bullQueue';

const Config = require('../../lib/config');

Expand Down Expand Up @@ -63,28 +63,17 @@ export type PrepareFunctionReturnType = {
}

export async function prepare(): Promise<PrepareFunctionReturnType> {
const QUEUE_NAME = 'NETWORK_WORKER_TASKS_QUEUE';

const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUri as string);
const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUrl as string);
await newDbConnection.connect();
const models = await connectToDatabase('', '');
const { QUEUE_USERNAME, QUEUE_PASSWORD, QUEUE_HOST } = config;
try {
bullQueueModule.init(config);
console.log('bull queue initialized');
} catch (err) {
const error = err as any;
console.error('failed to initialize bull queue:', error && error.message ? error.message : error);
}

const networkQueue = new NetworkMessageQueue({
connection: {
url: `amqp://${QUEUE_USERNAME}:${QUEUE_PASSWORD}@${QUEUE_HOST}`,
},
exchange: {
name: 'exchangeName',
type: 'direct',
},
queue: {
name: QUEUE_NAME,
},
routingKey: {
name: 'routingKeyName',
},
});
const bucketEntriesRepository = new MongoDBBucketEntriesRepository(models.BucketEntry);
const bucketEntryShardsRepository = new MongoDBBucketEntryShardsRepository(models.BucketEntryShard);
const bucketsRepository = new MongoDBBucketsRepository(models.Bucket);
Expand All @@ -101,7 +90,6 @@ export async function prepare(): Promise<PrepareFunctionReturnType> {
const shardsUsecase = new ShardsUsecase(
mirrorsRepository,
contactsRepository,
networkQueue
);
const bucketEntriesUsecase = new BucketEntriesUsecase(
bucketEntriesRepository,
Expand Down Expand Up @@ -133,8 +121,6 @@ export async function prepare(): Promise<PrepareFunctionReturnType> {
const bucketEntriesReader = new DatabaseBucketEntriesReaderWithoutBucket(
newDbConnection.getCollections().bucketEntries
);
await networkQueue.connectAndRetry();

return {
readers: {
framesReader,
Expand Down
72 changes: 72 additions & 0 deletions bin/cli/tasks/empty-users-from-csv.task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import fs from 'fs';
import readline from 'readline';
import { BucketEntriesUsecase } from '../../../lib/core/bucketEntries/usecase';
import { BucketsUsecase } from '../../../lib/core/buckets/usecase';
import { UsersRepository } from '../../../lib/core/users/Repository';
import { emptyBucket } from './empty-bucket.task';
import { emptyBuckets } from './empty-buckets.task';

const emptyUserBuckets = async (
email: string,
usersRepository: UsersRepository,
bucketsUsecase: BucketsUsecase,
bucketEntriesUsecase: BucketEntriesUsecase,
): Promise<void> => {
const user = await usersRepository.findByEmail(email);

if (!user) {
console.log(`User not found: ${email}`);
return;
}

console.log(`Emptying buckets for user ${email} (${user.id})`);

const limit = 20;
let offset = 0;
let moreToProcess = true;

do {
const buckets = await bucketsUsecase.listByUserId(user.id, limit, offset);

moreToProcess = buckets.length === limit;
offset += buckets.length;

await emptyBuckets(
buckets.map(({ id }) => id),
(id) => emptyBucket(id, bucketEntriesUsecase),
async (id) => console.log(` Bucket ${id} emptied`),
);
} while (moreToProcess);

console.log(`Done with user ${email}`);
};

export const emptyUsersFromCsv = async (
csvPath: string,
usersRepository: UsersRepository,
bucketsUsecase: BucketsUsecase,
bucketEntriesUsecase: BucketEntriesUsecase,
concurrency = 3,
): Promise<void> => {
const fileStream = fs.createReadStream(csvPath);
const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity });

const emails: string[] = [];
for await (const line of rl) {
const email = line.trim();
if (email && email.includes('@')) emails.push(email);
}

console.log(`Processing ${emails.length} users with concurrency ${concurrency}`);

let index = 0;

const worker = async (): Promise<void> => {
while (index < emails.length) {
const email = emails[index++];
await emptyUserBuckets(email, usersRepository, bucketsUsecase, bucketEntriesUsecase);
}
};

await Promise.all(Array.from({ length: concurrency }, worker));
};
13 changes: 4 additions & 9 deletions bin/utils/database.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { config as loadEnv } from 'dotenv';
import Config from '../../lib/config';
import DatabaseConnection from '../../lib/models/database';

loadEnv();

const Storage = require('storj-service-storage-models') as any;

const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

export interface Models {
Expand All @@ -22,7 +21,7 @@ export interface Models {
FileState: any,
};

export async function connectToDatabase(configJSON: any, mongoURL: string): Promise<Models> {
export async function connectToDatabase(configJSON: any, mongoURL: string): Promise<any> {
const config = new Config(process.env.NODE_ENV, configJSON, '') as {
storage: {
mongoUrl: string;
Expand All @@ -31,13 +30,9 @@ export async function connectToDatabase(configJSON: any, mongoURL: string): Prom
QUEUE_USERNAME: string;
QUEUE_PASSWORD: string;
QUEUE_HOST: string;
};
}

const storage = new Storage(
mongoURL || config.storage.mongoUrl,
config.storage.mongoOpts,
null
);
const storage = DatabaseConnection.createFromConfig(config.storage);

await wait(5000);

Expand Down
Loading