Skip to content

Commit 50f76b0

Browse files
committed
Merge remote-tracking branch 'origin/fix/worker-scaling' into test/1.51.0
2 parents b13be06 + 81f354c commit 50f76b0

File tree

9 files changed

+328
-17
lines changed

9 files changed

+328
-17
lines changed

shared/packages/api/src/logger.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ export function setupLogger(
101101
})
102102
if (initialLogLevel) setLogLevel(initialLogLevel, true)
103103

104-
logger.info('Logging to', logPath)
104+
if (handleProcess) {
105+
logger.info(`Logging to "${logPath}"`)
106+
}
105107
} else {
106108
const transportConsole = new Winston.transports.Console({
107109
level: logLevel,

shared/packages/expectationManager/package.json

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
"engines": {
1313
"node": ">=14.18.0"
1414
},
15+
"devDependencies": {
16+
"type-fest": "3.13.1"
17+
},
1518
"dependencies": {
1619
"@sofie-package-manager/api": "1.50.0",
1720
"@sofie-package-manager/worker": "1.50.0",

shared/packages/expectationManager/src/evaluationRunner/evaluateExpectationStates/ready.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ export async function evaluateExpectationStateReady({
8383
}
8484
} else {
8585
// No worker is available at the moment.
86-
// Check if anough time has passed if it makes sense to check for new workers again:
86+
// Check if enough time has passed if it makes sense to check for new workers again:
8787

8888
if (
8989
trackedExp.noWorkerAssignedTime &&
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
import { PartialDeep } from 'type-fest'
2+
import {
3+
ExpectationId,
4+
LogLevel,
5+
ProcessConfig,
6+
WorkerAgentId,
7+
initializeLogger,
8+
literal,
9+
protectString,
10+
setupLogger,
11+
} from '@sofie-package-manager/api'
12+
import { WorkerScaler } from '../workerScaler'
13+
import { InternalManager } from '../../../internalManager/internalManager'
14+
import { TrackedWorkerAgent } from '../../../internalManager/lib/trackedWorkerAgents'
15+
import { ExpectationTracker } from '../../expectationTracker'
16+
import { TrackedExpectation } from '../../../lib/trackedExpectation'
17+
import { ExpectedPackageStatusAPI } from '@sofie-automation/shared-lib/dist/package-manager/package'
18+
19+
// ---------------------------------------------------------
20+
const SCALE_UP_COUNT = 1
21+
const SCALE_UP_TIME = 10
22+
let isThereExistingWorkers = false
23+
// ---------------------------------------------------------
24+
25+
const logLevel = LogLevel.WARN
26+
const config = {
27+
process: literal<ProcessConfig>({
28+
logPath: undefined,
29+
logLevel: undefined,
30+
unsafeSSL: false,
31+
certificates: [],
32+
}),
33+
}
34+
initializeLogger(config)
35+
const logger = setupLogger(config, '', undefined, undefined, logLevel)
36+
logger.warn = jest.fn(logger.warn) as any
37+
logger.error = jest.fn(logger.error) as any
38+
39+
const requestResourcesForExpectation = jest.fn(async () => false)
40+
41+
const fakeManager = literal<PartialDeep<InternalManager>>({
42+
workforceConnection: {
43+
workforceAPI: {
44+
requestResourcesForExpectation,
45+
},
46+
},
47+
workerAgents: {
48+
list: (): { workerId: WorkerAgentId; workerAgent: TrackedWorkerAgent }[] => {
49+
if (isThereExistingWorkers)
50+
return [
51+
{
52+
workerId: protectString('worker0'),
53+
workerAgent: {} as any as TrackedWorkerAgent,
54+
},
55+
]
56+
else return []
57+
},
58+
},
59+
}) as any as InternalManager
60+
61+
const fakeTracker = literal<PartialDeep<ExpectationTracker>>({
62+
constants: {
63+
SCALE_UP_COUNT,
64+
SCALE_UP_TIME,
65+
},
66+
trackedExpectations: {
67+
list: (): TrackedExpectation[] => {
68+
return expectations
69+
},
70+
},
71+
trackedExpectationAPI: {
72+
isExpectationWaitingForOther: (_exp): TrackedExpectation | null => {
73+
return null
74+
},
75+
},
76+
getTrackedPackageContainers: () => {
77+
return []
78+
},
79+
}) as any as ExpectationTracker
80+
let expectations: TrackedExpectation[] = []
81+
function setExpectations(
82+
from: {
83+
id: string
84+
state: ExpectedPackageStatusAPI.WorkStatusState
85+
hasAvailableWorkers: boolean
86+
noWorkerAssignedTime?: number
87+
}[]
88+
) {
89+
expectations = Array.from(from).map((e): TrackedExpectation => {
90+
return literal<PartialDeep<TrackedExpectation>>({
91+
id: protectString<ExpectationId>(e.id),
92+
state: e.state,
93+
noWorkerAssignedTime: e.noWorkerAssignedTime ?? null,
94+
availableWorkers: new Set<WorkerAgentId>(e.hasAvailableWorkers ? [protectString('worker0')] : []),
95+
96+
exp: {
97+
statusReport: {
98+
label: `mock${e.id}`,
99+
},
100+
},
101+
}) as any as TrackedExpectation
102+
})
103+
}
104+
105+
beforeEach(() => {
106+
isThereExistingWorkers = false
107+
expectations = []
108+
109+
requestResourcesForExpectation.mockClear()
110+
})
111+
afterEach(() => {
112+
expect(logger.warn).toHaveBeenCalledTimes(0)
113+
expect(logger.error).toHaveBeenCalledTimes(0)
114+
})
115+
116+
test('no expectations', async () => {
117+
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)
118+
await scaler.checkIfNeedToScaleUp()
119+
120+
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0)
121+
})
122+
test('1 fulfilled expectation', async () => {
123+
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)
124+
125+
isThereExistingWorkers = false
126+
setExpectations([
127+
{
128+
id: 'exp0',
129+
state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED,
130+
hasAvailableWorkers: true,
131+
},
132+
])
133+
134+
await scaler.checkIfNeedToScaleUp()
135+
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0)
136+
})
137+
138+
test('1 waiting expectation, no workers', async () => {
139+
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)
140+
141+
isThereExistingWorkers = false
142+
setExpectations([
143+
{
144+
id: 'exp0',
145+
state: ExpectedPackageStatusAPI.WorkStatusState.WAITING,
146+
hasAvailableWorkers: false,
147+
},
148+
])
149+
150+
await scaler.checkIfNeedToScaleUp()
151+
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
152+
})
153+
154+
test('1 waiting expectation', async () => {
155+
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)
156+
157+
isThereExistingWorkers = true
158+
setExpectations([
159+
{
160+
id: 'exp0',
161+
state: ExpectedPackageStatusAPI.WorkStatusState.WAITING,
162+
hasAvailableWorkers: true,
163+
},
164+
])
165+
166+
await scaler.checkIfNeedToScaleUp()
167+
await sleep(SCALE_UP_TIME * 2)
168+
await scaler.checkIfNeedToScaleUp()
169+
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
170+
})
171+
test('1 expectation, not assigned to worker', async () => {
172+
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)
173+
174+
isThereExistingWorkers = true
175+
setExpectations([
176+
{
177+
id: 'exp0',
178+
state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED,
179+
noWorkerAssignedTime: Date.now() - 1000,
180+
hasAvailableWorkers: true,
181+
},
182+
])
183+
184+
await scaler.checkIfNeedToScaleUp()
185+
await sleep(SCALE_UP_TIME * 2)
186+
await scaler.checkIfNeedToScaleUp()
187+
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
188+
})
189+
test('1 expectation, no available workers', async () => {
190+
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)
191+
192+
isThereExistingWorkers = true
193+
setExpectations([
194+
{
195+
id: 'exp0',
196+
state: ExpectedPackageStatusAPI.WorkStatusState.NEW,
197+
// noWorkerAssignedTime: Date.now() - 1000,
198+
hasAvailableWorkers: false,
199+
},
200+
])
201+
202+
await scaler.checkIfNeedToScaleUp()
203+
await sleep(SCALE_UP_TIME * 2)
204+
await scaler.checkIfNeedToScaleUp()
205+
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
206+
})
207+
208+
function sleep(ms: number) {
209+
return new Promise((resolve) => setTimeout(resolve, ms))
210+
}

shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts

+2-5
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,8 @@ export class WorkInProgressTracker {
121121
},
122122
})
123123

124-
if (this.tracker.trackedExpectationAPI.onExpectationFulfilled(wip.trackedExp)) {
125-
// Something was triggered, run again asap.
126-
// We should reevaluate asap, so that any other expectation which might be waiting on this work could start.
127-
this.tracker.triggerEvaluationNow()
128-
}
124+
// Trigger another evaluation ASAP, since a worker is free now, another expectation might be able to start:
125+
this.tracker.triggerEvaluationNow()
129126
} else {
130127
// Expectation not in WORKING state, ignore
131128
}

shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts

+11-7
Original file line numberDiff line numberDiff line change
@@ -57,24 +57,26 @@ export class WorkerScaler {
5757
this.waitingExpectations = []
5858

5959
for (const exp of this.tracker.trackedExpectations.list()) {
60+
/** The expectation is waiting on another expectation */
61+
const isWaitingForOther = this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp)
62+
6063
/** The expectation is waiting for a worker */
6164
const isWaiting: boolean =
6265
exp.state === ExpectedPackageStatusAPI.WorkStatusState.NEW ||
6366
exp.state === ExpectedPackageStatusAPI.WorkStatusState.WAITING ||
6467
exp.state === ExpectedPackageStatusAPI.WorkStatusState.READY
6568

6669
/** Not supported by any worker */
67-
const notSupportedByAnyWorker: boolean = exp.availableWorkers.size === 0
70+
const notSupportedByAnyWorker = exp.availableWorkers.size === 0
71+
6872
/** No worker has had time to work on it lately */
69-
const notAssignedToAnyWorker: boolean =
73+
const notAssignedToAnyWorkerForSomeTime: boolean =
7074
!!exp.noWorkerAssignedTime &&
7175
Date.now() - exp.noWorkerAssignedTime > this.tracker.constants.SCALE_UP_TIME
7276

73-
if (
74-
isWaiting &&
75-
(notSupportedByAnyWorker || notAssignedToAnyWorker) &&
76-
!this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp) // Filter out expectations that aren't ready to begin working on anyway
77-
) {
77+
// Is the expectation waiting for resources?
78+
if (!isWaitingForOther && (isWaiting || notSupportedByAnyWorker || notAssignedToAnyWorkerForSomeTime)) {
79+
// Add a second round of waiting, to ensure that we don't scale up prematurely:
7880
if (!exp.waitingForWorkerTime) {
7981
this.logger.silly(
8082
`Starting to track how long expectation "${expLabel(exp)}" has been waiting for a worker`
@@ -84,6 +86,8 @@ export class WorkerScaler {
8486
} else {
8587
exp.waitingForWorkerTime = null
8688
}
89+
90+
// If the expectation has been waiting for long enough:
8791
if (exp.waitingForWorkerTime) {
8892
const hasBeenWaitingFor = Date.now() - exp.waitingForWorkerTime
8993
if (

tests/internal-tests/src/__mocks__/child_process.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import path from 'path'
77

88
const fsCopyFile = promisify(fs.copyFile)
99
const fsMkdir = promisify(fs.mkdir)
10+
const fsStat = promisify(fs.stat)
1011

1112
const child_process: any = jest.createMockFromModule('child_process')
1213

@@ -173,7 +174,13 @@ async function robocopy(spawned: SpawnedProcess, args: string[]) {
173174
const source = path.join(sourceFolder, file)
174175
const destination = path.join(destinationFolder, file)
175176

176-
await fsMkdir(destinationFolder) // robocopy automatically creates the destination folder
177+
try {
178+
await fsStat(destinationFolder)
179+
} catch (e) {
180+
if (`${e}`.match(/ENOENT/)) {
181+
await fsMkdir(destinationFolder) // robocopy automatically creates the destination folder
182+
} else throw e
183+
}
177184

178185
await fsCopyFile(source, destination)
179186
}
@@ -270,4 +277,5 @@ async function netUse(commandString: string): Promise<{ stdout: string; stderr:
270277
return { stdout, stderr }
271278
}
272279
}
280+
273281
module.exports = child_process

0 commit comments

Comments
 (0)