diff --git a/Dockerfile b/Dockerfile index 299d168..376b74c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,8 @@ -FROM node:12-alpine +FROM node:20-alpine #ENV PATH $PATH:/node_modules/.bin COPY . /hyperflow -RUN npm install -g /hyperflow +WORKDIR /hyperflow +RUN npm install +RUN npm install -g . diff --git a/Makefile b/Makefile index 616dfe4..b466d3d 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ all: push container: image image: - docker build -t $(PREFIX)/$(REPO_NAME) . # Build new image and automatically tag it as latest + docker build --no-cache -t $(PREFIX)/$(REPO_NAME) . # Build new image and automatically tag it as latest docker tag $(PREFIX)/$(REPO_NAME) $(PREFIX)/$(REPO_NAME):$(TAG) # Add the version tag to the latest image push: image diff --git a/functions/kubernetes/ADMISSION_CONTROLLER.md b/functions/kubernetes/ADMISSION_CONTROLLER.md new file mode 100644 index 0000000..226ccf7 --- /dev/null +++ b/functions/kubernetes/ADMISSION_CONTROLLER.md @@ -0,0 +1,145 @@ +# Kubernetes Admission Controller + +## Overview + +The K8s Admission Controller prevents overwhelming the Kubernetes scheduler when submitting large numbers of workflow tasks as Pods. + +### Problem + +When thousands of tasks are ready to execute, creating thousands of Pods immediately can overwhelm the Kubernetes scheduler, resulting in many Pods stuck in Pending state and API throttling (HTTP 429 errors). + +### Solution + +The admission controller implements a local gate using: + +1. **Watch-based state tracking** - Monitors Pod state via long-lived watch (no polling) +2. **Token bucket rate limiter** - Smoothly paces Pod creation requests +3. **Concurrency window** - Caps the number of pending Pods +4. 
**Adaptive tuning** - Automatically adjusts limits based on observed cluster throughput + +## How It Works + +### Two-Gate Model + +Every task submission must pass through two gates: + +**Gate 1: Concurrency Window** +``` +IF pendingCount >= pendingMax THEN wait +``` +- Prevents too many Pods stuck in Pending state +- `pendingCount` tracked in real-time via Kubernetes Watch API +- `pendingMax` is the maximum allowed pending Pods + +**Gate 2: Token Bucket** +``` +IF tokens < 1 THEN wait +tokens = tokens - 1 +``` +- Smooths out bursts and paces submissions over time +- Tokens refill continuously at rate `fillRate` (tokens/second) +- Maximum token accumulation capped at `burst` + +### Token Bucket Refill + +``` +elapsed = currentTime - lastRefillTime +newTokens = elapsed * fillRate +tokens = min(burst, tokens + newTokens) +``` + +**Parameters:** +- `fillRate` - Tokens added per second (controls submission rate) +- `burst` - Maximum token accumulation (handles bursts) +- `tokens` - Current available tokens + +### Adaptive Tuning + +Every second, the controller measures cluster performance and adjusts parameters: + +**Throughput Measurement:** +``` +currentRate = Pending→Running transitions / elapsed time (pods/sec) +runningRateEWMA = α × currentRate + (1 - α) × runningRateEWMA +``` + +**Fill Rate Adaptation:** +``` +targetRate = max(1.0, 1.2 × runningRateEWMA) +errorPenalty = max(0.5, 1 - 2 × createErrorEWMA) +fillRate = max(1.0, min(configuredRate × 2, targetRate × errorPenalty)) +``` + +**Pending Max Adaptation (with Hysteresis):** +``` +minuteBuffer = round(60 × max(0.5, runningRateEWMA)) +targetPendingMax = clamp(minuteBuffer, minPendingMax, maxPendingMax) + +IF targetPendingMax > pendingMax THEN + pendingMax = targetPendingMax // Increase immediately (aggressive) +ELSE IF targetPendingMax < pendingMax AND timeSinceLastDecrease > 2 minutes THEN + pendingMax = targetPendingMax // Decrease only if sustained (conservative) + lastPendingMaxDecrease = now +ELSE + // 
Keep current pendingMax (ignore temporary dip) +END +``` + +**Parameters:** +- `α` (alpha) = 0.2 - EWMA smoothing factor (higher = more reactive) +- `runningRateEWMA` - Exponentially weighted moving average of scheduling rate +- `createErrorEWMA` - EWMA of API error rate (0-1) +- `minPendingMax` = 50 - Minimum allowed pending limit +- `maxPendingMax` = 2000 - Maximum allowed pending limit +- `hysteresisWindow` = 2 minutes - Minimum time before decreasing pendingMax + +**Adaptive behavior:** +- **High throughput observed** → increase `fillRate` to submit more +- **API errors detected** → reduce `fillRate` via error penalty +- **Fast scheduling** → increase `pendingMax` immediately (aggressive) +- **Temporary slowdown** → keep `pendingMax` high (prevent under-utilization) +- **Sustained slowdown (>2 min)** → decrease `pendingMax` conservatively + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `HF_VAR_ADMISSION_CONTROLLER` | `1` | Set to `0` to disable | +| `HF_VAR_ADMISSION_PENDING_MAX` | `200` | Max pending Pods | +| `HF_VAR_ADMISSION_FILL_RATE` | `1` | Token fill rate (tokens/sec) | +| `HF_VAR_ADMISSION_BURST` | `20` | Max token bucket size | +| `HF_VAR_ADMISSION_ADAPTIVE` | `1` | Set to `0` to disable adaptive tuning | +| `HF_VAR_ADMISSION_DEBUG` | `0` | Set to `1` to enable debug logging | + +## Usage + +The admission controller is **automatically enabled** when using the standard HyperFlow k8s executor. No code changes required. 
+ +To disable: +```bash +export HF_VAR_ADMISSION_CONTROLLER=0 +``` + +## Tuning Examples + +### Conservative (Small Clusters) +```bash +export HF_VAR_ADMISSION_PENDING_MAX=50 +export HF_VAR_ADMISSION_FILL_RATE=1 +export HF_VAR_ADMISSION_BURST=10 +``` + +### Aggressive (Large Clusters) +```bash +export HF_VAR_ADMISSION_PENDING_MAX=500 +export HF_VAR_ADMISSION_FILL_RATE=10 +export HF_VAR_ADMISSION_BURST=50 +``` + +### Adaptive (Recommended) +```bash +export HF_VAR_ADMISSION_ADAPTIVE=1 +# Controller automatically adjusts based on observed cluster performance +``` diff --git a/functions/kubernetes/k8sAdmissionController.js b/functions/kubernetes/k8sAdmissionController.js new file mode 100644 index 0000000..3520ec8 --- /dev/null +++ b/functions/kubernetes/k8sAdmissionController.js @@ -0,0 +1,515 @@ +/** + * K8s Admission Controller - Rate limiter and concurrency gate for Pod creation + * + * Prevents overwhelming the Kubernetes scheduler by: + * 1. Maintaining real-time count of pending Pods via Watch API (no polling) + * 2. Using token bucket rate limiter to pace submissions + * 3. Capping concurrent pending Pods (concurrency window) + * 4. 
Self-adapting to observed cluster scheduling throughput + * + * Usage: + * const controller = new K8sAdmissionController(kubeconfig, namespace); + * await controller.initialize(); + * await controller.maybeSubmit(async () => { + * await k8sApi.createNamespacedJob(namespace, jobYaml); + * }); + */ + +const k8s = require('@kubernetes/client-node'); + +class K8sAdmissionController { + constructor(kubeconfig, namespace, options = {}) { + this.kubeconfig = kubeconfig; + this.namespace = namespace; + + // Label selector to identify HyperFlow pods + this.selector = options.selector || 'app=hyperflow'; + + // Configuration (can be overridden via options or env vars) + this.config = { + // Concurrency window: max pending Pods allowed + pendingMax: options.pendingMax || + parseInt(process.env.HF_VAR_ADMISSION_PENDING_MAX) || + 200, + + // Token bucket: initial fill rate (tokens/sec) + initialFillRate: options.initialFillRate || + parseFloat(process.env.HF_VAR_ADMISSION_FILL_RATE) || + 1, + + // Token bucket: max burst capacity + burst: options.burst || + parseInt(process.env.HF_VAR_ADMISSION_BURST) || + 20, + + // Enable adaptive tuning of pendingMax and fillRate + adaptive: options.adaptive !== undefined ? 
options.adaptive : + process.env.HF_VAR_ADMISSION_ADAPTIVE !== '0', + + // Minimum and maximum bounds for adaptive pendingMax + minPendingMax: options.minPendingMax || 50, + maxPendingMax: options.maxPendingMax || 2000, + + // EWMA alpha values (higher = more reactive to recent changes) + runningRateAlpha: options.runningRateAlpha || 0.2, + errorRateAlpha: options.errorRateAlpha || 0.3, + + // Backoff parameters for API errors + backoffInitialMs: options.backoffInitialMs || 250, + backoffMaxMs: options.backoffMaxMs || 4000, + + // Adaptive tuning interval (ms) + adaptIntervalMs: options.adaptIntervalMs || 1000, + + // Enable debug logging + debug: options.debug || process.env.HF_VAR_ADMISSION_DEBUG === '1' + }; + + // State tracking (updated from watch events) + this.state = { + pendingCount: 0, // Current number of pending Pods + runningRateEWMA: 1, // EWMA of scheduling throughput (Pods/sec) + createErrorEWMA: 0, // EWMA of API error rate + podPhases: new Map(), // Pod name -> current phase (for transition detection) + + // For time-aware EWMA calculation + runningTransitions: 0, // Count of Pending->Running transitions since last adapt + lastAdaptTime: Date.now(), // Timestamp of last adapt() call + + // Hysteresis for pendingMax (prevent rapid decreases during temporary slowdowns) + lastPendingMaxDecrease: Date.now() // Timestamp of last pendingMax decrease + }; + + // Throttle gate-blocked messages (log at most once per N seconds) + this.lastGate1LogTime = 0; + this.lastGate2LogTime = 0; + this.gateLogThrottleMs = 5000; // Log gate blocks at most once per 5 seconds + + // Token bucket state + this.tokens = 0; + this.lastRefill = Date.now() / 1000; + this.fillRate = this.config.initialFillRate; + + // Per-submitter backoff state + this.backoffMs = this.config.backoffInitialMs; + + // Watch-related state + this.watch = null; + this.watchAbortController = null; + this.initialized = false; + this.adaptInterval = null; + } + + /** + * Initialize the admission 
controller: + * - Do initial LIST to seed pendingCount + * - Start WATCH to track Pod state changes + * - Start adaptive tuning loop + */ + async initialize() { + if (this.initialized) { + console.log('[AdmissionController] Already initialized'); + return; + } + + this.log('Initializing admission controller...'); + this.log(`Config: pendingMax=${this.config.pendingMax}, fillRate=${this.fillRate}, burst=${this.config.burst}, adaptive=${this.config.adaptive}`); + + const coreApi = this.kubeconfig.makeApiClient(k8s.CoreV1Api); + + try { + // Initial LIST to seed state + const listResponse = await coreApi.listNamespacedPod( + this.namespace, + undefined, // pretty + undefined, // allowWatchBookmarks + undefined, // continue + undefined, // fieldSelector + this.selector // labelSelector + ); + + // Count initial pending Pods + for (const pod of listResponse.body.items) { + const phase = pod.status?.phase; + const podName = pod.metadata.name; + + this.state.podPhases.set(podName, phase); + if (phase === 'Pending') { + this.state.pendingCount++; + } + } + + this.log(`Initial state: ${this.state.pendingCount} pending Pods (${listResponse.body.items.length} total)`); + + // Start watch + await this._startWatch(); + + // Start adaptive tuning loop + this.adaptInterval = setInterval(() => { + this._refillTokens(); + if (this.config.adaptive) { + this._adapt(); + } + }, this.config.adaptIntervalMs); + + this.initialized = true; + this.log('Admission controller initialized successfully'); + } catch (err) { + console.error('[AdmissionController] Initialization failed:', err.message); + throw err; + } + } + + /** + * Start watching Pod events + */ + async _startWatch() { + this.watchAbortController = new AbortController(); + this.watch = new k8s.Watch(this.kubeconfig); + + const path = `/api/v1/namespaces/${this.namespace}/pods`; + const queryParams = { labelSelector: this.selector }; + + this.log(`Starting watch on ${path}?labelSelector=${this.selector}`); + + try { + await 
this.watch.watch( + path, + queryParams, + this._handleWatchEvent.bind(this), + this._handleWatchError.bind(this), + this.watchAbortController.signal + ); + } catch (err) { + console.error('[AdmissionController] Watch failed to start:', err.message); + throw err; + } + } + + /** + * Handle individual watch events (ADDED, MODIFIED, DELETED) + */ + _handleWatchEvent(phase, obj) { + const podName = obj?.metadata?.name; + const currentPhase = obj?.status?.phase; + + if (!podName) { + return; // Invalid event + } + + const previousPhase = this.state.podPhases.get(podName); + + switch (phase) { + case 'ADDED': + this.state.podPhases.set(podName, currentPhase); + if (currentPhase === 'Pending') { + this.state.pendingCount++; + this.log(`Pod ${podName} added (Pending), total pending: ${this.state.pendingCount}`); + } + break; + + case 'MODIFIED': + // Detect phase transitions + if (previousPhase !== currentPhase) { + this.log(`Pod ${podName} transition: ${previousPhase} -> ${currentPhase}`); + + // Track Pending -> Running transitions for throughput measurement + if (previousPhase === 'Pending' && currentPhase === 'Running') { + this.state.pendingCount = Math.max(0, this.state.pendingCount - 1); + this.state.runningTransitions++; + this.log(`Pod ${podName} started running, pending: ${this.state.pendingCount}, transitions: ${this.state.runningTransitions}`); + } + + // Handle other transitions away from Pending (e.g., Failed, Unknown) + if (previousPhase === 'Pending' && currentPhase !== 'Pending') { + this.state.pendingCount = Math.max(0, this.state.pendingCount - 1); + } + + // Handle transitions into Pending (rare but possible) + if (previousPhase !== 'Pending' && currentPhase === 'Pending') { + this.state.pendingCount++; + } + + this.state.podPhases.set(podName, currentPhase); + } + break; + + case 'DELETED': + // Remove from tracking + if (previousPhase === 'Pending') { + this.state.pendingCount = Math.max(0, this.state.pendingCount - 1); + this.log(`Pending pod 
${podName} deleted, pending: ${this.state.pendingCount}`); + } + this.state.podPhases.delete(podName); + break; + + case 'BOOKMARK': + // Bookmark events are for watch reliability, no action needed + break; + + default: + this.log(`Unknown watch event type: ${phase}`); + } + } + + /** + * Handle watch errors and reconnect + */ + _handleWatchError(err) { + if (err) { + console.error('[AdmissionController] Watch error:', err.message); + } else { + this.log('Watch stream ended normally'); + } + + // Reconnect after 1 second + this.log('Reconnecting watch in 1 second...'); + setTimeout(() => { + this._startWatch().catch(err => { + console.error('[AdmissionController] Watch reconnection failed:', err.message); + // Will retry again via the error handler + this._handleWatchError(err); + }); + }, 1000); + } + + /** + * Refill token bucket based on elapsed time + */ + _refillTokens() { + const now = Date.now() / 1000; + const elapsed = now - this.lastRefill; + const newTokens = elapsed * this.fillRate; + + this.tokens = Math.min(this.config.burst, this.tokens + newTokens); + this.lastRefill = now; + } + + /** + * Adaptive tuning: adjust fillRate and pendingMax based on observed metrics + */ + _adapt() { + const now = Date.now(); + const elapsedSec = (now - this.state.lastAdaptTime) / 1000; + + // Calculate current scheduling throughput (Pods/sec) + const currentRate = elapsedSec > 0 ? 
this.state.runningTransitions / elapsedSec : 0; + + // Update EWMA of scheduling throughput + const alpha = this.config.runningRateAlpha; + this.state.runningRateEWMA = alpha * currentRate + (1 - alpha) * this.state.runningRateEWMA; + + // Reset counters for next interval + this.state.runningTransitions = 0; + this.state.lastAdaptTime = now; + + // Adapt fillRate: increase if throughput is good, decrease if errors + // If we're successfully scheduling pods, we can increase submission rate + // Target 1.2x observed throughput to stay ahead of the scheduler + const targetRate = Math.max(1.0, 1.2 * this.state.runningRateEWMA); + + // Reduce fillRate if we're seeing API errors + const errorPenalty = Math.max(0.5, 1 - 2 * this.state.createErrorEWMA); + + // Keep fillRate within reasonable bounds + const configuredRate = parseFloat(process.env.HF_VAR_ADMISSION_FILL_RATE) || this.fillRate; + this.fillRate = Math.max(1.0, Math.min(configuredRate * 2, targetRate * errorPenalty)); + + // Adapt pendingMax with hysteresis to prevent under-utilization during temporary slowdowns + // Calculate target pendingMax based on current throughput + const minuteBuffer = Math.round(60 * Math.max(0.5, this.state.runningRateEWMA)); + const targetPendingMax = Math.min( + this.config.maxPendingMax, + Math.max(this.config.minPendingMax, minuteBuffer) + ); + + // Hysteresis: increase immediately, decrease only after sustained slowdown (2 minutes) + const hysteresisWindowMs = 120000; // 2 minutes + const timeSinceLastDecrease = now - this.state.lastPendingMaxDecrease; + + if (targetPendingMax > this.config.pendingMax) { + // INCREASE: immediate (aggressive) + this.config.pendingMax = targetPendingMax; + } else if (targetPendingMax < this.config.pendingMax) { + // DECREASE: only if slowdown persists > 2 minutes (conservative) + if (timeSinceLastDecrease >= hysteresisWindowMs) { + this.config.pendingMax = targetPendingMax; + this.state.lastPendingMaxDecrease = now; + } + // Otherwise: keep current 
pendingMax (ignore temporary dip) + } + + this.log(`Adapt: rate=${currentRate.toFixed(2)} pods/s, EWMA=${this.state.runningRateEWMA.toFixed(2)}, fillRate=${this.fillRate.toFixed(2)}, pendingMax=${this.config.pendingMax}, errors=${this.state.createErrorEWMA.toFixed(3)}`); + } + + /** + * Acquire a permit to create a Pod (waits for both gates to allow) + * Returns immediately after acquiring permit - does NOT execute Pod creation + * Use this for fire-and-forget pattern to preserve parallelism + */ + async acquirePermit() { + if (!this.initialized) { + throw new Error('AdmissionController not initialized. Call initialize() first.'); + } + + while (true) { + // Gate 1: Pending window + if (this.state.pendingCount >= this.config.pendingMax) { + const delay = this._jitter(150, 400); + const now = Date.now(); + if (now - this.lastGate1LogTime >= this.gateLogThrottleMs) { + this.log(`Gate 1 blocked: pending (${this.state.pendingCount}) >= max (${this.config.pendingMax}), waiting ${delay}ms`); + this.lastGate1LogTime = now; + } + await this._sleep(delay); + continue; + } + + // Gate 2: Token bucket + this._refillTokens(); + if (this.tokens < 1) { + const delay = this._jitter(50, 150); + const now = Date.now(); + if (now - this.lastGate2LogTime >= this.gateLogThrottleMs) { + this.log(`Gate 2 blocked: no tokens (${this.tokens.toFixed(2)}), waiting ${delay}ms`); + this.lastGate2LogTime = now; + } + await this._sleep(delay); + continue; + } + + // Both gates passed, consume token and return permit + this.tokens -= 1; + + // Optimistically increment pending count (watch will correct if needed) + this.state.pendingCount++; + + this.log(`Permit acquired (pending: ${this.state.pendingCount}, tokens: ${this.tokens.toFixed(2)})`); + return; // Permit granted + } + } + + /** + * Release a permit (call if Pod creation failed and you want to return the token) + * This is optional - primarily for error handling in fire-and-forget scenarios + */ + releasePermit() { + this.tokens = 
Math.min(this.config.burst, this.tokens + 1); + this.state.pendingCount = Math.max(0, this.state.pendingCount - 1); + this.log(`Permit released (pending: ${this.state.pendingCount}, tokens: ${this.tokens.toFixed(2)})`); + } + + /** + * Record an error for adaptive tuning (updates error EWMA) + * Call this if Pod creation failed after acquiring a permit + */ + recordError() { + this.state.createErrorEWMA = this.config.errorRateAlpha * 1 + + (1 - this.config.errorRateAlpha) * this.state.createErrorEWMA; + } + + /** + * Gate function: wait until both gates (pending window + token bucket) allow submission + * Then execute the provided Pod creation function + * + * This is the all-in-one method that handles permit acquisition, execution, and error handling. + * For fire-and-forget pattern, use acquirePermit() instead. + */ + async maybeSubmit(createPodFn) { + if (!this.initialized) { + throw new Error('AdmissionController not initialized. Call initialize() first.'); + } + + while (true) { + // Acquire permit (waits for gates) + await this.acquirePermit(); + + try { + this.log(`Submitting Pod (pending: ${this.state.pendingCount}, tokens: ${this.tokens.toFixed(2)})`); + await createPodFn(); + + // Reset backoff on success + this.backoffMs = this.config.backoffInitialMs; + + return; // Success + + } catch (err) { + // Return the permit + this.releasePermit(); + + // Update error rate EWMA + this.recordError(); + + // Check if it's a retryable error + const statusCode = err.response?.statusCode || err.statusCode; + const isRetryable = statusCode === 409 || statusCode === 429 || statusCode >= 500; + + if (isRetryable) { + // Exponential backoff with jitter + const delay = this._jitter(this.backoffMs, this.backoffMs * 2); + console.error(`[AdmissionController] Pod creation failed (${statusCode}), retrying after ${delay}ms:`, err.message); + + this.backoffMs = Math.min(this.backoffMs * 2, this.config.backoffMaxMs); + await this._sleep(delay); + continue; // Retry (will acquire 
new permit) + } else { + // Non-retryable error, propagate + console.error('[AdmissionController] Pod creation failed with non-retryable error:', err.message); + throw err; + } + } + } + } + + /** + * Shutdown the admission controller + */ + async shutdown() { + this.log('Shutting down admission controller...'); + + if (this.adaptInterval) { + clearInterval(this.adaptInterval); + this.adaptInterval = null; + } + + if (this.watchAbortController) { + this.watchAbortController.abort(); + this.watchAbortController = null; + } + + this.initialized = false; + this.log('Admission controller shut down'); + } + + /** + * Get current state (for monitoring/debugging) + */ + getState() { + return { + pendingCount: this.state.pendingCount, + runningRateEWMA: this.state.runningRateEWMA, + createErrorEWMA: this.state.createErrorEWMA, + fillRate: this.fillRate, + tokens: this.tokens, + pendingMax: this.config.pendingMax, + backoffMs: this.backoffMs + }; + } + + // Utility functions + _sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + _jitter(minMs, maxMs) { + return Math.floor(minMs + Math.random() * (maxMs - minMs)); + } + + log(message) { + if (this.config.debug) { + console.log(`[AdmissionController] ${message}`); + } + } +} + +module.exports = { K8sAdmissionController }; diff --git a/functions/kubernetes/k8sJobSubmit.js b/functions/kubernetes/k8sJobSubmit.js index 1a15aeb..14dca34 100644 --- a/functions/kubernetes/k8sJobSubmit.js +++ b/functions/kubernetes/k8sJobSubmit.js @@ -1,201 +1,245 @@ -const k8s = require('@kubernetes/client-node'); -var fs = require('fs'); -const yaml = require('js-yaml'); -const createJobMessage = require('../../common/jobMessage').createJobMessage; - -// k8sJobSubmit.js -// Common functions for job submission to Kubernetes clusters - -// Function createK8sJobMessage - creates the job message -// -// Inputs: -// - 'job': job definition object; it should contain: -// * 'executable' and 'args' (usually passed via 
'context.executor') -// * 'ins' and 'outs' (arrays, as passed to the process function) -// * 'name': job class name (not unique id), usually passed via 'context.name' -// - 'taskId': unique task identifier (use 'context.taskId' or define custom) -// - 'context': pass the 'context' parameter of the process function; -// it should contain the 'redis_url' -// -// Returns: -// - jobMessage: string with job command to be sent to a remote executor -function createK8sJobMessage(job, taskId, context) { - let jobMessage = { - "executable": job.executable, - "args": [].concat(job.args), - // "env": job.env || {}, - "inputs": job.ins.map(i => i), - "outputs": job.outs.map(o => o), - "stdout": job.stdout, // if present, denotes file name to which stdout should be redirected - "stderr": job.stderr, // if present, denotes file name to which stderr should be redirected - "stdoutAppend": job.stdoutAppend, // redirect stdout in append mode - "stderrAppend": job.stderrAppend, // redirect stderr in append mode - "redis_url": context.redis_url, - "taskId": taskId, - "name": job.name - } - return jobMessage; -} - -// Function createK8sJobYaml -// -// Inputs: -// - 'job': job definition object; it should contain: -// * 'name': job class name (not unique id), usually passed via 'context.name' -// - 'taskIds': array of unique tasks' identifiers (use '[context.taskId]' or define custom) -// - 'context': pass the 'context' parameter of the process function; -// it should contain the 'redis_url', 'hfId', 'appId' -// - 'jobYamlTemplate': job YAML that may contain variables '{{varname}}' -// - 'customParams': JSON object that defines values for variables in the -// job's YAML template. 
-// -// Returns: -// - jobYaml: string with job YAML to create the k8s job -var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) => { - let quotedTaskIds = taskIds.map(x => '"' + x + '"'); - var command = 'hflow-job-execute ' + context.redis_url + ' -a -- ' + quotedTaskIds.join(' '); - var containerName = job.image || process.env.HF_VAR_WORKER_CONTAINER; - var volumePath = '/work_dir'; - var jobName = Math.random().toString(36).substring(7) + '-' + - job.name.replace(/_/g, '-') + "-" + context.procId + '-' + context.firingId; - var workingDirPath = context.workdir; - - // remove chars not allowd in Pod names - jobName = jobName.replace(/[^0-9a-z-]/gi, '').toLowerCase(); - - var cpuRequest = job.cpuRequest || process.env.HF_VAR_CPU_REQUEST || "0.5"; - var memRequest = job.memRequest || process.env.HF_VAR_MEM_REQUEST || "50Mi"; - - // Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined - var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0; - var restartPolicy = backoffLimit > 0 ? 
"OnFailure": "Never"; - - var restartCount = 0; - - // use string replacement (instead of eval) to evaluate job template - // 'params' should contain values for variables to be replaced in job template yaml - var params = { - command: command, containerName: containerName, - jobName: jobName, volumePath: volumePath, - cpuRequest: cpuRequest, memRequest: memRequest, - restartPolicy: restartPolicy, backoffLimit: backoffLimit, - experimentId: context.hfId + ":" + context.appId, - workflowName: context.wfname, taskName: job.name, - appId: context.appId, workingDirPath: workingDirPath - } - - // Add/override custom parameters for the job - Object.keys(customParams).forEach(function(key) { - params[key] = customParams[key]; - }); - - // args[v] will evaluate to 'undefined' if 'v' doesn't exist - var interpolate = (tpl, args) => tpl.replace(/\${(\w+)}/g, (_, v) => args[v]); - var jobYaml = yaml.safeLoad(interpolate(jobYamlTemplate, params)); - - return jobYaml; -} - -// Function submitK8sJob -// Submits a job to a Kubernetes cluster and awaits for its completion -// -// Inputs: -// - see inputs to 'createK8sJobYaml' and 'createK8sJobMessage' -// - restartFn - function that will be called in case of failed job -// -// -// Returns: job exit code -var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams) => { - - // Load definition of the the worker job pod - // File 'job-template.yaml' should be provided externally during deployment - var jobTemplatePath = customParams.jobTemplatePath || process.env.HF_VAR_JOB_TEMPLATE_PATH || "./job-template.yaml"; - var jobYamlTemplate = fs.readFileSync(jobTemplatePath, 'utf8'); - //var job = yaml.safeLoad(eval('`'+jobYaml+'`')); // this works, but eval unsafe - - let context = contextArr[0]; - - // CAUTION: When creating job YAML first job details (requests, container) are used. 
- var jobYaml = createK8sJobYaml(jobArr[0], taskIdArr, contextArr[0], jobYamlTemplate, customParams); - let jobMessages = []; - for (var i=0; i 0); - } - - var namespace = customParams.k8sNamespace || process.env.HF_VAR_NAMESPACE || 'default'; - - let taskStart = new Date().toISOString(); - console.log("Starting tasks", taskIdArr, 'time=' + taskStart); - - const k8sApi = kubeconfig.makeApiClient(k8s.BatchV1Api); - - // Create the job via the Kubernetes API. We implement a simple retry logic - // in case the API is overloaded and returns HTTP 429 (Too many requests). - var createJob = function(attempt) { - try { - k8sApi.createNamespacedJob(namespace, jobYaml).then( - (response) => { - }, - (err) => { - try { - var statusCode = err.response.statusCode; - } catch(e) { - // We didn't get a response, probably connection error - throw(err); - } - switch(statusCode) { - // if we get 409 or 429 ==> wait and retry - case 409: // 'Conflict' -- "Operation cannot be fulfilled on reourcequotas"; bug in k8s? - case 429: // 'Too many requests' -- API overloaded - // Calculate delay: default 1s, for '429' we should get it in the 'retry-after' header - let delay = Number(err.response.headers['retry-after'] || 1)*1000; - console.log("Create k8s job", taskIdArr, "HTTP error " + statusCode + " (attempt " + attempt + - "), retrying after " + delay + "ms." 
); - setTimeout(() => createJob(attempt+1), delay); - break; - default: - console.error("Err"); - console.error(err); - console.error(job); - let taskEnd = new Date().toISOString(); - console.log("Task ended with error, time=", taskEnd); - } - } - ); - } catch (e) { - console.error(e); - } - } - createJob(1); - - let sendJobMessagesPromises = []; - for (var i=0; i i), + "outputs": job.outs.map(o => o), + "stdout": job.stdout, // if present, denotes file name to which stdout should be redirected + "stderr": job.stderr, // if present, denotes file name to which stderr should be redirected + "stdoutAppend": job.stdoutAppend, // redirect stdout in append mode + "stderrAppend": job.stderrAppend, // redirect stderr in append mode + "redis_url": context.redis_url, + "taskId": taskId, + "name": job.name + } + return jobMessage; +} + +// Function createK8sJobYaml +// +// Inputs: +// - 'job': job definition object; it should contain: +// * 'name': job class name (not unique id), usually passed via 'context.name' +// - 'taskIds': array of unique tasks' identifiers (use '[context.taskId]' or define custom) +// - 'context': pass the 'context' parameter of the process function; +// it should contain the 'redis_url', 'hfId', 'appId' +// - 'jobYamlTemplate': job YAML that may contain variables '{{varname}}' +// - 'customParams': JSON object that defines values for variables in the +// job's YAML template. 
+// +// Returns: +// - jobYaml: string with job YAML to create the k8s job +var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) => { + let quotedTaskIds = taskIds.map(x => '"' + x + '"'); + var command = 'hflow-job-execute ' + context.redis_url + ' -a -- ' + quotedTaskIds.join(' '); + var containerName = job.image || process.env.HF_VAR_WORKER_CONTAINER; + var volumePath = '/work_dir'; + var jobName = Math.random().toString(36).substring(7) + '-' + + job.name.replace(/_/g, '-') + "-" + context.procId + '-' + context.firingId; + var workingDirPath = context.workdir; + + // remove chars not allowd in Pod names + jobName = jobName.replace(/[^0-9a-z-]/gi, '').toLowerCase(); + + var cpuRequest = job.cpuRequest || process.env.HF_VAR_CPU_REQUEST || "0.5"; + var memRequest = job.memRequest || process.env.HF_VAR_MEM_REQUEST || "50Mi"; + + // Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined + var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0; + var restartPolicy = backoffLimit > 0 ? 
"OnFailure": "Never"; + + var restartCount = 0; + + // use string replacement (instead of eval) to evaluate job template + // 'params' should contain values for variables to be replaced in job template yaml + var params = { + command: command, containerName: containerName, + jobName: jobName, volumePath: volumePath, + cpuRequest: cpuRequest, memRequest: memRequest, + restartPolicy: restartPolicy, backoffLimit: backoffLimit, + experimentId: context.hfId + ":" + context.appId, + workflowName: context.wfname, taskName: job.name, + appId: context.appId, workingDirPath: workingDirPath + } + + // Add/override custom parameters for the job + Object.keys(customParams).forEach(function(key) { + params[key] = customParams[key]; + }); + + // args[v] will evaluate to 'undefined' if 'v' doesn't exist + var interpolate = (tpl, args) => tpl.replace(/\${(\w+)}/g, (_, v) => args[v]); + var jobYaml = yaml.safeLoad(interpolate(jobYamlTemplate, params)); + + return jobYaml; +} + +// Function submitK8sJob +// Submits a job to a Kubernetes cluster and awaits for its completion +// +// Inputs: +// - see inputs to 'createK8sJobYaml' and 'createK8sJobMessage' +// - restartFn - function that will be called in case of failed job +// +// +// Returns: job exit code +var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams) => { + + // Load definition of the the worker job pod + // File 'job-template.yaml' should be provided externally during deployment + var jobTemplatePath = customParams.jobTemplatePath || process.env.HF_VAR_JOB_TEMPLATE_PATH || "./job-template.yaml"; + var jobYamlTemplate = fs.readFileSync(jobTemplatePath, 'utf8'); + //var job = yaml.safeLoad(eval('`'+jobYaml+'`')); // this works, but eval unsafe + + let context = contextArr[0]; + + // CAUTION: When creating job YAML first job details (requests, container) are used. 
+  // One Job YAML is built for the whole batch; per the CAUTION above, resource
+  // requests and container come from jobArr[0] only.
+  var jobYaml = createK8sJobYaml(jobArr[0], taskIdArr, contextArr[0], jobYamlTemplate, customParams);
+  let jobMessages = [];
+  // NOTE(review): the loop below is truncated in this copy of the file (text between
+  // 'i' and '0)' is missing -- looks like stripped '<' characters). Presumably it
+  // builds one job message per task and asserts the list is non-empty. Restore from VCS.
+  for (var i=0; i 0);
+  }
+
+  var namespace = customParams.k8sNamespace || process.env.HF_VAR_NAMESPACE || 'default';
+
+  let taskStart = new Date().toISOString();
+  console.log("Starting tasks", taskIdArr, 'time=' + taskStart);
+
+  const k8sApi = kubeconfig.makeApiClient(k8s.BatchV1Api);
+
+  // Check if admission controller is enabled (check at runtime, not module load time)
+  const admissionControllerEnabled = process.env.HF_VAR_ADMISSION_CONTROLLER === '1';
+
+  // Initialize admission controller if enabled and not already initializing/initialized.
+  // 'admissionControllerInitPromise' acts as a once-latch: only the first caller
+  // constructs the controller; concurrent callers await the same shared promise,
+  // so initialize() runs exactly once per process.
+  if (admissionControllerEnabled) {
+    if (!admissionControllerInitPromise) {
+      // First caller: start initialization
+      console.log("Enabling k8s admission controller...");
+      admissionControllerInitPromise = (async () => {
+        admissionController = new K8sAdmissionController(kubeconfig, namespace);
+        await admissionController.initialize();
+        console.log("k8s admission controller initialized successfully");
+      })();
+    }
+    // Wait for initialization to complete (whether we started it or someone else did)
+    await admissionControllerInitPromise;
+  }
+
+  // Create the job via the Kubernetes API
+  // If admission controller is enabled, acquire permit first (rate limiting),
+  // then fire-and-forget the Pod creation. This preserves parallelism with
+  // message sending while still protecting the scheduler.
+  if (admissionControllerEnabled && admissionController) {
+    // NEW BEHAVIOR: Acquire permit (waits for rate limiting), then fire-and-forget
+    await admissionController.acquirePermit();
+
+    // Fire-and-forget Pod creation (happens in parallel with message sending below)
+    k8sApi.createNamespacedJob(namespace, jobYaml).then(
+      (response) => {
+        // Success (fire-and-forget, no further action needed)
+      },
+      (err) => {
+        // On error, record it for adaptive tuning (but don't block).
+        // NOTE(review): unlike the legacy path below, there is NO retry here --
+        // a failed create after a permit is only counted, not re-attempted.
+        admissionController.recordError();
+        console.error("Pod creation error after permit acquired:", err.message || err);
+      }
+    );
+  } else {
+    // ORIGINAL BEHAVIOR: Fire-and-forget with simple retry (does NOT wait)
+    var createJob = function(attempt) {
+      try {
+        k8sApi.createNamespacedJob(namespace, jobYaml).then(
+          (response) => {
+            // Success (fire-and-forget, no further action)
+          },
+          (err) => {
+            try {
+              var statusCode = err.response.statusCode;
+            } catch(e) {
+              // We didn't get a response, probably connection error
+              // NOTE(review): this throw happens inside a promise rejection handler,
+              // so it produces an unhandled rejection rather than reaching the
+              // outer catch below -- confirm intended.
+              throw(err);
+            }
+            switch(statusCode) {
+              // if we get 409 or 429 ==> wait and retry (unbounded retries)
+              case 409: // 'Conflict' -- "Operation cannot be fulfilled on resourcequotas"; bug in k8s?
+              case 429: // 'Too many requests' -- API overloaded
+                // Calculate delay: default 1s, for '429' we should get it in the 'retry-after' header
+                let delay = Number(err.response.headers['retry-after'] || 1)*1000;
+                console.log("Create k8s job", taskIdArr, "HTTP error " + statusCode + " (attempt " + attempt +
+                    "), retrying after " + delay + "ms.");
+                setTimeout(() => createJob(attempt+1), delay);
+                break;
+              default:
+                // Any other status: log and give up on this Job
+                console.error("Err");
+                console.error(err);
+                console.error("Job YAML:", jobYaml);
+                let taskEnd = new Date().toISOString();
+                console.log("Task ended with error, time=", taskEnd);
+            }
+          }
+        );
+      } catch (e) {
+        console.error(e);
+      }
+    };
+    createJob(1); // Fire-and-forget: returns immediately, does NOT wait
+  }
+
+  let sendJobMessagesPromises = [];
+  // NOTE(review): the remainder of this function is truncated in this copy, and the
+  // trailing text is an unrelated package.json fragment fused in by the extraction.
+  // Do not edit here; restore the full function body from version control.
+  for (var i=0; i=18.0.0" }, "bin": { "hflow": "./bin/hflow.js",