diff --git a/README.md b/README.md index be8bfd5..99620ff 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ io(new EventEmitter): the channel of internal communication with the job work processJob(core.process):function to run a job. (task, config, ondone) pluginDirs: the directories in which to look for plugins dataDir($HOME/.strider): the directory in which to clone/test/etc +concurrentJobs(1): maximum number of jobs to execute at once ``` ### Events diff --git a/lib/index.js b/lib/index.js index 95951f9..b3d9181 100644 --- a/lib/index.js +++ b/lib/index.js @@ -12,6 +12,8 @@ var fs = require('fs-extra') , cachier = require('./cachier') , keeper = require('dirkeeper') , JobData = require('./jobdata') + , JobQueue = require('./jobqueue') + , branchFromJob = require('./utils').branchFromJob // timeout for callbacks. Helps to kill misbehaving plugins, etc function t(time, done) { @@ -56,11 +58,12 @@ function Runner(emitter, config) { logger: console, processJob: core.process, pluginDir: path.join(__dirname, '../node_modules'), - dataDir: process.env.STRIDER_CLONE_DEST || dotStrider + dataDir: process.env.STRIDER_CLONE_DEST || dotStrider, + concurrentJobs: parseInt(process.env.CONCURRENT_JOBS || '1', 10) || 1, }, config) this.emitter = emitter this.log = this.config.logger.log - this.queue = async.queue(this.processJob.bind(this), 1) + this.queue = new JobQueue(this.processJob.bind(this), this.config.concurrentJobs) this.io = this.config.io this.callbackMap = {} this.hooks = [] @@ -68,16 +71,15 @@ function Runner(emitter, config) { this.attach() } -// base: the base directory where all jobs data is stored +// base: the base directory where all jobs data for this project and branch is stored // the job object. // done(err, {base:, data:, cache:}) -function initJobDirs(base, job, cache, done) { - var name = job.project.name - , dirs = { - base: base, - data: path.join(base, "data", name.replace('/','-') + "-" + job._id.toString()), - cache: cache - } +function initJobDirs(branchBase, job, cache, done) { + var dirs = { + base: branchBase, + data: path.join(branchBase, 'job-' + job._id.toString()), + cache: cache + } async.series([ function checkData(next) { @@ -179,13 +181,13 @@ Runner.prototype = { this.jobdata.add(job) this.log('[runner:' + this.id + '] Queued new job. Project: ' + job.project.name + ' Job ID: ' + job._id) this.emitter.emit('browser.update', job.project.name, 'job.status.queued', [job._id, now]) - this.queue.push({job: job, config: config}) + this.queue.push(job, config) }, cancelJob: function (id) { var jobdata for (var i=0; i offset) { + var task = this.tasks[offset] + + if (task.key in this.active) { + // This task cannot run right now, so skip it. + offset += 1 + } else { + // This task is eligible to run. Remove it from the queue and prepare it to launch. + this.tasks.splice(offset, 1) + launchTasks.push(task) + } + } + + // Create a task completion callback. Remove the task from the active set, invoke the tasks' + // push() callback, then drain() again to see if another task is ready to run. + var makeTaskHandler = function (task) { + return function (err) { + delete self.active[task.key] + + task.callback(err) + + // Defer the next drain() call again in case the task's callback was synchronous. + process.nextTick(self.drain.bind(self)) + } + } + + // Launch the queue handler for each chosen task. + for (var i = 0; i < launchTasks.length; i++) { + var each = launchTasks[i] + + this.active[each.key] = each + + this.handler(each.job, each.config, makeTaskHandler(each)) + } + + // Fire and unset the drain callback if one has been registered. + if (this.drainCallback) { + var lastCallback = this.drainCallback + this.drainCallback = null + lastCallback() + } + }, + + // Count the number of tasks waiting on the queue. + length: function () { + return this.tasks.length + }, + + // Return true if "id" corresponds to the job ID of an active job. + isActive: function (id) { + for (var key in this.active) { + if (this.active.hasOwnProperty(key) && this.active[key].id === id) { + return true + } + } + return false + }, + + // Fire a callback the next time that a drain() is executed. + onNextDrain: function (callback) { + this.drainCallback = callback + } +} diff --git a/lib/utils.js b/lib/utils.js index 3b38285..756dd00 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,10 +1,11 @@ var _ = require('lodash') - , consts = require('./consts') + , stringify = require('json-stable-stringify') module.exports = { - ensureCommand: ensureCommand + ensureCommand: ensureCommand, + branchFromJob: branchFromJob } function ensureCommand(phase) { @@ -16,3 +17,23 @@ function ensureCommand(phase) { return command } +// Extract a branch name, suitable for use as a filesystem path, from the contents of the job's +// ref field. Prefer common ref structures when available (branch, fetch) but fall back to something +// that's ugly but unique and stable for arbitrary ref payloads. +function branchFromJob(job) { + var ref = job.ref + + if (typeof ref === 'undefined') { + return '' + } + + if ('branch' in ref) { + return ref.branch + } else if ('fetch' in ref) { + return ref.fetch + } else { + // This is going to be incredibly ugly, but it will be (a) consistent for consistent refs and + // (b) include only filesystem-safe characters. + return encodeURIComponent(stringify(ref)) + } +} diff --git a/package.json b/package.json index e84b631..73e835b 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,8 @@ "strider-runner-core": "~2.0.0", "strider-extension-loader": "~0.4.3", "fs-extra": "~0.8.1", - "dirkeeper": "~0.2.0" + "dirkeeper": "~0.2.0", + "json-stable-stringify": "~1.0.1" }, "devDependencies": { "mocha": "^1.21.1", diff --git a/test/test_jobqueue.js b/test/test_jobqueue.js new file mode 100644 index 0000000..74565b0 --- /dev/null +++ b/test/test_jobqueue.js @@ -0,0 +1,178 @@ + +var expect = require('expect.js') + , async = require('async') + , JobQueue = require('../lib/jobqueue.js'); + +describe('JobQueue', function () { + var q, handlers + + beforeEach(function () { + handlers = {} + }) + + function makeJob(jid, project, branch) { + return { + _id: jid, + project: project, + ref: { branch: branch } + } + } + + // Configure a handler to be updated when handlerDispatch executes a job with an expected ID. + function expectJobs() { + function defaultFinish() { + throw new Error('Job ' + jid + ' was never handled') + } + + for (var i = 0; i < arguments.length; i++) { + var jid = arguments[i] + + handlers[jid] = { + wasCalled: false, + finish: defaultFinish + } + } + } + + // Execute a function after each queue drain. Each function should finish with an action that + // causes a drain to occur (push a new task or complete an active task). + function onEachDrain(steps, done) { + var wrappedSteps = [] + + var makeStep = function (arg) { + return function (cb) { + q.onNextDrain(cb) + + arg() + } + } + + for (var i = 0; i < steps.length; i++) { + wrappedSteps.push(makeStep(steps[i])) + } + + async.series(wrappedSteps, done) + } + + // JobQueue handler function that manipulates handler objects set up in advance with expectJob. + function handlerDispatch(job, config, cb) { + var jid = job._id + var handler = handlers[jid] + if (!handler) { + return cb(new Error('Unexpected job id ' + jid)) + } + + handler.wasCalled = true + handler.finish = function () { + cb(null) + } + } + + describe('with concurrency 1', function () { + beforeEach(function () { + q = new JobQueue(handlerDispatch, 1) + }) + + it('executes on push on next tick when unsaturated', function (done) { + expectJobs(1) + + q.push(makeJob(1, 'foo/bar', 'master')) + + q.onNextDrain(function () { + expect(q.isActive(1)).to.be(true) + expect(handlers[1].wasCalled).to.be(true) + + handlers[1].finish() + done() + }) + }) + + it('waits for an available task slot when saturated', function (done) { + expectJobs(1, 2) + + onEachDrain([ + function () { + q.push(makeJob(1, 'foo/bar1', 'master')) + }, + function () { + expect(handlers[1].wasCalled).to.be(true) + q.push(makeJob(2, 'foo/bar2', 'master')) + }, + function () { + expect(handlers[2].wasCalled).to.be(false) + handlers[1].finish() + } + ], function () { + expect(handlers[2].wasCalled).to.be(true) + + handlers[2].finish() + done() + }); + }) + }) + + describe('with concurrency 2', function () { + beforeEach(function () { + q = new JobQueue(handlerDispatch, 2) + }) + + it('executes the first two tasks immediately, then waits for task completion', function (done) { + expectJobs(1, 2, 3) + + onEachDrain([ + function () { + q.push(makeJob(1, 'foo/bar1', 'master')) + }, + function () { + expect(handlers[1].wasCalled).to.be(true) + q.push(makeJob(2, 'foo/bar2', 'master')) + }, + function () { + expect(handlers[2].wasCalled).to.be(true) + q.push(makeJob(3, 'foo/bar3', 'master')) + }, + function () { + // The queue was saturated when job 3 was added. It should not have run yet + expect(handlers[3].wasCalled).to.be(false) + handlers[2].finish() + }, + function () { + expect(handlers[3].wasCalled).to.be(true) + + // Call the handlers for jobs 1 and 3 to be tidy. + handlers[1].finish() + } + ], function () { + handlers[3].finish() + done() + }) + }) + + it('prevents scheduling concurrent jobs on the same branch', function (done) { + expectJobs(1, 2) + + onEachDrain([ + function () { + q.push(makeJob(1, 'foo/bar', 'master')) + }, + function () { + expect(handlers[1].wasCalled).to.be(true) + + q.push(makeJob(2, 'foo/bar', 'master')) + }, + function () { + // Even though the queue is unsaturated, job 2 should not be handled yet, because it's + // for the same project and branch as a running job. + expect(handlers[2].wasCalled).to.be(false) + + handlers[1].finish() + }, + function () { + expect(handlers[2].wasCalled).to.be(true) + + handlers[2].finish() + } + ], done) + }) + }) +})