diff --git a/README.md b/README.md
index 175bfa6..e70d70e 100644
--- a/README.md
+++ b/README.md
@@ -144,12 +144,15 @@ queries:
     execution_strategy: FIXED_REQUEST_NUMBER
     requests: 10000
     query: |
-      query AlbumByPK {
-        albums_by_pk(id: 1) {
+      query AlbumByPK($id: Int!) {
+        albums_by_pk(id: $id) {
          id
          title
        }
      }
+    # Optional variables (if the query/mutation uses variables)
+    variables:
+      id: 1
   - name: AlbumByPKMultiStage
     tools: [k6]
     execution_strategy: MULTI_STAGE
@@ -160,12 +163,15 @@ queries:
       - duration: 5s
         target: 1000
     query: |
-      query AlbumByPK {
-        albums_by_pk(id: 1) {
+      query AlbumByPK($id: Int!) {
+        albums_by_pk(id: $id) {
          id
          title
-        }
+        }
      }
+    # Optionally supply variables from a CSV file. Each row in the CSV is used in a request (if 100 rows are provided, 100 queries with different variables are fired). The first row of the CSV must be the variable names. This can be used in tandem with the variables above.
+    variable_file:
+      file_path: './test.csv' # CSV file path relative to the config file
 ```
 
 ##### Run with Docker
@@ -241,6 +247,9 @@ config:
       artistIds: [1, 2, 3, 4]
       some_object:
         a_key: a_value
+    # Optionally supply variables from a CSV file. Each row in the CSV is used in a subscription. The first row of the CSV must be the variable names. This can be used in tandem with the variables above.
+    variable_file:
+      file_path: './test.csv' # CSV file path relative to the config file
 ```
 
 ##### Note: Required Table
diff --git a/app/cli/test.csv b/app/cli/test.csv
new file mode 100644
index 0000000..6a7d813
--- /dev/null
+++ b/app/cli/test.csv
@@ -0,0 +1,5 @@
+id,name
+1,sandeep
+2,raj
+3,kumar
+4,kandasamy
\ No newline at end of file
diff --git a/app/queries/bin/k6/loadScript.js b/app/queries/bin/k6/loadScript.js
index 910738c..c703be2 100644
--- a/app/queries/bin/k6/loadScript.js
+++ b/app/queries/bin/k6/loadScript.js
@@ -1,15 +1,39 @@
 import http from 'k6/http'
 import { check } from 'k6'
 
+var cachedFileVariables;
+var fileVariablesCurrentIndex = 0;
+
+/**
+ * Use ES5 syntax — this file runs inside k6's JS runtime.
+ */
 export default function () {
-  let { url, headers, query, variables } = __ENV
+  let { url, headers, query, variables, fileVariables } = __ENV
 
   // Can't pass nested JSON in config file, need to parse here because stringified
   if (headers) headers = JSON.parse(headers)
   if (variables) variables = JSON.parse(variables)
 
+  // Cache the parsed fileVariables so they are not deserialized on every iteration
+  if (!cachedFileVariables) {
+    cachedFileVariables = fileVariables ? JSON.parse(fileVariables) : []
+  }
+
+  let combinedVariables;
+
+  if (cachedFileVariables.length !== 0) {
+    // TODO: could pick a random index between 0 and cachedFileVariables.length instead of iterating, for better randomness
+    if (fileVariablesCurrentIndex >= cachedFileVariables.length) {
+      fileVariablesCurrentIndex = 0;
+    }
+    combinedVariables = Object.assign({}, variables, cachedFileVariables[fileVariablesCurrentIndex]);
+    fileVariablesCurrentIndex++;
+  } else {
+    combinedVariables = variables;
+  }
+
   // Prepare query & variables (if provided)
-  let body = JSON.stringify({ query, variables })
+  let body = JSON.stringify({ query, variables: combinedVariables })
 
   // Send the request
   let res = http.post(url, body, { headers })
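The k6 script above cycles through the CSV rows one per iteration, merging each row over the config-level variables. A minimal TypeScript sketch of that merge behavior (the function name is illustrative, not part of the PR):

```typescript
// Illustrative sketch of the round-robin merge done in loadScript.js.
// `rows` stands in for the parsed CSV (cachedFileVariables in the script).
type Vars = Record<string, any>

function variablesForIteration(iteration: number, base: Vars, rows: Vars[]): Vars {
  if (rows.length === 0) return base
  // Wrap the index so requests keep cycling through the CSV rows
  const row = rows[iteration % rows.length]
  // Row values are applied last, so they override config-level variables
  return { ...base, ...row }
}

// variablesForIteration(0, { limit: 10 }, [{ id: 1 }, { id: 2 }]) // => { limit: 10, id: 1 }
// variablesForIteration(2, { limit: 10 }, [{ id: 1 }, { id: 2 }]) // => { limit: 10, id: 1 }
```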
"headers" table here -wrk.method = "POST" -wrk.headers = {} - function tprint (tbl, indent) if not indent then indent = 0 end for k, v in pairs(tbl) do @@ -45,33 +41,67 @@ function print_wrk_config() print('-----') end +local threads = {} + +function setup(thread) + table.insert(threads, thread) +end + function init(args) + errorCount = 0 url, params = args[0], args[1] -- print('url', url) -- print('params', params) if not params then print('ERROR: NO PARAMS PASSED TO WRK2') end params = json.decode(params) - --print_params(params) - - if params['headers'] ~= nil then - for header, val in pairs(params['headers']) do - wrk.headers[header] = val - end - end +end + +function request() + method = 'POST' + path = url + headers = params['headers'] + + mergedVariables = merge_config_variables_and_file_variables() - wrk.body = json.encode({ + body = json.encode({ query = params['query'], - variables = params['variables'], + variables = mergedVariables }) + return wrk.format(method, path, headers, body) +end + +-- Merges one row of the variables from CSV file with the variables from config. +-- Row is selected by running the iterator fileVariablesCurrentIndex and resetting it when it reaches the end of the variables from CSV file. +fileVariablesCurrentIndex = 1 +function merge_config_variables_and_file_variables() + bodyVariables = {} + if params['variables'] then + bodyVariables = params['variables'] + end - --print_wrk_config() + if params['fileVariables'] and table.getn(params['fileVariables']) ~=0 then + if fileVariablesCurrentIndex > table.getn(params['fileVariables']) then + fileVariablesCurrentIndex = 1 + end + for k,v in pairs(params['fileVariables'][fileVariablesCurrentIndex]) do + bodyVariables[k] = v + end + fileVariablesCurrentIndex = fileVariablesCurrentIndex + 1 + end + return bodyVariables end -function request() - return wrk.request() +-- TODO: Better error processing. Currently, only the count of errors are maintained and printed to console. +function response(status, headers, body) + jsonBody = json.decode(body) + if jsonBody['errors'] or status ~= 200 then + errorCount = errorCount + 1 + end end +-- For smaller number of connections and requests, the Mean latency value turns out to be nan (in JSON encoding) and this function then throws error. 
diff --git a/app/queries/package.json b/app/queries/package.json
index 5e821e4..05d40d4 100644
--- a/app/queries/package.json
+++ b/app/queries/package.json
@@ -16,7 +16,8 @@
     "fs-extra": "^9.0.1",
     "hdr-histogram-js": "^2.0.0-beta6",
     "js-yaml": "^3.14.0",
-    "lookpath": "^1.1.0"
+    "lookpath": "^1.1.0",
+    "neat-csv": "^6.0.1"
   },
   "devDependencies": {
     "@types/autocannon": "^4.1.0"
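The new `neat-csv` dependency parses CSV text (or a stream) into one object per row, keyed by the header row; every cell comes back as a string. A quick sketch of what it returns for the sample `test.csv`, which is why the utilities added below JSON-parse each cell afterwards:

```typescript
// Sketch of raw neat-csv output (CJS-style import, matching this PR's usage).
import neatCsv = require('neat-csv')

async function demo() {
  const rows = await neatCsv('id,name\n1,sandeep\n2,raj')
  // => [ { id: '1', name: 'sandeep' }, { id: '2', name: 'raj' } ]
  // Values are strings; JSON.parse('1') turns '1' into the number 1,
  // which is what readVariablesFromFile (added below) relies on.
  console.log(rows)
}
```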
diff --git a/app/queries/src/executors/autocannon/index.ts b/app/queries/src/executors/autocannon/index.ts
index 73f361a..da53922 100644
--- a/app/queries/src/executors/autocannon/index.ts
+++ b/app/queries/src/executors/autocannon/index.ts
@@ -4,7 +4,7 @@
  * ========================
  */
 
-import { makeBenchmarkMetrics, BenchmarkExecutor } from '../base'
+import { makeBenchmarkMetrics, BenchmarkExecutor } from "../base";
 
 import {
   BenchmarkMetrics,
@@ -15,58 +15,92 @@ import {
   MaxRequestsInDurationBenchmark,
   MultiStageBenchmark,
   RequestsPerSecondBenchmark,
-} from '../base/types'
+} from "../base/types";
 
-import { RunAutocannonMetadata } from './types'
+import { RunAutocannonMetadata } from "./types";
 
-import autocannon from 'autocannon'
-import { Options as AutocannonOptions } from 'autocannon'
+import autocannon, { Client, Request } from "autocannon";
+import { Options as AutocannonOptions } from "autocannon";
 
-import fs from 'fs-extra'
-import path from 'path'
-import * as hdr from 'hdr-histogram-js'
+import fs from "fs-extra";
+import path from "path";
+import * as hdr from "hdr-histogram-js";
+import { Utils } from "../base/utils";
 
 export class AutocannonExecutor extends BenchmarkExecutor {
-  public tool = BenchmarkTool.AUTOCANNON
-  private reportPath = path.join(this.baseReportPath, 'autocannon')
+  public tool = BenchmarkTool.AUTOCANNON;
+  private reportPath = path.join(this.baseReportPath, "autocannon");
+
+  private benchMarkConfig: Benchmark;
+  private fileVariables: any[];
+  private fileVariablesCurrentIndex = 0;
 
   private _makeSharedFields(bench: Benchmark): AutocannonOptions {
     return {
       url: this.config.url,
-      headers: this.config.headers,
-      method: 'POST',
       connections: bench.connections || 10,
-      body: JSON.stringify({
-        query: bench.query,
-        variables: bench.variables,
-      }),
-    }
+      /**
+       * Sets the request body in each client. No. of clients created = no. of connections.
+       * Each client gets one row of the CSV-file variables merged into its request, so each client queries with different variables.
+       * Suggested number of rows in the CSV = number of connections.
+       * @param client Autocannon client
+       */
+      setupClient: (client) => {
+        // If debug, log each response body
+        if (this.config.debug) {
+          client.on("body", console.log);
+        }
+        if (this.fileVariablesCurrentIndex >= this.fileVariables.length) {
+          this.fileVariablesCurrentIndex = 0;
+        }
+        let req: Request = {
+          headers: this.config.headers,
+          method: "POST",
+          body: JSON.stringify({
+            query: this.benchMarkConfig.query,
+            variables: { ...this.benchMarkConfig.variables, ...this.fileVariables[this.fileVariablesCurrentIndex] },
+          }),
+        };
+        client.setRequest(req);
+        this.fileVariablesCurrentIndex++;
+      }
+    };
   }
 
-  public runCustomBench(bench: CustomBenchmark) {
-    const baseOpts = this._makeSharedFields(bench)
+
+  private _generateAutocannonMetadata(bench: Benchmark): RunAutocannonMetadata {
     const queryName = this._makeBenchmarkName(bench)
-    const metadata = {
+    return {
       queryName,
-      outputFile: `${queryName}.json`,
+      outputFile: `${queryName}.json`
     }
+  }
+
+  private async _generateAutoCannonOptions(bench: Benchmark): Promise<AutocannonOptions> {
+    this.benchMarkConfig = bench;
+    this.fileVariables = await Utils.readVariablesFromFile(bench);
+
+    return this._makeSharedFields(bench)
+  }
+
+  public async runCustomBench(bench: CustomBenchmark) {
+    const baseOpts = await this._generateAutoCannonOptions(bench);
+    const metadata = this._generateAutocannonMetadata(bench)
     return this._runAutocannon(metadata, {
       ...baseOpts,
       ...bench.options.autocannon,
-    })
+    });
   }
 
   public runMultiStageBench(bench: MultiStageBenchmark): never {
-    throw new Error('Not Implemented')
+    throw new Error("Not Implemented");
   }
 
-  public runRequestsPerSecondBench(bench: RequestsPerSecondBenchmark) {
-    const baseOpts = this._makeSharedFields(bench)
-    const queryName = this._makeBenchmarkName(bench)
-    const metadata = {
-      queryName,
-      outputFile: `${queryName}.json`,
-    }
+  public async runRequestsPerSecondBench(bench: RequestsPerSecondBenchmark) {
+    const baseOpts = await this._generateAutoCannonOptions(bench)
+    const metadata = this._generateAutocannonMetadata(bench);
     return this._runAutocannon(metadata, {
       ...baseOpts,
       duration: bench.duration,
@@ -74,63 +108,53 @@ export class AutocannonExecutor extends BenchmarkExecutor {
     })
   }
 
-  public runFixedRequestNumberBench(bench: FixedRequestNumberBenchmark) {
-    const baseOpts = this._makeSharedFields(bench)
-    const queryName = this._makeBenchmarkName(bench)
-    const metadata = {
-      queryName,
-      outputFile: `${queryName}.json`,
-    }
+  public async runFixedRequestNumberBench(bench: FixedRequestNumberBenchmark) {
+    const baseOpts = await this._generateAutoCannonOptions(bench)
+    const metadata = this._generateAutocannonMetadata(bench);
     return this._runAutocannon(metadata, {
       ...baseOpts,
       amount: bench.requests,
-    })
+    });
   }
 
-  public runMaxRequestsInDurationBench(bench: MaxRequestsInDurationBenchmark) {
-    const baseOpts = this._makeSharedFields(bench)
-    const queryName = this._makeBenchmarkName(bench)
-    const metadata = {
-      queryName,
-      outputFile: `${queryName}.json`,
-    }
+  public async runMaxRequestsInDurationBench(bench: MaxRequestsInDurationBenchmark) {
+    const baseOpts = await this._generateAutoCannonOptions(bench)
+    const metadata = this._generateAutocannonMetadata(bench);
     return this._runAutocannon(metadata, {
       ...baseOpts,
       duration: bench.duration,
-    })
+    });
   }
 
+  // TODO: use worker threads; that means any function passed to autocannon needs to go into separate js files which are "require"-able
   private async _runAutocannon(
     metadata: RunAutocannonMetadata,
     config: AutocannonOptions
   ) {
-    const { queryName, outputFile } = metadata
-    // If debug, log each response body
-    if (this.config.debug)
-      config.setupClient = (client) => client.on('body', console.log)
+    const { queryName, outputFile } = metadata;
 
     const instance = autocannon(config, (err, results) => {
-      if (err) throw err
-    })
+      if (err) throw err;
+    });
 
-    const histogram = hdr.build()
-    instance.on('response', (client, statusCode, resBytes, responseTime) => {
-      histogram.recordValue(responseTime)
-    })
+    const histogram = hdr.build();
+    instance.on("response", (client, statusCode, resBytes, responseTime) => {
+      histogram.recordValue(responseTime);
+    });
 
     autocannon.track(instance, {
       outputStream: this.config.writeStream || process.stdout,
       renderProgressBar: true,
       renderLatencyTable: true,
       renderResultsTable: true,
-    })
+    });
 
     // Wrap this in a Promise to force waiting for Autocannon run to finish
     return new Promise((resolve) => {
-      instance.on('done', (results) => {
+      instance.on("done", (results) => {
         // Write Autocannon results object to output file
-        const outfile = path.join(this.reportPath, outputFile)
-        fs.outputJSONSync(outfile, results)
+        const outfile = path.join(this.reportPath, outputFile);
+        fs.outputJSONSync(outfile, results);
         // Build and return Metrics object
         const metrics = makeBenchmarkMetrics({
           name: metadata.queryName,
@@ -147,9 +171,9 @@ export class AutocannonExecutor extends BenchmarkExecutor {
             count: results.requests.total,
             average: results.requests.average,
           },
-        })
-        resolve(metrics)
-      })
-    })
+        });
+        resolve(metrics);
+      });
+    });
   }
 }
diff --git a/app/queries/src/executors/base/index.ts b/app/queries/src/executors/base/index.ts
index 14e3496..ea712bf 100644
--- a/app/queries/src/executors/base/index.ts
+++ b/app/queries/src/executors/base/index.ts
@@ -68,7 +68,7 @@ export abstract class BenchmarkExecutor {
   public config: GlobalConfig
 
   /** Path to the configuration file to load */
-  private configPath = path.join(__dirname, '../../config.yaml')
+  // private configPath = path.join(__dirname, '../../config.yaml')
 
   /** Path to the reports directory */
   public baseReportPath = path.join(__dirname, '../../../reports')
@@ -99,8 +99,8 @@ export abstract class BenchmarkExecutor {
     }
   }
 
-  public readConfigFile(pathTo?: string): GlobalConfig {
-    const configFile = fs.readFileSync(pathTo || this.configPath, 'utf-8')
+  public readConfigFile(pathTo: string): GlobalConfig {
+    const configFile = fs.readFileSync(pathTo, 'utf-8')
     return yaml.load(configFile)
   }
diff --git a/app/queries/src/executors/base/types.ts b/app/queries/src/executors/base/types.ts
index b4b6202..62a8e13 100644
--- a/app/queries/src/executors/base/types.ts
+++ b/app/queries/src/executors/base/types.ts
@@ -59,6 +59,11 @@ interface _BenchmarkRunConfig {
   query: string
   variables?: Record<string, any>
   connections?: number
+  variable_file?: VariableFile
+}
+
+export interface VariableFile {
+  file_path: string
 }
 
 /**
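With the new optional `variable_file` field, a benchmark entry in the parsed config can look like this (a hand-written literal for illustration only; field names follow the interfaces above):

```typescript
// Hypothetical benchmark entry using the new variable_file field.
const bench = {
  name: 'AlbumByPK',
  execution_strategy: 'FIXED_REQUEST_NUMBER',
  requests: 10000,
  query: 'query AlbumByPK($id: Int!) { albums_by_pk(id: $id) { id title } }',
  variables: { title: 'fallback' },          // static variables, still supported
  variable_file: { file_path: './test.csv' } // per-request variables from CSV
}
```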
diff --git a/app/queries/src/executors/base/utils.ts b/app/queries/src/executors/base/utils.ts
new file mode 100644
index 0000000..84d2489
--- /dev/null
+++ b/app/queries/src/executors/base/utils.ts
@@ -0,0 +1,40 @@
+import neatCsv = require("neat-csv");
+import { Benchmark } from "./types";
+import fs from "fs-extra";
+import path from "path";
+
+export class Utils {
+
+  /**
+   * Asynchronously reads values from the CSV file and outputs them as an array of JSON objects. The object keys come from the first row of the file,
+   * which must contain the variable names. Cells that are themselves valid JSON (nested variables) are parsed as well.
+   *
+   * @param bench Benchmark
+   * @returns neatCsv.Row[] - Array of objects, one per row in the CSV file
+   */
+  static async readVariablesFromFile(bench: Benchmark) {
+    let output: neatCsv.Row[] = [];
+    if (
+      bench.variable_file &&
+      bench.variable_file.file_path
+    ) {
+      let varCsvFile = fs.createReadStream(
+        path.join(process.cwd(), bench.variable_file.file_path)
+      );
+      let rows = await neatCsv(varCsvFile);
+      rows.forEach((row) => {
+        const keys = Object.keys(row);
+        keys.forEach((key) => {
+          try {
+            row[key] = JSON.parse(row[key]);
+          } catch (error) {
+            // Suppress the unparseable-JSON error; keep the raw string value.
+          }
+        });
+      });
+      output = rows;
+    }
+    return output;
+  }
+}
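Given the sample `test.csv` added in this PR, `Utils.readVariablesFromFile` resolves to an array of row objects with JSON-parseable cells already converted (so `id` becomes a number), and to an empty array when no `variable_file` is configured. A usage sketch (assumes it runs inside one of the async executor methods):

```typescript
// Inside any async executor method that has a `bench` config in scope:
const fileVariables = await Utils.readVariablesFromFile(bench)
// With app/cli/test.csv this yields:
// [ { id: 1, name: 'sandeep' }, { id: 2, name: 'raj' },
//   { id: 3, name: 'kumar' },  { id: 4, name: 'kandasamy' } ]
// and [] when bench.variable_file is absent.
```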
diff --git a/app/queries/src/executors/k6/index.ts b/app/queries/src/executors/k6/index.ts
index b8b26f9..5383b16 100644
--- a/app/queries/src/executors/k6/index.ts
+++ b/app/queries/src/executors/k6/index.ts
@@ -29,6 +29,7 @@ import { BenchmarkExecutor, makeBenchmarkMetrics } from '../base'
 
 import execa from 'execa'
 import { lookpath } from 'lookpath'
+import { Utils } from '../base/utils'
 
 interface RunK6Metadata {
   queryName: string
@@ -41,11 +42,11 @@ export class K6Executor extends BenchmarkExecutor {
   private k6BinaryPath = path.join(__dirname, 'k6', 'k6')
   private reportPath = path.join(this.baseReportPath, 'k6')
 
-  public runCustomBench(bench: CustomBenchmark) {
+  public async runCustomBench(bench: CustomBenchmark) {
     // Need to set the url, headers, query, and variables ENV or it won't work
     if (bench.options.k6?.scenarios) {
       for (let scenario in bench.options.k6?.scenarios) {
-        bench.options.k6.scenarios[scenario].env = this._makeScenarioEnv(bench)
+        bench.options.k6.scenarios[scenario].env = await this._makeScenarioEnv(bench)
       }
     }
 
@@ -58,14 +59,14 @@ export class K6Executor extends BenchmarkExecutor {
     return this._runK6(metadata, bench.options.k6 as K6Options)
   }
 
-  public runMultiStageBench(bench: MultiStageBenchmark) {
+  public async runMultiStageBench(bench: MultiStageBenchmark) {
     const scenario: RampingArrivalRateExecutor = {
       executor: 'ramping-arrival-rate',
       startRate: bench.initial_rps,
       timeUnit: '1s',
       preAllocatedVUs: bench.connections || 10,
       stages: bench.stages,
-      env: this._makeScenarioEnv(bench),
+      env: await this._makeScenarioEnv(bench),
     }
 
     const queryName = this._makeBenchmarkName(bench)
@@ -81,14 +82,14 @@ export class K6Executor extends BenchmarkExecutor {
     })
   }
 
-  public runRequestsPerSecondBench(bench: RequestsPerSecondBenchmark) {
+  public async runRequestsPerSecondBench(bench: RequestsPerSecondBenchmark) {
     const scenario: ConstantArrivalRateExecutor = {
       executor: 'constant-arrival-rate',
       rate: bench.rps,
       timeUnit: '1s',
       duration: bench.duration,
       preAllocatedVUs: bench.connections || 10,
-      env: this._makeScenarioEnv(bench),
+      env: await this._makeScenarioEnv(bench),
     }
 
     const queryName = this._makeBenchmarkName(bench)
@@ -104,12 +105,12 @@ export class K6Executor extends BenchmarkExecutor {
     })
   }
 
-  public runFixedRequestNumberBench(bench: FixedRequestNumberBenchmark) {
+  public async runFixedRequestNumberBench(bench: FixedRequestNumberBenchmark) {
     const scenario: PerVUIterationsExecutor = {
       executor: 'per-vu-iterations',
       iterations: bench.requests / (bench.connections || 10),
       vus: bench.connections || 10,
-      env: this._makeScenarioEnv(bench),
+      env: await this._makeScenarioEnv(bench),
     }
 
     const queryName = this._makeBenchmarkName(bench)
@@ -125,12 +126,12 @@ export class K6Executor extends BenchmarkExecutor {
     })
   }
 
-  public runMaxRequestsInDurationBench(bench: MaxRequestsInDurationBenchmark) {
+  public async runMaxRequestsInDurationBench(bench: MaxRequestsInDurationBenchmark) {
     const scenario: ConstantVUExecutor = {
       executor: 'constant-vus',
       duration: bench.duration,
       vus: bench.connections || 10,
-      env: this._makeScenarioEnv(bench),
+      env: await this._makeScenarioEnv(bench),
     }
 
     const queryName = this._makeBenchmarkName(bench)
@@ -149,21 +150,24 @@ export class K6Executor extends BenchmarkExecutor {
   /**
    * Must return non-nested JSON for k6, hence the need to stringify headers and variables
    */
-  private _makeScenarioEnv(bench: Benchmark) {
+  private async _makeScenarioEnv(bench: Benchmark) {
     return {
       url: this.config.url,
       query: bench.query,
       headers: JSON.stringify(this.config.headers),
       variables: JSON.stringify(bench.variables),
+      fileVariables: JSON.stringify(await Utils.readVariablesFromFile(bench)) // Variables read from file
     }
   }
 
   private async getBinaryPath() {
-    const defaultPath = await lookpath('k6')
-    if (defaultPath) return defaultPath
+    // Prefer the local binary over the global one when the local binary is present.
     const localK6Binary = path.join(this.localBinaryFolder, 'k6/k6')
     const localBinaryExists = await fs.pathExists(localK6Binary)
     if (localBinaryExists) return localK6Binary
+
+    const globalK6Path = await lookpath('k6')
+    if (globalK6Path) return globalK6Path
     throw new Error(
       'Could not find K6 binary either globally in $PATH or in local ./bin/k6 folder'
     )
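Both the k6 and wrk executors now resolve their binary the same way: try the local `./bin` copy first, then fall back to `$PATH`. The shared lookup order, sketched as one standalone helper (hypothetical; the PR keeps a separate copy per executor):

```typescript
import { lookpath } from 'lookpath'
import fs from 'fs-extra'

// Prefer a local binary; fall back to one on $PATH; fail loudly otherwise.
async function resolveBinary(localPath: string, name: string): Promise<string> {
  if (await fs.pathExists(localPath)) return localPath
  const globalPath = await lookpath(name)
  if (globalPath) return globalPath
  throw new Error(`Could not find ${name} binary locally or in $PATH`)
}
```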
diff --git a/app/queries/src/executors/wrk2/index.ts b/app/queries/src/executors/wrk2/index.ts
index 0f70634..f6aebef 100644
--- a/app/queries/src/executors/wrk2/index.ts
+++ b/app/queries/src/executors/wrk2/index.ts
@@ -26,6 +26,7 @@ import path from 'path'
 import fs from 'fs-extra'
 import execa from 'execa'
 import { lookpath } from 'lookpath'
+import { Utils } from '../base/utils'
 
 export class Wrk2Executor extends BenchmarkExecutor {
   public tool = BenchmarkTool.WRK2
@@ -118,6 +119,7 @@ export class Wrk2Executor extends BenchmarkExecutor {
         query: bench.query,
         variables: bench.variables,
         headers: this.config.headers,
+        fileVariables: await Utils.readVariablesFromFile(bench)
       },
     })
   }
@@ -138,11 +140,13 @@ export class Wrk2Executor extends BenchmarkExecutor {
         latency: true,
         duration: bench.duration,
         rate: bench.rps,
+        connections: bench.connections // Pass connections through to wrk
       },
       config: {
         query: bench.query,
         variables: bench.variables,
         headers: this.config.headers,
+        fileVariables: await Utils.readVariablesFromFile(bench)
       },
     })
   }
@@ -162,12 +166,14 @@ export class Wrk2Executor extends BenchmarkExecutor {
     return args
   }
 
-  private async getBinaryPath() {
-    const defaultPath = await lookpath('wrk')
-    if (defaultPath) return defaultPath
+  private async getBinaryPath() {
+    // Prefer the local binary, if available, over the wrk binary on $PATH.
     const localWrkBinary = path.join(this.localBinaryFolder, 'wrk/wrk')
     const localBinaryExists = await fs.pathExists(localWrkBinary)
     if (localBinaryExists) return localWrkBinary
+
+    const defaultPath = await lookpath('wrk')
+    if (defaultPath) return defaultPath
     throw new Error(
       'Could not find wrk binary either globally in $PATH or in local ./bin/wrk folder'
     )
@@ -194,7 +200,7 @@ export class Wrk2Executor extends BenchmarkExecutor {
     const output = await wrk
     const end = new Date()
 
-    const stats = JSON.parse(output.stderr)
+    const stats = output.stderr ? JSON.parse(output.stderr) : null
     // Also emits these same stats to stderr, so could make script not write stat file and just read from there
     // const stats: Wrk2StatsOutputJson = await fs.readJSON('/tmp/wrk2-stats.json')
     if (!stats) throw new Error('Failed reading stats output from wrk stderr')
diff --git a/app/queries/src/executors/wrk2/types.ts b/app/queries/src/executors/wrk2/types.ts
index 8c6c97a..bc96e1b 100644
--- a/app/queries/src/executors/wrk2/types.ts
+++ b/app/queries/src/executors/wrk2/types.ts
@@ -9,6 +9,7 @@ export interface Wrk2BinaryArgs {
     query: string
     variables?: Record<string, any>
     headers?: Record<string, string>
+    fileVariables?: Record<string, any>[]
   }
 }
diff --git a/app/subscriptions/package.json b/app/subscriptions/package.json
index b69caeb..f2e1372 100644
--- a/app/subscriptions/package.json
+++ b/app/subscriptions/package.json
@@ -27,7 +27,8 @@
     "pg": "^8.2.1",
     "reattempt": "^0.1.1",
     "websocket-as-promised": "^1.0.1",
-    "ws": "^7.3.0"
+    "ws": "^7.3.0",
+    "neat-csv": "^6.0.1"
   },
   "devDependencies": {
     "@types/graphql": "^14.0.0",
diff --git a/app/subscriptions/src/main.ts b/app/subscriptions/src/main.ts
index e2aad6a..99dcfe3 100644
--- a/app/subscriptions/src/main.ts
+++ b/app/subscriptions/src/main.ts
@@ -71,6 +71,7 @@ export interface SockerManagerConfig {
   label: string
   endpoint: string
   variables: object
+  fileVariables?: any[]
   headers?: Record<string, string>
   maxConnections: number
   insertPayloadData: boolean
@@ -84,6 +85,7 @@ export class SocketManager {
   public connections: { [id: number]: Connection } = {}
   public config: SockerManagerConfig
   public queryArgGenerator: Iterator<any>
+  private fileVariablesCurrentIndex = 0;
 
   constructor(config: SockerManagerConfig) {
     this.config = config
@@ -109,10 +111,26 @@ export class SocketManager {
       .insertGraph(this.allEvents)
   }
 
+  /**
+   * Returns one row from the CSV variables, advancing an internal index that wraps around once it passes the last row.
+   * @returns One row from the CSV file, selected by fileVariablesCurrentIndex
+   */
+  private getRowFromFileVariables() {
+    if (this.fileVariablesCurrentIndex >= this.config.fileVariables.length) {
+      this.fileVariablesCurrentIndex = 0;
+    }
+    const output = this.config.fileVariables[this.fileVariablesCurrentIndex];
+    this.fileVariablesCurrentIndex++;
+    return output;
+  }
+
   public async spawnConnection() {
     const socketId = this.nextSocketId++
     const socketManagerConfig = this.config
-    const queryVariables = this.queryArgGenerator.next().value
+    let queryVariables = this.queryArgGenerator.next().value
+    if (this.config.fileVariables && this.config.fileVariables.length !== 0) {
+      queryVariables = { ...queryVariables, ...this.getRowFromFileVariables() }
+    }
     try {
       const connection = new Connection({
         id: socketId,
@@ -269,7 +287,7 @@ async function assertDatabaseConnection() {
   })
 }
 
-function prettyPrintConfig(options) {
+function prettyPrintConfig(options: SubscriptionBenchConfig) {
   console.table({
     url: options.url,
     db_connection_string: options.db_connection_string,
@@ -281,6 +299,8 @@ function prettyPrintConfig(options) {
     'connections_per_second',
   ])
   console.table({ variables: options.config.variables })
+  if (options.config.variable_file) console.table({ variable_file: options.config.variable_file.file_path })
+
 }
 
 /**
@@ -345,11 +365,17 @@ export async function main(opts: SubscriptionBenchConfig) {
    * Execution
    */
 
-  const socketManagerParams = yamlConfigToSocketManagerParams(options)
+  const socketManagerParams = await yamlConfigToSocketManagerParams(options)
   const socketManager = new SocketManager(socketManagerParams)
 
   const MAX_CONNECTIONS = options.config.max_connections
-  const SPAWN_RATE = 1000 / options.config.connections_per_second
+
+  let SPAWN_RATE: number;
+  if (options.config.connections_per_second > 0) {
+    SPAWN_RATE = 1000 / options.config.connections_per_second
+  } else {
+    SPAWN_RATE = 0; // Open connections as fast as possible
+  }
 
   let socketSpawned = 0
   const spawnFn = () => {
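The spawn interval is derived from `connections_per_second`: for example, 20 connections per second gives one connection every 50 ms, while 0 now means "open connections as fast as possible". The computation, isolated as a tiny sketch (hypothetical function name):

```typescript
// connections_per_second -> delay between spawns, in ms
function spawnRateMs(connectionsPerSecond: number): number {
  return connectionsPerSecond > 0 ? 1000 / connectionsPerSecond : 0
}

// spawnRateMs(20) === 50  -> one connection every 50 ms
// spawnRateMs(0)  === 0   -> spawn connections back to back
```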
diff --git a/app/subscriptions/src/utils.ts b/app/subscriptions/src/utils.ts
index f36e597..10fe0dd 100644
--- a/app/subscriptions/src/utils.ts
+++ b/app/subscriptions/src/utils.ts
@@ -2,6 +2,7 @@ import { SockerManagerConfig } from './main'
 import yaml from 'js-yaml'
 import fs from 'fs'
 import path from 'path'
+import neatCsv = require("neat-csv");
 
 export const GQL = {
   START: 'start',
@@ -25,6 +26,7 @@ function* makeRangeIterator(start, end) {
 }
 
 const isRangeVariable = (obj) => obj.start != null && obj.end != null
+
 export function* argumentGenerator(args) {
   // Clone holds the original args, and the iterator values
   let internal = Object.assign({}, args)
@@ -62,6 +64,11 @@ export interface QueryConfig {
   connections_per_second: number
   query: string
   variables: Record<string, any>
+  variable_file?: VariableFile
+}
+
+export interface VariableFile {
+  file_path: string
 }
 
 export interface Range {
@@ -69,9 +76,9 @@ export interface Range {
   end: number
 }
 
-export const yamlConfigToSocketManagerParams = (
+export const yamlConfigToSocketManagerParams = async (
   options: SubscriptionBenchConfig
-): SockerManagerConfig => ({
+): Promise<SockerManagerConfig> => ({
   label: options.config.label,
   endpoint: options.url,
   variables: options.config.variables,
@@ -81,6 +88,7 @@ export const yamlConfigToSocketManagerParams = (
   pgConnectionString: options.db_connection_string,
   subscriptionString: options.config.query,
   insertPayloadData: options.config.insert_payload_data ?? true,
+  fileVariables: await Utils.readVariablesFromFile(options)
 })
 
 export const COLORS = {
@@ -91,3 +99,37 @@ export const COLORS = {
   RESET: '\x1b[0m',
   BLINK: '\x1b[5m',
 }
+
+export class Utils {
+
+  /**
+   * Asynchronously reads values from the CSV file and outputs them as an array of JSON objects. The object keys come from the first row of the file,
+   * which must contain the variable names. Cells that are themselves valid JSON (nested variables) are parsed as well.
+   *
+   * @param bench Benchmark
+   * @returns neatCsv.Row[] - Array of objects, one per row in the CSV file, or an empty array
+   */
+  static async readVariablesFromFile(bench: SubscriptionBenchConfig) {
+    let output: neatCsv.Row[] = [];
+    if (
+      bench.config.variable_file?.file_path
+    ) {
+      let varCsvFile = fs.createReadStream(
+        path.join(process.cwd(), bench.config.variable_file?.file_path)
+      );
+      let rows = await neatCsv(varCsvFile);
+      rows.forEach((row) => {
+        const keys = Object.keys(row);
+        keys.forEach((key) => {
+          try {
+            row[key] = JSON.parse(row[key]);
+          } catch (error) {
+            // Suppress the unparseable-JSON error; keep the raw string value.
+          }
+        });
+      });
+      output = rows;
+    }
+    return output;
+  }
+}