Skip to content

Commit 5ca5f57

Browse files
committed
Do not mutate source docs when executing pipeline
Prevents aggregate results from appearing as sources
1 parent 062399b commit 5ca5f57

File tree

1 file changed

+8
-5
lines changed

1 file changed

+8
-5
lines changed

src/engine/runPipeline.ts

+8-5
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ export async function runPipeline(
2020
onStatus({ status: 'sourcing' });
2121

2222
// get all documents from the source
23-
let docs: Doc[] = await getDocs(pipeline);
23+
const docs: Doc[] = await getDocs(pipeline);
2424
// validate docs
2525
if (!docs?.length) {
2626
throw new Error('At least one document is required!');
2727
}
2828

29+
// clone docs for execution
30+
let newDocs: Doc[] = structuredClone(docs);
31+
2932
// get all connections to source
3033
let stepIds = source.connectsTo;
3134
// find all connected steps
@@ -37,7 +40,7 @@ export async function runPipeline(
3740
// if it's a basic doc/result step - process all docs with it
3841
if (nextStep.input === 'doc' || nextStep.input === 'result') {
3942
await Promise.all(
40-
docs.map((doc) =>
43+
newDocs.map((doc) =>
4144
litlytics.runStep({
4245
step: nextStep!,
4346
source,
@@ -47,7 +50,7 @@ export async function runPipeline(
4750
})
4851
)
4952
);
50-
docs = docs.filter((d) => d !== undefined);
53+
newDocs = newDocs.filter((d) => d !== undefined);
5154
// if it's an aggregate step - only run it once
5255
} else if (
5356
nextStep.input === 'aggregate-docs' ||
@@ -66,7 +69,7 @@ export async function runPipeline(
6669
doc: aggregateResult,
6770
allDocs: docs,
6871
})) as Doc;
69-
docs.push(aggregateResult);
72+
newDocs.push(aggregateResult);
7073
}
7174
// get next step and continue
7275
stepIds = nextStep.connectsTo;
@@ -76,7 +79,7 @@ export async function runPipeline(
7679
const Output = outputProviders[pipeline.output.outputType];
7780
const output = new Output(pipeline);
7881
// save result docs
79-
output.saveResults(docs);
82+
output.saveResults(newDocs);
8083
// process and store final results
8184
pipeline.results = output.getResult();
8285
break;

0 commit comments

Comments
 (0)