diff --git a/app/common/src/main/java/stirling/software/common/service/ToolMetadataService.java b/app/common/src/main/java/stirling/software/common/service/ToolMetadataService.java index 662878b741..fb7a928d76 100644 --- a/app/common/src/main/java/stirling/software/common/service/ToolMetadataService.java +++ b/app/common/src/main/java/stirling/software/common/service/ToolMetadataService.java @@ -1,11 +1,21 @@ package stirling.software.common.service; +import java.util.List; + /** Provides metadata about tool endpoints for internal dispatch. */ public interface ToolMetadataService { /** Returns true if the given operation path accepts multiple input files. */ boolean isMultiInput(String operationPath); + /** + * Returns the file extensions (lowercase, no leading dot, e.g. {@code "pdf"}) that the + * operation accepts as input ({@code output=false}) or produces as output ({@code + * output=true}), derived from the endpoint's declared type. Returns {@code null} when the + * endpoint declares no specific type, which callers should treat as "any type accepted". + */ + List getExtensionTypes(boolean output, String operationPath); + /** * Returns true when the endpoint's ZIP response is a transport for multiple typed results and * should be unpacked: multi-output endpoints (Type:SIMO / Type:MIMO) and wrapper declarations diff --git a/app/core/src/main/java/stirling/software/SPDF/service/ApiDocService.java b/app/core/src/main/java/stirling/software/SPDF/service/ApiDocService.java index b0d2e5a4a0..3a8d418aa6 100644 --- a/app/core/src/main/java/stirling/software/SPDF/service/ApiDocService.java +++ b/app/core/src/main/java/stirling/software/SPDF/service/ApiDocService.java @@ -63,6 +63,7 @@ private String getApiDocsUrl() { return "http://localhost:" + port + contextPath + "/v1/api-docs"; } + @Override public List getExtensionTypes(boolean output, String operationName) { if (outputToFileTypes.isEmpty()) { outputToFileTypes.put("PDF", List.of("pdf")); diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/controller/PolicyController.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/controller/PolicyController.java new file mode 100644 index 0000000000..0a94eef1fc --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/controller/PolicyController.java @@ -0,0 +1,319 @@ +package stirling.software.proprietary.policy.controller; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.util.MultiValueMap; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.multipart.MultipartFile; +import org.springframework.web.multipart.MultipartHttpServletRequest; +import org.springframework.web.server.ResponseStatusException; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import io.github.pixee.security.Filenames; +import io.swagger.v3.oas.annotations.Hidden; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.model.job.JobResponse; +import stirling.software.common.util.TempFile; +import stirling.software.common.util.TempFileManager; +import stirling.software.proprietary.policy.engine.PolicyRunHandle; +import stirling.software.proprietary.policy.engine.PolicyRunRegistry; +import stirling.software.proprietary.policy.model.PipelineDefinition; +import stirling.software.proprietary.policy.model.Policy; +import stirling.software.proprietary.policy.model.PolicyInputs; +import stirling.software.proprietary.policy.model.PolicyRun; +import stirling.software.proprietary.policy.model.PolicyRunStatus; +import stirling.software.proprietary.policy.model.PolicyRunView; +import stirling.software.proprietary.policy.progress.PolicyProgressListener; +import stirling.software.proprietary.policy.store.PolicyStore; +import stirling.software.proprietary.policy.trigger.ManualTrigger; +import stirling.software.proprietary.security.config.PremiumEndpoint; + +import tools.jackson.core.JacksonException; +import tools.jackson.databind.ObjectMapper; + +/** + * Manages policies and runs pipelines. The premium backend entry point: CRUD for stored {@code + * Policy} objects, running a stored policy by id, and running an ad-hoc pipeline (for AI/Automate + * one-offs). + * + *

Runs execute asynchronously and return a run id immediately. Poll {@code GET /run/{runId}} for + * status, and download outputs via the existing {@code GET /api/v1/general/files/{fileId}} using + * the file ids in the run view. + */ +@Slf4j +@RestController +@RequestMapping("/api/v1/policies") +@Hidden +@PremiumEndpoint +@RequiredArgsConstructor +@Tag(name = "Policies", description = "Run tool pipelines on the backend") +public class PolicyController { + + private final ManualTrigger manualTrigger; + private final PolicyRunRegistry runRegistry; + private final PolicyStore policyStore; + private final ObjectMapper objectMapper; + private final TempFileManager tempFileManager; + + /** SSE emitter timeout, generous enough for long multi-step runs on large files. */ + @Value("${stirling.policies.streamTimeoutMs:1800000}") + private long streamTimeoutMs; + + @PostMapping(value = "/run", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + @Operation( + summary = "Run a tool pipeline", + description = + "Accepts the documents to process (multipart field 'fileInput'), any supporting" + + " files (each under a multipart field named as its asset key, e.g." + + " 'company-logo'), and a JSON pipeline definition ('json'). Runs the" + + " steps in order asynchronously and returns a run id. Poll the run" + + " status endpoint and download outputs via /api/v1/general/files/{id}.") + public ResponseEntity> run( + @RequestParam("json") String json, MultipartHttpServletRequest request) + throws IOException { + PipelineDefinition definition = parseDefinition(json); + PolicyInputs inputs = collectInputs(request); + String runId = manualTrigger.fire(definition, inputs, PolicyProgressListener.NOOP).runId(); + return ResponseEntity.accepted().body(new JobResponse<>(true, runId, null)); + } + + @PostMapping(value = "/run/stream", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + @Operation( + summary = "Run a tool pipeline with live progress", + description = + "Same as /run, but returns Server-Sent Events: a 'step' event as each step" + + " starts and completes, then a terminal 'completed', 'failed'," + + " 'cancelled', or 'waiting' event carrying the final run view.") + public SseEmitter runStream( + @RequestParam("json") String json, MultipartHttpServletRequest request) + throws IOException { + PipelineDefinition definition = parseDefinition(json); + PolicyInputs inputs = collectInputs(request); + + SseEmitter emitter = new SseEmitter(streamTimeoutMs); + emitter.onError(e -> log.warn("Policy run SSE emitter error", e)); + + PolicyRunHandle handle = manualTrigger.fire(definition, inputs, streamListener(emitter)); + // Close the stream with a terminal event once the run finishes. whenComplete runs on the + // engine's worker thread after the run is done, so this never races the step events. + handle.completion() + .whenComplete( + (run, throwable) -> { + if (throwable != null) { + sendEvent( + emitter, + "failed", + Map.of("message", throwable.getMessage())); + } else { + sendEvent(emitter, terminalEventName(run), PolicyRunView.of(run)); + } + emitter.complete(); + }); + return emitter; + } + + @GetMapping("/run/{runId}") + @Operation( + summary = "Get pipeline run status", + description = "Returns the current status, step cursor, and output files of a run.") + public ResponseEntity status(@PathVariable String runId) { + PolicyRun run = runRegistry.get(runId); + if (run == null) { + return ResponseEntity.notFound().build(); + } + return ResponseEntity.ok(PolicyRunView.of(run)); + } + + // --- Policy management --- + + @PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE) + @Operation( + summary = "Create or update a policy", + description = + "Stores a policy (trigger config + steps + output + metadata). A blank id is" + + " assigned; returns the stored policy with its id.") + public ResponseEntity savePolicy(@RequestBody String json) { + return ResponseEntity.ok(policyStore.save(parsePolicy(json))); + } + + @GetMapping + @Operation(summary = "List policies") + public List listPolicies() { + return policyStore.all(); + } + + @GetMapping("/{policyId}") + @Operation(summary = "Get a policy by id") + public ResponseEntity getPolicy(@PathVariable String policyId) { + return policyStore + .get(policyId) + .map(ResponseEntity::ok) + .orElseGet(() -> ResponseEntity.notFound().build()); + } + + @DeleteMapping("/{policyId}") + @Operation(summary = "Delete a policy by id") + public ResponseEntity deletePolicy(@PathVariable String policyId) { + return policyStore.delete(policyId) + ? ResponseEntity.noContent().build() + : ResponseEntity.notFound().build(); + } + + @PostMapping(value = "/{policyId}/run", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + @Operation( + summary = "Run a stored policy", + description = + "Runs the stored policy's pipeline on the supplied files (primary documents" + + " under 'fileInput', supporting files under their asset-key fields)." + + " Runs regardless of the policy's enabled flag, which only gates" + + " automatic triggering. Returns a run id.") + public ResponseEntity> runStoredPolicy( + @PathVariable String policyId, MultipartHttpServletRequest request) throws IOException { + Policy policy = + policyStore + .get(policyId) + .orElseThrow( + () -> + new ResponseStatusException( + HttpStatus.NOT_FOUND, "No policy: " + policyId)); + PolicyInputs inputs = collectInputs(request); + String runId = manualTrigger.run(policy, inputs, PolicyProgressListener.NOOP).runId(); + return ResponseEntity.accepted().body(new JobResponse<>(true, runId, null)); + } + + private Policy parsePolicy(String json) { + try { + return objectMapper.readValue(json, Policy.class); + } catch (JacksonException e) { + throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid policy JSON"); + } + } + + private PipelineDefinition parseDefinition(String json) { + PipelineDefinition definition; + try { + definition = objectMapper.readValue(json, PipelineDefinition.class); + } catch (JacksonException e) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, "Invalid pipeline definition JSON"); + } + if (definition.steps().isEmpty()) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, "Pipeline definition has no steps"); + } + return definition; + } + + /** + * Split the multipart file parts into the primary document stream ("fileInput") and the named + * supporting-file store: every other file field becomes an asset keyed by its field name, which + * a step references from {@code fileParameters}. + */ + private PolicyInputs collectInputs(MultipartHttpServletRequest request) throws IOException { + MultiValueMap fileMap = request.getMultiFileMap(); + List primary = toResources(fileMap.get("fileInput")); + Map> supportingFiles = new LinkedHashMap<>(); + for (Map.Entry> entry : fileMap.entrySet()) { + if ("fileInput".equals(entry.getKey())) { + continue; + } + List assets = toResources(entry.getValue()); + if (!assets.isEmpty()) { + supportingFiles.put(entry.getKey(), assets); + } + } + return new PolicyInputs(primary, supportingFiles); + } + + /** + * A progress listener that forwards each step transition to the SSE stream as a "step" event. + */ + private PolicyProgressListener streamListener(SseEmitter emitter) { + return new PolicyProgressListener() { + @Override + public void onStepStart(int stepIndex, int stepCount, String operation) { + sendEvent(emitter, "step", stepEvent("started", stepIndex, stepCount, operation)); + } + + @Override + public void onStepComplete(int stepIndex, int stepCount, String operation) { + sendEvent(emitter, "step", stepEvent("completed", stepIndex, stepCount, operation)); + } + }; + } + + private static Map stepEvent( + String phase, int stepIndex, int stepCount, String operation) { + return Map.of( + "phase", phase, + "stepIndex", stepIndex, + "stepCount", stepCount, + "operation", operation); + } + + private static String terminalEventName(PolicyRun run) { + PolicyRunStatus status = run.getStatus(); + return switch (status) { + case COMPLETED -> "completed"; + case FAILED -> "failed"; + case CANCELLED -> "cancelled"; + case WAITING_FOR_INPUT -> "waiting"; + default -> "ended"; + }; + } + + private void sendEvent(SseEmitter emitter, String name, Object data) { + try { + emitter.send(SseEmitter.event().name(name).data(data, MediaType.APPLICATION_JSON)); + } catch (IOException | IllegalStateException e) { + // Client disconnected or the emitter already closed. The run continues and its results + // remain downloadable via the job endpoints; nothing useful left to stream. + log.debug("Dropping policy SSE event '{}': {}", name, e.getMessage()); + } + } + + private List toResources(List files) throws IOException { + List resources = new ArrayList<>(); + if (files == null) { + return resources; + } + for (MultipartFile file : files) { + if (file == null || file.isEmpty()) { + continue; + } + TempFile tempFile = tempFileManager.createManagedTempFile("policy-run"); + file.transferTo(tempFile.getPath()); + final String originalName = Filenames.toSimpleFileName(file.getOriginalFilename()); + resources.add( + new FileSystemResource(tempFile.getFile()) { + @Override + public String getFilename() { + return originalName; + } + }); + } + return resources; + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyEngine.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyEngine.java new file mode 100644 index 0000000000..8f41209c7e --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyEngine.java @@ -0,0 +1,261 @@ +package stirling.software.proprietary.policy.engine; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + +import org.springframework.core.io.Resource; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.model.job.ResultFile; +import stirling.software.common.service.FileStorage; +import stirling.software.common.service.InternalApiTimeoutException; +import stirling.software.common.service.JobOwnershipService; +import stirling.software.common.service.JobQueue; +import stirling.software.common.service.ResourceMonitor; +import stirling.software.common.service.TaskManager; +import stirling.software.common.util.ExecutorFactory; +import stirling.software.proprietary.policy.model.OutputSpec; +import stirling.software.proprietary.policy.model.PipelineDefinition; +import stirling.software.proprietary.policy.model.Policy; +import stirling.software.proprietary.policy.model.PolicyInputs; +import stirling.software.proprietary.policy.model.PolicyRun; +import stirling.software.proprietary.policy.model.WaitState; +import stirling.software.proprietary.policy.output.PolicyOutputSink; +import stirling.software.proprietary.policy.progress.PolicyProgressListener; + +/** + * Runs pipelines asynchronously as tracked jobs. + * + *

Each run is the unit of async work: {@link #submit} returns a run id immediately and the + * pipeline executes on a virtual thread, so a step blocking on a slow tool does not tie up a + * platform thread. The run drives {@link PolicyExecutor} for the actual step loop, registers its + * outputs and progress with {@link TaskManager} (so the existing job status/download endpoints work + * unchanged), and keeps rich state in {@link PolicyRunRegistry}. + * + *

The engine deliberately manages its own virtual-thread execution rather than routing through + * {@code JobExecutorService}: that path force-completes a job once its work returns, which is + * incompatible with a run that suspends in {@code WAITING_FOR_INPUT}. It still applies the shared + * {@link ResourceMonitor}/{@link JobQueue} admission control, so heavy runs queue under load + * instead of oversubscribing. + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class PolicyEngine { + + /** + * Resource weight of a pipeline run for admission control. A run chains many tools and holds + * intermediate files, so it is weighted as heavy work: the shared {@link ResourceMonitor} + * should let it start while the system is healthy but hold it back under memory/CPU pressure. + * See {@link ResourceMonitor#shouldQueueJob(int)} for how a weight maps to that decision. + */ + private static final int RUN_RESOURCE_WEIGHT = 50; + + private final PolicyExecutor stepExecutor; + private final TaskManager taskManager; + private final PolicyRunRegistry registry; + private final FileStorage fileStorage; + private final JobOwnershipService jobOwnershipService; + private final List outputSinks; + private final ResourceMonitor resourceMonitor; + private final JobQueue jobQueue; + + private final ExecutorService asyncExecutor = ExecutorFactory.newVirtualThreadExecutor(); + + /** + * Submit a pipeline to run asynchronously. The returned handle's run id scopes a job in {@link + * TaskManager}, so progress (notes), status, and result files are observable via the existing + * job endpoints as well as via {@link #getRun(String)}; its completion future resolves when the + * run reaches a terminal or paused state. + */ + public PolicyRunHandle submit( + PipelineDefinition definition, PolicyInputs inputs, PolicyProgressListener listener) { + // Scope the run id to the current user (on this request thread) so the file-download + // ownership check passes; NoOpJobOwnershipService returns the id unchanged when security + // is off. + String runId = jobOwnershipService.createScopedJobKey(UUID.randomUUID().toString()); + taskManager.createTask(runId); + PolicyRun run = new PolicyRun(runId, definition); + registry.register(run); + CompletableFuture completion = new CompletableFuture<>(); + PolicyProgressListener tracking = trackingListener(runId, run, listener); + Runnable task = () -> runToCompletion(run, inputs, tracking, completion); + + // Each run is one admission unit; steps run synchronously within it, so this gates heavy + // work under load without the pool-within-pool risk of queueing each tool call. Under + // resource pressure the run waits in the shared JobQueue; otherwise it starts immediately. + if (resourceMonitor.shouldQueueJob(RUN_RESOURCE_WEIGHT)) { + log.debug("Queueing policy run {} under resource pressure", runId); + jobQueue.queueJob( + runId, + RUN_RESOURCE_WEIGHT, + () -> { + task.run(); + return null; + }, + 0L) + .exceptionally(ex -> failRejectedRun(run, completion, ex)); + } else { + asyncExecutor.execute(task); + } + return new PolicyRunHandle(runId, completion); + } + + /** + * Run a stored policy on demand. Builds the policy's pipeline and submits it. {@code enabled} + * gates automatic triggering, not explicit runs, so this runs regardless of that flag. + */ + public PolicyRunHandle runPolicy( + Policy policy, PolicyInputs inputs, PolicyProgressListener listener) { + return submit(policy.toDefinition(), inputs, listener); + } + + public PolicyRun getRun(String runId) { + return registry.get(runId); + } + + /** + * Request cancellation of a run. Stage 1 marks the run cancelled in the registry if it has not + * already finished; interrupting an in-flight tool call lands in a later stage. + */ + public boolean cancel(String runId) { + PolicyRun run = registry.get(runId); + if (run == null) { + return false; + } + boolean cancelled = run.cancel(); + if (cancelled) { + taskManager.addNote(runId, "Run cancelled by request"); + } + return cancelled; + } + + /** + * Resume a run paused in {@code WAITING_FOR_INPUT}. Not yet implemented; the run shape and + * {@link WaitState} snapshot are in place so this can be added without reworking the engine. + */ + public String resume(String runId, List additionalInputs) { + throw new UnsupportedOperationException("Pause/resume is not yet implemented"); + } + + private void runToCompletion( + PolicyRun run, + PolicyInputs inputs, + PolicyProgressListener listener, + CompletableFuture completion) { + String runId = run.getRunId(); + try { + run.markRunning(); + PolicyExecutionResult result = + stepExecutor.execute(run.getDefinition(), inputs, listener); + OutputSpec output = run.getDefinition().output(); + List outputs = sinkFor(output).deliver(runId, result.files(), output); + taskManager.setMultipleFileResults(runId, outputs); + taskManager.setComplete(runId); + run.complete(outputs); + } catch (PolicyInputRequiredException e) { + // Designed-for path: suspend the run rather than fail it. Persist intermediates as + // fileIds so the run can resume after this worker thread is gone. + WaitState wait = suspend(e); + run.waitForInput(wait); + taskManager.addNote(runId, "Waiting for input: " + e.getMessage()); + } catch (InternalApiTimeoutException e) { + String message = toolTimeoutMessage(e); + log.error( + "Policy run {} timed out on {}: {}", + runId, + e.getEndpointPath(), + e.getMessage()); + run.fail(message); + taskManager.setError(runId, message); + } catch (Exception e) { + String message = "Policy run failed: " + e.getMessage(); + log.error("Policy run {} failed", runId, e); + run.fail(message); + taskManager.setError(runId, message); + } finally { + // Always resolve the handle with the run's final state so stream/await callers unblock. + completion.complete(run); + } + } + + private ResponseEntity failRejectedRun( + PolicyRun run, CompletableFuture completion, Throwable ex) { + // Only reached if the run never started (e.g. the queue was full). A run that started + // always resolves its own completion in runToCompletion. + if (!completion.isDone()) { + String message = "Policy run could not be queued: " + ex.getMessage(); + log.error("Policy run {} was not admitted: {}", run.getRunId(), ex.getMessage()); + run.fail(message); + taskManager.setError(run.getRunId(), message); + completion.complete(run); + } + return null; + } + + private WaitState suspend(PolicyInputRequiredException e) { + List fileIds = new ArrayList<>(); + for (Resource resource : e.getPendingFiles()) { + String name = resource.getFilename() != null ? resource.getFilename() : "pending"; + try (InputStream is = resource.getInputStream()) { + fileIds.add(fileStorage.storeInputStream(is, name).fileId()); + } catch (IOException io) { + log.warn("Failed to persist pending file for paused run: {}", io.getMessage()); + } + } + return new WaitState(e.getMessage(), e.getResumeStepIndex(), fileIds); + } + + private PolicyProgressListener trackingListener( + String runId, PolicyRun run, PolicyProgressListener delegate) { + return new PolicyProgressListener() { + @Override + public void onStepStart(int stepIndex, int stepCount, String operation) { + run.enterStep(stepIndex); + taskManager.addNote( + runId, + "Step " + stepIndex + "/" + stepCount + ": " + operation + " started"); + delegate.onStepStart(stepIndex, stepCount, operation); + } + + @Override + public void onStepComplete(int stepIndex, int stepCount, String operation) { + taskManager.addNote( + runId, + "Step " + stepIndex + "/" + stepCount + ": " + operation + " completed"); + delegate.onStepComplete(stepIndex, stepCount, operation); + } + + @Override + public void onHeartbeat() { + delegate.onHeartbeat(); + } + }; + } + + private PolicyOutputSink sinkFor(OutputSpec spec) { + return outputSinks.stream() + .filter(sink -> sink.supports(spec)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "No output sink supports spec: " + + (spec == null ? "" : spec.type()))); + } + + private static String toolTimeoutMessage(InternalApiTimeoutException e) { + return String.format( + "The %s tool did not respond within %d seconds and was aborted.", + e.getEndpointPath(), e.getReadTimeout().toSeconds()); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyExecutionResult.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyExecutionResult.java new file mode 100644 index 0000000000..eb5f028bde --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyExecutionResult.java @@ -0,0 +1,17 @@ +package stirling.software.proprietary.policy.engine; + +import java.util.List; + +import org.springframework.core.io.Resource; + +import tools.jackson.databind.JsonNode; + +/** + * Result of running a pipeline through {@link PolicyExecutor}. + * + *

{@code files} are the final output resources (temp files, not yet stored to {@code + * FileStorage}). {@code report} is the structured metadata payload captured from the last step that + * produced one (a JSON body, or an {@code X-Stirling-Tool-Report} header), with {@code reportTool} + * naming the step it came from; both are null when no step produced a report. + */ +public record PolicyExecutionResult(List files, JsonNode report, String reportTool) {} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyExecutor.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyExecutor.java new file mode 100644 index 0000000000..92a436bdfd --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyExecutor.java @@ -0,0 +1,312 @@ +package stirling.software.proprietary.policy.engine; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import org.springframework.core.io.Resource; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.service.InternalApiClient; +import stirling.software.common.service.InternalApiTimeoutException; +import stirling.software.common.service.ToolMetadataService; +import stirling.software.common.util.TempFileManager; +import stirling.software.common.util.ZipExtractionUtils; +import stirling.software.proprietary.policy.model.PipelineDefinition; +import stirling.software.proprietary.policy.model.PipelineStep; +import stirling.software.proprietary.policy.model.PolicyInputs; +import stirling.software.proprietary.policy.progress.PolicyProgressListener; +import stirling.software.proprietary.service.AiToolResponseHeaders; + +import tools.jackson.core.JacksonException; +import tools.jackson.databind.JsonNode; +import tools.jackson.databind.ObjectMapper; + +/** + * Runs an ordered chain of tool steps, chaining each step's output files into the next step's + * input. + * + *

This is the single execution loop for the proprietary surface (AI plans now; + * manually-triggered runs and watched folders later). Each step is dispatched synchronously via + * {@link InternalApiClient} loopback HTTP: the tool runs in its own handler and returns its file + * inline. The caller decides how to run the executor itself (the AI turn loop calls it directly; + * the engine runs it on a virtual thread for async runs). Files cross step boundaries as {@link + * Resource} temp files; they are only persisted to durable storage at the run boundaries by the + * caller. + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class PolicyExecutor { + + private static final String FILTER_OPERATION_PREFIX = "/api/v1/filter/filter-"; + + private final InternalApiClient internalApiClient; + private final ToolMetadataService toolMetadataService; + private final TempFileManager tempFileManager; + private final ObjectMapper objectMapper; + + /** + * Internal value-class for tool responses. {@code files} holds any result files (typically one; + * multiple for ZIP-response tools). {@code report} holds an optional structured metadata + * payload the tool chose to surface alongside (or instead of) a file. + */ + private record ToolResult(List files, JsonNode report) {} + + /** + * Execute every step in {@code definition} in order, feeding each step's output into the next. + * Supporting files supplied in {@code inputs} are bound to steps' named file fields and never + * enter the document stream. + * + * @param definition the pipeline to run (must have at least one step) + * @param inputs the primary documents plus the named supporting-file store + * @param listener receives per-step progress + * @return the final output files plus the last structured report produced, if any + * @throws InternalApiTimeoutException if a tool does not respond within its read timeout + * @throws IOException if a tool returns a non-OK response, references a missing supporting + * file, or a file cannot be read + */ + public PolicyExecutionResult execute( + PipelineDefinition definition, PolicyInputs inputs, PolicyProgressListener listener) + throws IOException { + List steps = definition.steps(); + if (steps.isEmpty()) { + throw new IllegalArgumentException("Pipeline definition has no steps"); + } + + List currentFiles = inputs.primary(); + Map> supportingFiles = inputs.supportingFiles(); + // Propagate the *last* non-null report; the terminal step defines the output. + JsonNode lastReport = null; + String lastReportTool = null; + + for (int i = 0; i < steps.size(); i++) { + PipelineStep step = steps.get(i); + String operation = step.operation(); + if (operation == null || operation.isBlank()) { + throw new IllegalArgumentException( + "Pipeline step " + (i + 1) + " has no operation"); + } + listener.onStepStart(i + 1, steps.size(), operation); + ToolResult stepResult = executeStep(step, currentFiles, supportingFiles); + currentFiles = stepResult.files(); + if (stepResult.report() != null) { + lastReport = stepResult.report(); + lastReportTool = operation; + } + listener.onStepComplete(i + 1, steps.size(), operation); + } + + return new PolicyExecutionResult(currentFiles, lastReport, lastReportTool); + } + + /** + * Execute a single tool step. If the endpoint accepts multiple files, all files are sent in one + * call. Otherwise, the endpoint is called once per file. ZIP responses are unpacked so each + * inner file is treated as its own result (e.g. split outputs a ZIP of pages). + * + *

A structured {@code report} may be returned alongside (or instead of) files; see {@link + * ToolResult}. For per-file dispatch (single-input endpoints called once per input), the first + * non-null report wins. + */ + private ToolResult executeStep( + PipelineStep step, + List inputFiles, + Map> supportingFiles) + throws IOException { + requireAcceptedTypes(step.operation(), inputFiles); + List files = new ArrayList<>(); + JsonNode report = null; + if (toolMetadataService.isMultiInput(step.operation())) { + ToolResult r = callEndpoint(step, inputFiles, supportingFiles); + files.addAll(r.files()); + report = r.report(); + } else { + for (Resource file : inputFiles) { + ToolResult r = callEndpoint(step, List.of(file), supportingFiles); + files.addAll(r.files()); + if (report == null) { + report = r.report(); + } + } + } + return new ToolResult(files, report); + } + + /** + * Call an endpoint and return its result files and optional report. + * + *

+ */ + private ToolResult callEndpoint( + PipelineStep step, List files, Map> supportingFiles) + throws IOException { + String endpointPath = step.operation(); + MultiValueMap body = new LinkedMultiValueMap<>(); + for (Resource file : files) { + body.add("fileInput", file); + } + // Bind supporting files to their named tool fields (e.g. stampImage, overlayFiles). These + // come from the run's named asset store, not the document stream. + for (Map.Entry binding : step.fileParameters().entrySet()) { + String fieldName = binding.getKey(); + String assetKey = binding.getValue(); + List assets = supportingFiles.get(assetKey); + if (assets == null || assets.isEmpty()) { + throw new IOException( + "Step " + + endpointPath + + " references supporting file '" + + assetKey + + "' for field '" + + fieldName + + "' but no such file was provided"); + } + for (Resource asset : assets) { + body.add(fieldName, asset); + } + } + for (Map.Entry entry : step.parameters().entrySet()) { + if (entry.getValue() instanceof List list) { + if (containsStructuredElements(list)) { + // Endpoints binding lists of structured objects (e.g. /security/redact's + // redactions, /general/edit-text's edits) parse a single JSON string field via + // a property editor. Pre-serialize the whole list so binding succeeds. + body.add(entry.getKey(), objectMapper.writeValueAsString(list)); + } else { + for (Object item : list) { + body.add(entry.getKey(), item); + } + } + } else { + body.add(entry.getKey(), entry.getValue()); + } + } + ResponseEntity response = internalApiClient.post(endpointPath, body); + if (!HttpStatus.OK.equals(response.getStatusCode()) || response.getBody() == null) { + throw new IOException( + "Tool returned HTTP " + response.getStatusCode() + " for " + endpointPath); + } + Resource resource = response.getBody(); + + // Filter operations return an empty body to signal the file was filtered out: drop it + // rather than forwarding a zero-byte document. + if (isFilterOperation(endpointPath) && isEmpty(resource)) { + return new ToolResult(List.of(), null); + } + + HttpHeaders headers = response.getHeaders(); + MediaType contentType = headers.getContentType(); + + // JSON-only response: the whole body is the structured report, no result file. + if (contentType != null && MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) { + try (InputStream is = resource.getInputStream()) { + JsonNode report = objectMapper.readTree(is); + return new ToolResult(List.of(), report); + } + } + + JsonNode report = parseReportHeader(headers, endpointPath); + if (toolMetadataService.shouldUnpackZipResponse(endpointPath)) { + return new ToolResult(ZipExtractionUtils.extractZip(resource, tempFileManager), report); + } + return new ToolResult(List.of(resource), report); + } + + /** + * Parse the optional {@link AiToolResponseHeaders#TOOL_REPORT} header into a {@link JsonNode}, + * or return null. + */ + private JsonNode parseReportHeader(HttpHeaders headers, String endpointPath) { + String raw = headers.getFirst(AiToolResponseHeaders.TOOL_REPORT); + if (raw == null || raw.isBlank()) { + return null; + } + try { + return objectMapper.readTree(raw); + } catch (JacksonException e) { + log.warn( + "Ignoring malformed {} header from {}: {}", + AiToolResponseHeaders.TOOL_REPORT, + endpointPath, + e.getMessage()); + return null; + } + } + + private static boolean containsStructuredElements(List list) { + for (Object item : list) { + if (item instanceof Map || item instanceof List) { + return true; + } + } + return false; + } + + /** + * Fail the run if any document in the primary stream is not a file type the step accepts. An + * endpoint that declares no specific input type accepts anything. + */ + private void requireAcceptedTypes(String operation, List files) throws IOException { + List accepted = toolMetadataService.getExtensionTypes(false, operation); + if (accepted == null || accepted.isEmpty()) { + return; + } + for (Resource file : files) { + if (!matchesType(file, accepted)) { + throw new IOException( + "Step " + + operation + + " accepts " + + accepted + + " but received '" + + file.getFilename() + + "'"); + } + } + } + + private static boolean matchesType(Resource file, List acceptedExtensions) { + String filename = file.getFilename(); + if (filename == null) { + return false; + } + int dot = filename.lastIndexOf('.'); + if (dot < 0 || dot == filename.length() - 1) { + return false; + } + return acceptedExtensions.contains(filename.substring(dot + 1).toLowerCase(Locale.ROOT)); + } + + private static boolean isFilterOperation(String operation) { + return operation.startsWith(FILTER_OPERATION_PREFIX); + } + + private static boolean isEmpty(Resource resource) { + try { + return resource.contentLength() == 0; + } catch (IOException e) { + return false; + } + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyInputRequiredException.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyInputRequiredException.java new file mode 100644 index 0000000000..fa30373c57 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyInputRequiredException.java @@ -0,0 +1,32 @@ +package stirling.software.proprietary.policy.engine; + +import java.util.List; + +import org.springframework.core.io.Resource; + +import lombok.Getter; + +/** + * Thrown by a step to signal that the run cannot proceed without further user input, pausing the + * run in {@code WAITING_FOR_INPUT} rather than failing it. + * + *

Carries everything needed to resume: a human-readable reason, the 0-based index of the step to + * resume from, and the intermediate files produced so far. The engine persists those files and + * suspends the run. + * + *

Defined now to fix the run shape; no step throws it yet, and the resume handshake is + * implemented in a later stage. + */ +@Getter +public class PolicyInputRequiredException extends RuntimeException { + + private final transient List pendingFiles; + private final int resumeStepIndex; + + public PolicyInputRequiredException( + String reason, int resumeStepIndex, List pendingFiles) { + super(reason); + this.resumeStepIndex = resumeStepIndex; + this.pendingFiles = pendingFiles == null ? List.of() : pendingFiles; + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyRunHandle.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyRunHandle.java new file mode 100644 index 0000000000..eeda863ccc --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyRunHandle.java @@ -0,0 +1,16 @@ +package stirling.software.proprietary.policy.engine; + +import java.util.concurrent.CompletableFuture; + +import stirling.software.proprietary.policy.model.PolicyRun; + +/** + * Returned by {@link PolicyEngine#submit}: the run id (for status polling and result download) plus + * a future that resolves when the run reaches a terminal or paused state. + * + *

The completion future lets callers react to the end of a run (e.g. an SSE endpoint sending a + * final event and closing the stream) without polling. It carries the {@link PolicyRun} whose + * status describes the outcome (completed, failed, cancelled, or waiting for input); it does not + * complete exceptionally for ordinary run failures. + */ +public record PolicyRunHandle(String runId, CompletableFuture completion) {} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyRunRegistry.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyRunRegistry.java new file mode 100644 index 0000000000..0b613726d7 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/engine/PolicyRunRegistry.java @@ -0,0 +1,97 @@ +package stirling.software.proprietary.policy.engine; + +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PreDestroy; + +import lombok.extern.slf4j.Slf4j; + +import stirling.software.proprietary.policy.model.PolicyRun; + +/** + * In-memory store of live {@link PolicyRun} state, keyed by runId. Holds the authoritative run + * state machine; durable status/files for download are projected separately into {@code + * TaskManager}. + * + *

Finished runs are evicted on a fixed interval once they age past {@code + * stirling.policies.runExpiryMinutes}, mirroring the job-result expiry in {@code TaskManager} so a + * run's rich in-memory state does not outlive the process. Only terminal runs are evicted; active + * and paused ({@code WAITING_FOR_INPUT}) runs are retained regardless of age. Result files are not + * touched here: a run shares its runId with a {@code TaskManager} job, which owns file-lifecycle + * cleanup, so eviction only frees this map's entry. + */ +@Slf4j +@Service +public class PolicyRunRegistry { + + private final Map runs = new ConcurrentHashMap<>(); + + private final Duration runExpiry; + private final ScheduledExecutorService cleanupExecutor = + Executors.newSingleThreadScheduledExecutor( + Thread.ofVirtual().name("policy-run-cleanup-", 0).factory()); + + public PolicyRunRegistry( + @Value("${stirling.policies.runExpiryMinutes:30}") int runExpiryMinutes) { + this.runExpiry = Duration.ofMinutes(runExpiryMinutes); + cleanupExecutor.scheduleAtFixedRate(this::evictExpiredRuns, 10, 10, TimeUnit.MINUTES); + log.debug( + "Policy run registry initialized with run expiry of {} minutes", runExpiryMinutes); + } + + public void register(PolicyRun run) { + runs.put(run.getRunId(), run); + } + + public PolicyRun get(String runId) { + return runs.get(runId); + } + + public Collection all() { + return runs.values(); + } + + /** Scheduled hook: evict terminal runs that finished before the expiry window. */ + private void evictExpiredRuns() { + try { + evictExpired(Instant.now().minus(runExpiry)); + } catch (Exception e) { + log.error("Error during policy run cleanup: {}", e.getMessage(), e); + } + } + + /** + * Remove every terminal run last updated before {@code cutoff}; active and paused runs are kept + * regardless of age. Returns the number evicted. Package-visible so the scheduled sweep and + * tests exercise the same path with an explicit cutoff. + */ + int evictExpired(Instant cutoff) { + int removed = 0; + for (Map.Entry entry : runs.entrySet()) { + PolicyRun run = entry.getValue(); + if (run.getStatus().isTerminal() && run.getUpdatedAt().isBefore(cutoff)) { + runs.remove(entry.getKey()); + removed++; + } + } + if (removed > 0) { + log.info("Evicted {} expired policy runs", removed); + } + return removed; + } + + @PreDestroy + void shutdown() { + cleanupExecutor.shutdownNow(); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/OutputSpec.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/OutputSpec.java new file mode 100644 index 0000000000..4759810bc8 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/OutputSpec.java @@ -0,0 +1,21 @@ +package stirling.software.proprietary.policy.model; + +import java.util.Map; + +/** + * Describes where a pipeline run's output files should be delivered. {@code type} selects a {@code + * PolicyOutputSink} (e.g. "inline"); {@code options} carries sink-specific configuration. + * + *

New destinations (folder, S3) are added as new sink beans keyed on a new {@code type} without + * changing this shape or the engine. + */ +public record OutputSpec(String type, Map options) { + public OutputSpec { + options = options == null ? Map.of() : options; + } + + /** The default destination: store outputs and return them to the caller for download. */ + public static OutputSpec inline() { + return new OutputSpec("inline", Map.of()); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PipelineDefinition.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PipelineDefinition.java new file mode 100644 index 0000000000..983c3dbca9 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PipelineDefinition.java @@ -0,0 +1,16 @@ +package stirling.software.proprietary.policy.model; + +import java.util.List; + +/** + * An ordered chain of tool steps plus where the output should go. + * + *

This is the single shape executed by the policy engine, shared by AI plans, manually-triggered + * runs, and (later) watched folders. {@code output} may be null for callers that handle result + * files themselves (e.g. the AI workflow, which builds its own response payload). + */ +public record PipelineDefinition(String name, List steps, OutputSpec output) { + public PipelineDefinition { + steps = steps == null ? List.of() : steps; + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PipelineStep.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PipelineStep.java new file mode 100644 index 0000000000..52c6c060a2 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PipelineStep.java @@ -0,0 +1,30 @@ +package stirling.software.proprietary.policy.model; + +import java.util.Map; + +/** + * A single tool invocation in a pipeline: the API endpoint path to call and the inputs to pass. + * + *

{@code operation} is a Stirling tool endpoint path (e.g. {@code /api/v1/misc/compress-pdf}), + * matching the dispatch convention used by {@code InternalApiClient}. {@code parameters} are the + * tool-specific scalar form fields. + * + *

{@code fileParameters} binds a tool's named file fields (beyond the primary {@code fileInput} + * stream) to supporting files supplied with the run: it maps the form field name (e.g. {@code + * stampImage}, {@code overlayFiles}) to an asset key in the run's supporting-file store. This keeps + * supporting inputs (a stamp image, a certificate, an overlay) out of the document stream that + * flows step to step. + */ +public record PipelineStep( + String operation, Map parameters, Map fileParameters) { + + public PipelineStep { + parameters = parameters == null ? Map.of() : parameters; + fileParameters = fileParameters == null ? Map.of() : fileParameters; + } + + /** A step with no supporting-file bindings. */ + public PipelineStep(String operation, Map parameters) { + this(operation, parameters, Map.of()); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/Policy.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/Policy.java new file mode 100644 index 0000000000..3a2acaf0c8 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/Policy.java @@ -0,0 +1,38 @@ +package stirling.software.proprietary.policy.model; + +import java.util.List; + +/** + * A stored, owned automation: how it is triggered, the ordered tool steps to run, and where its + * output goes, plus identity and metadata. + * + *

This is the central object of the feature. Everything that runs a chain of tools is a use of a + * Policy: a watched folder is a Policy with a folder {@link TriggerConfig} and a folder {@link + * OutputSpec}; a scheduled job is a Policy with a schedule trigger; manual/Automate/AI runs execute + * a Policy (or an ad-hoc {@link PipelineDefinition}) on demand. The engine itself only ever + * executes the {@link PipelineDefinition} this exposes via {@link #toDefinition()} - it is + * trigger-agnostic. + * + *

{@code enabled} gates automatic triggering (a disabled policy is not picked up by its + * trigger); it does not block an explicit manual run. + */ +public record Policy( + String id, + String name, + String owner, + boolean enabled, + TriggerConfig trigger, + List steps, + OutputSpec output) { + + public Policy { + trigger = trigger == null ? TriggerConfig.manual() : trigger; + steps = steps == null ? List.of() : steps; + output = output == null ? OutputSpec.inline() : output; + } + + /** The engine-level, trigger-agnostic view of this policy's pipeline. */ + public PipelineDefinition toDefinition() { + return new PipelineDefinition(name, steps, output); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyInputs.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyInputs.java new file mode 100644 index 0000000000..4e6ddae834 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyInputs.java @@ -0,0 +1,32 @@ +package stirling.software.proprietary.policy.model; + +import java.util.List; +import java.util.Map; + +import org.springframework.core.io.Resource; + +/** + * The files a run operates on, split into two roles: + * + *

    + *
  • {@code primary} - the documents that flow through the pipeline, each step's output becoming + * the next step's input. + *
  • {@code supportingFiles} - a named store of auxiliary files (a stamp image, certificate, + * overlay, attachments) that steps bind to their named file fields via {@link + * PipelineStep#fileParameters()}. These never enter the document stream. + *
+ * + * Asset values are lists so a single key can carry multi-file fields (e.g. attachments). + */ +public record PolicyInputs(List primary, Map> supportingFiles) { + + public PolicyInputs { + primary = primary == null ? List.of() : primary; + supportingFiles = supportingFiles == null ? Map.of() : supportingFiles; + } + + /** Inputs with primary documents only and no supporting files. */ + public static PolicyInputs of(List primary) { + return new PolicyInputs(primary, Map.of()); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRun.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRun.java new file mode 100644 index 0000000000..20621f547f --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRun.java @@ -0,0 +1,87 @@ +package stirling.software.proprietary.policy.model; + +import java.time.Instant; +import java.util.List; + +import lombok.Getter; + +import stirling.software.common.model.job.ResultFile; + +/** + * Live, mutable state of a single pipeline run, held in memory by {@code PolicyRunRegistry}. + * + *

This carries the rich execution state (status, step cursor, wait state) that the job system's + * {@code JobResult} does not model. The run is also projected into {@code TaskManager} for + * cluster-visible status, progress notes, and file download; this object is the authoritative + * source of the state machine. + */ +@Getter +public class PolicyRun { + + private final String runId; + private final PipelineDefinition definition; + private final Instant createdAt = Instant.now(); + + private volatile PolicyRunStatus status = PolicyRunStatus.PENDING; + + /** 1-based index of the step currently running (0 before the run starts). */ + private volatile int currentStep = 0; + + private volatile WaitState waitState; + private volatile String error; + private volatile List outputs = List.of(); + private volatile Instant updatedAt = Instant.now(); + + public PolicyRun(String runId, PipelineDefinition definition) { + this.runId = runId; + this.definition = definition; + } + + public int stepCount() { + return definition.steps().size(); + } + + public synchronized void markRunning() { + this.status = PolicyRunStatus.RUNNING; + touch(); + } + + public synchronized void enterStep(int oneBasedStepIndex) { + this.currentStep = oneBasedStepIndex; + touch(); + } + + public synchronized void complete(List resultFiles) { + this.outputs = resultFiles == null ? List.of() : List.copyOf(resultFiles); + this.status = PolicyRunStatus.COMPLETED; + touch(); + } + + public synchronized void fail(String message) { + this.error = message; + this.status = PolicyRunStatus.FAILED; + touch(); + } + + public synchronized void waitForInput(WaitState wait) { + this.waitState = wait; + this.status = PolicyRunStatus.WAITING_FOR_INPUT; + touch(); + } + + /** + * Mark cancelled if the run has not already reached a terminal state. Returns whether it did. + */ + public synchronized boolean cancel() { + if (status.isTerminal()) { + return false; + } + this.status = PolicyRunStatus.CANCELLED; + touch(); + return true; + } + + private void touch() { + this.updatedAt = Instant.now(); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRunStatus.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRunStatus.java new file mode 100644 index 0000000000..ee75f5e50f --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRunStatus.java @@ -0,0 +1,21 @@ +package stirling.software.proprietary.policy.model; + +/** + * Lifecycle states of a {@link PolicyRun}. + * + *

{@code WAITING_FOR_INPUT} is modelled now so the engine and run shape support pausing a run + * (e.g. a step that blocks for a human decision) without holding a thread; the resume handshake is + * implemented in a later stage. + */ +public enum PolicyRunStatus { + PENDING, + RUNNING, + WAITING_FOR_INPUT, + COMPLETED, + FAILED, + CANCELLED; + + public boolean isTerminal() { + return this == COMPLETED || this == FAILED || this == CANCELLED; + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRunView.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRunView.java new file mode 100644 index 0000000000..a3427835ce --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/PolicyRunView.java @@ -0,0 +1,28 @@ +package stirling.software.proprietary.policy.model; + +import java.util.List; + +import stirling.software.common.model.job.ResultFile; + +/** + * Read-only view of a {@link PolicyRun} returned by the status endpoint. Output files are surfaced + * as {@link ResultFile} so the caller can download each via {@code GET /api/v1/general/files/{id}}. + */ +public record PolicyRunView( + String runId, + PolicyRunStatus status, + int currentStep, + int stepCount, + String error, + List outputs) { + + public static PolicyRunView of(PolicyRun run) { + return new PolicyRunView( + run.getRunId(), + run.getStatus(), + run.getCurrentStep(), + run.stepCount(), + run.getError(), + run.getOutputs()); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/TriggerConfig.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/TriggerConfig.java new file mode 100644 index 0000000000..848c48e4f5 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/TriggerConfig.java @@ -0,0 +1,25 @@ +package stirling.software.proprietary.policy.model; + +import java.util.Map; + +/** + * How a {@link Policy} is automatically triggered. {@code type} selects a trigger kind ("manual", + * "folder", "schedule", "s3"); {@code options} carries type-specific configuration (a folder path, + * a cron expression, a bucket, ...). + * + *

Data-driven and parallel to {@link OutputSpec}: new trigger kinds are new {@code type} values + * handled by a new trigger bean, with no change to the model. {@code "manual"} means there is no + * automatic trigger - the policy is only ever run on demand. + */ +public record TriggerConfig(String type, Map options) { + + public TriggerConfig { + type = type == null || type.isBlank() ? "manual" : type; + options = options == null ? Map.of() : options; + } + + /** No automatic trigger; the policy is run on demand only. */ + public static TriggerConfig manual() { + return new TriggerConfig("manual", Map.of()); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/WaitState.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/WaitState.java new file mode 100644 index 0000000000..7fff6c9fdd --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/model/WaitState.java @@ -0,0 +1,19 @@ +package stirling.software.proprietary.policy.model; + +import java.util.List; + +/** + * Captured when a run pauses in {@link PolicyRunStatus#WAITING_FOR_INPUT}. Together with the run's + * {@link PipelineDefinition} this is the resumable snapshot: {@code resumeStepIndex} is the 0-based + * step to continue from, and {@code pendingFileIds} are the intermediate files (stored in {@code + * FileStorage}, so they survive the worker thread ending or a node restart) that become the input + * to the resumed run. + * + *

Stored as fileIds rather than in-memory resources by design: a paused run must be resumable + * long after its worker thread has gone. + */ +public record WaitState(String reason, int resumeStepIndex, List pendingFileIds) { + public WaitState { + pendingFileIds = pendingFileIds == null ? List.of() : pendingFileIds; + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/output/InlineOutputSink.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/output/InlineOutputSink.java new file mode 100644 index 0000000000..bef5d46b37 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/output/InlineOutputSink.java @@ -0,0 +1,68 @@ +package stirling.software.proprietary.policy.output; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.springframework.core.io.Resource; +import org.springframework.http.MediaType; +import org.springframework.http.MediaTypeFactory; +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; + +import stirling.software.common.model.job.ResultFile; +import stirling.software.common.service.FileStorage; +import stirling.software.proprietary.policy.model.OutputSpec; + +/** + * Default output sink: stores each output file in {@code FileStorage} so it is downloadable via + * {@code GET /api/v1/general/files/{fileId}}. This is the destination for manually-triggered runs + * whose results are returned to the caller. + */ +@Service +@RequiredArgsConstructor +public class InlineOutputSink implements PolicyOutputSink { + + private static final String TYPE = "inline"; + + private final FileStorage fileStorage; + + @Override + public String type() { + return TYPE; + } + + @Override + public boolean supports(OutputSpec spec) { + return spec == null || spec.type() == null || TYPE.equals(spec.type()); + } + + @Override + public List deliver(String runId, List outputs, OutputSpec spec) + throws IOException { + List results = new ArrayList<>(); + for (int i = 0; i < outputs.size(); i++) { + Resource resource = outputs.get(i); + String name = + resource.getFilename() != null ? resource.getFilename() : "result-" + (i + 1); + String contentType = + MediaTypeFactory.getMediaType(name) + .orElse(MediaType.APPLICATION_OCTET_STREAM) + .toString(); + FileStorage.StoredFile stored; + try (InputStream is = resource.getInputStream()) { + stored = fileStorage.storeInputStream(is, name); + } + results.add( + ResultFile.builder() + .fileId(stored.fileId()) + .fileName(name) + .contentType(contentType) + .fileSize(stored.size()) + .build()); + } + return results; + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/output/PolicyOutputSink.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/output/PolicyOutputSink.java new file mode 100644 index 0000000000..f6bf478e7f --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/output/PolicyOutputSink.java @@ -0,0 +1,35 @@ +package stirling.software.proprietary.policy.output; + +import java.io.IOException; +import java.util.List; + +import org.springframework.core.io.Resource; + +import stirling.software.common.model.job.ResultFile; +import stirling.software.proprietary.policy.model.OutputSpec; + +/** + * Delivers a finished run's output files to a destination, returning durable {@link ResultFile} + * descriptors (fileId + metadata) for the run record. + * + *

Implementations are Spring beans selected by {@link #supports(OutputSpec)}. New destinations + * (folder, S3) are added as new beans without changing the engine. + */ +public interface PolicyOutputSink { + + /** Stable identifier for this sink, matching {@code OutputSpec.type()} (e.g. "inline"). */ + String type(); + + /** Whether this sink can handle the given output spec. */ + boolean supports(OutputSpec spec); + + /** + * Persist/deliver the output files and return their descriptors. + * + * @param runId the run these outputs belong to + * @param outputs the final pipeline output resources + * @param spec the requested destination + */ + List deliver(String runId, List outputs, OutputSpec spec) + throws IOException; +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/progress/PolicyProgressListener.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/progress/PolicyProgressListener.java new file mode 100644 index 0000000000..84eae284ef --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/progress/PolicyProgressListener.java @@ -0,0 +1,22 @@ +package stirling.software.proprietary.policy.progress; + +/** + * Receives live progress as a pipeline run executes. Implementations forward to an SSE stream, + * write job notes for polling, or both. Step indices are 1-based. + * + *

All methods default to no-ops so callers implement only what they surface. + */ +public interface PolicyProgressListener { + + /** A listener that ignores all progress. */ + PolicyProgressListener NOOP = new PolicyProgressListener() {}; + + /** Called immediately before step {@code stepIndex} of {@code stepCount} begins. */ + default void onStepStart(int stepIndex, int stepCount, String operation) {} + + /** Called immediately after step {@code stepIndex} of {@code stepCount} completes. */ + default void onStepComplete(int stepIndex, int stepCount, String operation) {} + + /** Called on a keep-alive tick so downstream connections can detect disconnects promptly. */ + default void onHeartbeat() {} +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/InProcessPolicyStore.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/InProcessPolicyStore.java new file mode 100644 index 0000000000..df8fc93394 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/InProcessPolicyStore.java @@ -0,0 +1,61 @@ +package stirling.software.proprietary.policy.store; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import stirling.software.proprietary.policy.model.Policy; + +/** + * In-memory {@link PolicyStore}. Not the runtime bean - {@link JpaPolicyStore} is the durable + * store. Kept as a lightweight, dependency-free implementation for tests and for any future no- + * database mode. + */ +public class InProcessPolicyStore implements PolicyStore { + + private final Map policies = new ConcurrentHashMap<>(); + + @Override + public Policy save(Policy policy) { + String id = + policy.id() == null || policy.id().isBlank() + ? UUID.randomUUID().toString() + : policy.id(); + Policy stored = + new Policy( + id, + policy.name(), + policy.owner(), + policy.enabled(), + policy.trigger(), + policy.steps(), + policy.output()); + policies.put(id, stored); + return stored; + } + + @Override + public Optional get(String id) { + return Optional.ofNullable(policies.get(id)); + } + + @Override + public List all() { + return List.copyOf(policies.values()); + } + + @Override + public List findByTriggerType(String triggerType) { + return policies.values().stream() + .filter(Policy::enabled) + .filter(policy -> triggerType.equals(policy.trigger().type())) + .toList(); + } + + @Override + public boolean delete(String id) { + return policies.remove(id) != null; + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/JpaPolicyStore.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/JpaPolicyStore.java new file mode 100644 index 0000000000..977e8c9c12 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/JpaPolicyStore.java @@ -0,0 +1,83 @@ +package stirling.software.proprietary.policy.store; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; + +import stirling.software.proprietary.policy.model.Policy; + +import tools.jackson.databind.ObjectMapper; + +/** + * Durable {@link PolicyStore} backed by JPA. The runtime store whenever the proprietary module runs + * (a datasource is always present). Policies are persisted as JSON via {@link PolicyEntity}; the + * scalar columns are kept in sync for querying. + */ +@Service +@RequiredArgsConstructor +public class JpaPolicyStore implements PolicyStore { + + private final PolicyRepository repository; + private final ObjectMapper objectMapper; + + @Override + public Policy save(Policy policy) { + String id = + policy.id() == null || policy.id().isBlank() + ? UUID.randomUUID().toString() + : policy.id(); + Policy stored = + new Policy( + id, + policy.name(), + policy.owner(), + policy.enabled(), + policy.trigger(), + policy.steps(), + policy.output()); + + PolicyEntity entity = new PolicyEntity(); + entity.setId(id); + entity.setName(stored.name()); + entity.setOwner(stored.owner()); + entity.setEnabled(stored.enabled()); + entity.setTriggerType(stored.trigger().type()); + entity.setPolicyJson(objectMapper.writeValueAsString(stored)); + repository.save(entity); + return stored; + } + + @Override + public Optional get(String id) { + return repository.findById(id).map(this::toPolicy); + } + + @Override + public List all() { + return repository.findAll().stream().map(this::toPolicy).toList(); + } + + @Override + public List findByTriggerType(String triggerType) { + return repository.findByTriggerTypeAndEnabledTrue(triggerType).stream() + .map(this::toPolicy) + .toList(); + } + + @Override + public boolean delete(String id) { + if (!repository.existsById(id)) { + return false; + } + repository.deleteById(id); + return true; + } + + private Policy toPolicy(PolicyEntity entity) { + return objectMapper.readValue(entity.getPolicyJson(), Policy.class); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyEntity.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyEntity.java new file mode 100644 index 0000000000..d5aad61303 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyEntity.java @@ -0,0 +1,50 @@ +package stirling.software.proprietary.policy.store; + +import java.io.Serializable; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * JPA row for a {@link stirling.software.proprietary.policy.model.Policy}. + * + *

The whole policy is stored as JSON in {@code policyJson} (authoritative on read, and the same + * serialization the API uses); the scalar columns are denormalized copies for querying - notably + * {@code triggerType} + {@code enabled} so background triggers can fetch their policies. Ownership + * is a plain {@code owner} string rather than a foreign key, to stay decoupled from the security + * entities; richer team scoping can be layered on later. + */ +@Entity +@Table(name = "policies") +@NoArgsConstructor +@Getter +@Setter +public class PolicyEntity implements Serializable { + + private static final long serialVersionUID = 1L; + + @Id + @Column(name = "id") + private String id; + + @Column(name = "name") + private String name; + + @Column(name = "owner") + private String owner; + + @Column(name = "enabled") + private boolean enabled; + + @Column(name = "trigger_type") + private String triggerType; + + @Column(name = "policy_json", columnDefinition = "text") + private String policyJson; +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyRepository.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyRepository.java new file mode 100644 index 0000000000..ba6924f0f8 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyRepository.java @@ -0,0 +1,13 @@ +package stirling.software.proprietary.policy.store; + +import java.util.List; + +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface PolicyRepository extends JpaRepository { + + /** Enabled policies of a given trigger type, for background triggers to activate. */ + List findByTriggerTypeAndEnabledTrue(String triggerType); +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyStore.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyStore.java new file mode 100644 index 0000000000..16b2ae8614 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/store/PolicyStore.java @@ -0,0 +1,28 @@ +package stirling.software.proprietary.policy.store; + +import java.util.List; +import java.util.Optional; + +import stirling.software.proprietary.policy.model.Policy; + +/** + * Stores {@link Policy} definitions. The in-memory implementation backs simple deployments now; a + * durable (JPA) implementation can replace it behind this interface without touching callers. + */ +public interface PolicyStore { + + /** Create or update a policy. A blank/absent id is assigned; returns the stored policy. */ + Policy save(Policy policy); + + Optional get(String id); + + List all(); + + /** + * Enabled policies whose automatic trigger is of the given type (used by background triggers). + */ + List findByTriggerType(String triggerType); + + /** Remove a policy; returns whether it existed. */ + boolean delete(String id); +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/trigger/ManualTrigger.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/trigger/ManualTrigger.java new file mode 100644 index 0000000000..2ae24671a8 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/trigger/ManualTrigger.java @@ -0,0 +1,41 @@ +package stirling.software.proprietary.policy.trigger; + +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; + +import stirling.software.proprietary.policy.engine.PolicyEngine; +import stirling.software.proprietary.policy.engine.PolicyRunHandle; +import stirling.software.proprietary.policy.model.PipelineDefinition; +import stirling.software.proprietary.policy.model.Policy; +import stirling.software.proprietary.policy.model.PolicyInputs; +import stirling.software.proprietary.policy.progress.PolicyProgressListener; + +/** + * Runs policies on demand, in response to a request (the {@code PolicyController} endpoints, an AI, + * or another automation). It is the request-driven trigger: no background lifecycle, it just + * forwards to the engine. Any policy can be run manually regardless of its configured trigger type. + */ +@Service +@RequiredArgsConstructor +public class ManualTrigger implements PolicyTrigger { + + private final PolicyEngine policyEngine; + + @Override + public String type() { + return "manual"; + } + + /** Run a stored policy immediately and return its run handle. */ + public PolicyRunHandle run( + Policy policy, PolicyInputs inputs, PolicyProgressListener listener) { + return policyEngine.runPolicy(policy, inputs, listener); + } + + /** Run an ad-hoc pipeline (no stored policy), e.g. for AI or Automate one-offs. */ + public PolicyRunHandle fire( + PipelineDefinition definition, PolicyInputs inputs, PolicyProgressListener listener) { + return policyEngine.submit(definition, inputs, listener); + } +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/policy/trigger/PolicyTrigger.java b/app/proprietary/src/main/java/stirling/software/proprietary/policy/trigger/PolicyTrigger.java new file mode 100644 index 0000000000..c63e5a1da7 --- /dev/null +++ b/app/proprietary/src/main/java/stirling/software/proprietary/policy/trigger/PolicyTrigger.java @@ -0,0 +1,24 @@ +package stirling.software.proprietary.policy.trigger; + +/** + * Activates policies of one trigger type. A trigger owns a {@link #type()} (matching {@code + * TriggerConfig.type()}); when its condition fires it runs the relevant {@code Policy} through the + * {@code PolicyEngine}. + * + *

Background triggers (folder watcher, schedule) are driven by configuration: on {@link + * #start()} they begin watching/scheduling for the policies returned by {@code + * PolicyStore.findByTriggerType(type())}, and stop on {@link #stop()}. Request-driven triggers + * (manual) have no background lifecycle and run a policy directly in response to a call. New + * trigger kinds are new beans of this type; the engine and the {@code Policy} model do not change. + */ +public interface PolicyTrigger { + + /** Stable identifier for this trigger kind, matching {@code TriggerConfig.type()}. */ + String type(); + + /** Begin activating policies of this type (e.g. start a folder watcher). No-op for manual. */ + default void start() {} + + /** Stop activating and release any resources. */ + default void stop() {} +} diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/security/configuration/DatabaseConfig.java b/app/proprietary/src/main/java/stirling/software/proprietary/security/configuration/DatabaseConfig.java index d81bf6a360..454df5f67e 100644 --- a/app/proprietary/src/main/java/stirling/software/proprietary/security/configuration/DatabaseConfig.java +++ b/app/proprietary/src/main/java/stirling/software/proprietary/security/configuration/DatabaseConfig.java @@ -31,13 +31,15 @@ "stirling.software.proprietary.security.repository", "stirling.software.proprietary.repository", "stirling.software.proprietary.storage.repository", - "stirling.software.proprietary.workflow.repository" + "stirling.software.proprietary.workflow.repository", + "stirling.software.proprietary.policy.store" }) @EntityScan({ "stirling.software.proprietary.security.model", "stirling.software.proprietary.model", "stirling.software.proprietary.storage.model", - "stirling.software.proprietary.workflow.model" + "stirling.software.proprietary.workflow.model", + "stirling.software.proprietary.policy.store" }) public class DatabaseConfig { diff --git a/app/proprietary/src/main/java/stirling/software/proprietary/service/AiWorkflowService.java b/app/proprietary/src/main/java/stirling/software/proprietary/service/AiWorkflowService.java index 8f49694d82..0726bf1147 100644 --- a/app/proprietary/src/main/java/stirling/software/proprietary/service/AiWorkflowService.java +++ b/app/proprietary/src/main/java/stirling/software/proprietary/service/AiWorkflowService.java @@ -14,14 +14,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.MediaTypeFactory; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; import org.springframework.web.multipart.MultipartFile; import io.github.pixee.security.Filenames; @@ -32,14 +27,11 @@ import stirling.software.common.model.ApplicationProperties; import stirling.software.common.service.CustomPDFDocumentFactory; import stirling.software.common.service.FileStorage; -import stirling.software.common.service.InternalApiClient; import stirling.software.common.service.InternalApiTimeoutException; -import stirling.software.common.service.ToolMetadataService; import stirling.software.common.service.UserServiceInterface; import stirling.software.common.util.ExceptionUtils; import stirling.software.common.util.TempFile; import stirling.software.common.util.TempFileManager; -import stirling.software.common.util.ZipExtractionUtils; import stirling.software.proprietary.model.api.ai.AiConversationMessage; import stirling.software.proprietary.model.api.ai.AiDocumentIngestRequest; import stirling.software.proprietary.model.api.ai.AiEngineProgressDetail; @@ -53,6 +45,13 @@ import stirling.software.proprietary.model.api.ai.AiWorkflowRequest; import stirling.software.proprietary.model.api.ai.AiWorkflowResponse; import stirling.software.proprietary.model.api.ai.AiWorkflowResultFile; +import stirling.software.proprietary.policy.engine.PolicyExecutionResult; +import stirling.software.proprietary.policy.engine.PolicyExecutor; +import stirling.software.proprietary.policy.model.OutputSpec; +import stirling.software.proprietary.policy.model.PipelineDefinition; +import stirling.software.proprietary.policy.model.PipelineStep; +import stirling.software.proprietary.policy.model.PolicyInputs; +import stirling.software.proprietary.policy.progress.PolicyProgressListener; import stirling.software.proprietary.security.util.DesktopClientUtils; import stirling.software.proprietary.service.PdfContentExtractor.LoadedFile; import stirling.software.proprietary.service.PdfContentExtractor.PdfContentResult; @@ -72,12 +71,11 @@ public class AiWorkflowService { private final AiEngineClient aiEngineClient; private final PdfContentExtractor pdfContentExtractor; private final ObjectMapper objectMapper; - private final InternalApiClient internalApiClient; private final FileStorage fileStorage; - private final ToolMetadataService toolMetadataService; private final TempFileManager tempFileManager; private final FileIdStrategy fileIdStrategy; private final AiEngineEndpointResolver endpointResolver; + private final PolicyExecutor policyExecutor; private final UserServiceInterface userService; private final ApplicationProperties applicationProperties; @@ -86,24 +84,22 @@ public AiWorkflowService( AiEngineClient aiEngineClient, PdfContentExtractor pdfContentExtractor, ObjectMapper objectMapper, - InternalApiClient internalApiClient, FileStorage fileStorage, - ToolMetadataService toolMetadataService, TempFileManager tempFileManager, FileIdStrategy fileIdStrategy, AiEngineEndpointResolver endpointResolver, + PolicyExecutor policyExecutor, @Autowired(required = false) UserServiceInterface userService, ApplicationProperties applicationProperties) { this.pdfDocumentFactory = pdfDocumentFactory; this.aiEngineClient = aiEngineClient; this.pdfContentExtractor = pdfContentExtractor; this.objectMapper = objectMapper; - this.internalApiClient = internalApiClient; this.fileStorage = fileStorage; - this.toolMetadataService = toolMetadataService; this.tempFileManager = tempFileManager; this.fileIdStrategy = fileIdStrategy; this.endpointResolver = endpointResolver; + this.policyExecutor = policyExecutor; this.userService = userService; this.applicationProperties = applicationProperties; } @@ -150,16 +146,6 @@ record Pending(WorkflowTurnRequest request) implements WorkflowState {} record Terminal(AiWorkflowResponse response) implements WorkflowState {} } - /** - * Internal value-class for tool responses. {@code files} holds any result files (typically one; - * multiple for ZIP-response tools). {@code report} holds an optional structured metadata - * payload the tool chose to surface alongside (or instead of) a file. - * - *

Tools populate the report either by returning a JSON body (whole body → report) or by - * adding the {@link AiToolResponseHeaders#TOOL_REPORT} header alongside a file body. - */ - private record ToolResult(List files, JsonNode report) {} - public AiWorkflowResponse orchestrate(AiWorkflowRequest request) throws IOException { return orchestrate(request, NOOP_LISTENER); } @@ -374,7 +360,6 @@ private void ingestFile(AiFile file, MultipartFile multipartFile) throws IOExcep pages.size()); } - @SuppressWarnings("unchecked") private WorkflowState onToolCall( AiWorkflowResponse response, Map filesById, @@ -391,8 +376,14 @@ private WorkflowState onToolCall( try { List inputFiles = toResources(filesById); - listener.onProgress(AiWorkflowProgressEvent.executingTool(endpointPath, 1, 1)); - ToolResult result = executeStep(endpointPath, parameters, inputFiles); + PipelineDefinition definition = + new PipelineDefinition( + null, + List.of(new PipelineStep(endpointPath, parameters)), + OutputSpec.inline()); + PolicyExecutionResult result = + policyExecutor.execute( + definition, PolicyInputs.of(inputFiles), stepProgress(listener)); return new WorkflowState.Terminal( buildCompletedResponse( response.getRationale(), @@ -466,38 +457,32 @@ private WorkflowState runPlan( cannotContinue("AI engine returned a plan with no steps.")); } - try { - List currentFiles = toResources(filesById); - // Propagate the *last* non-null report — the terminal step defines the output. - JsonNode lastReport = null; - String lastReportTool = null; - - for (int i = 0; i < steps.size(); i++) { - Map step = steps.get(i); - String endpointPath = (String) step.get("tool"); - Map parameters = - step.containsKey("parameters") - ? (Map) step.get("parameters") - : Map.of(); - - if (endpointPath == null || endpointPath.isBlank()) { - return new WorkflowState.Terminal( - cannotContinue("Plan step " + (i + 1) + " has no tool endpoint.")); - } - - listener.onProgress( - AiWorkflowProgressEvent.executingTool(endpointPath, i + 1, steps.size())); - ToolResult stepResult = executeStep(endpointPath, parameters, currentFiles); - currentFiles = stepResult.files(); - if (stepResult.report() != null) { - lastReport = stepResult.report(); - lastReportTool = endpointPath; - } + List pipelineSteps = new ArrayList<>(); + for (int i = 0; i < steps.size(); i++) { + Map step = steps.get(i); + String endpointPath = (String) step.get("tool"); + if (endpointPath == null || endpointPath.isBlank()) { + return new WorkflowState.Terminal( + cannotContinue("Plan step " + (i + 1) + " has no tool endpoint.")); } + Map parameters = + step.containsKey("parameters") + ? (Map) step.get("parameters") + : Map.of(); + pipelineSteps.add(new PipelineStep(endpointPath, parameters)); + } + + try { + List inputFiles = toResources(filesById); + PipelineDefinition definition = + new PipelineDefinition(summary, pipelineSteps, OutputSpec.inline()); + PolicyExecutionResult result = + policyExecutor.execute( + definition, PolicyInputs.of(inputFiles), stepProgress(listener)); // Multi-turn: if the plan was emitted with resume_with set, the delegate wants // Java to re-invoke the orchestrator with any captured report as an artifact. - if (resumeWith != null && !resumeWith.isBlank() && lastReport != null) { + if (resumeWith != null && !resumeWith.isBlank() && result.report() != null) { WorkflowTurnRequest resumeRequest = new WorkflowTurnRequest(); resumeRequest.setUserMessage(previousRequest.getUserMessage()); resumeRequest.setFiles(previousRequest.getFiles()); @@ -507,14 +492,14 @@ private WorkflowState runPlan( .getArtifacts() .add( new PdfContentExtractor.ToolReportArtifact( - lastReportTool, lastReport)); + result.reportTool(), result.report())); resumeRequest.setResumeWith(resumeWith); return new WorkflowState.Pending(resumeRequest); } return new WorkflowState.Terminal( buildCompletedResponse( - summary, currentFiles, inputFileNames(filesById), lastReport)); + summary, result.files(), inputFileNames(filesById), result.report())); } catch (InternalApiTimeoutException e) { log.error("Plan step on tool {} timed out: {}", e.getEndpointPath(), e.getMessage()); return new WorkflowState.Terminal( @@ -545,123 +530,19 @@ private static String toolFailureMessage(String endpointPath, Throwable cause) { } /** - * Execute a single tool step. If the endpoint accepts multiple files, all files are sent in one - * call. Otherwise, the endpoint is called once per file. ZIP responses are unpacked so each - * inner file is treated as its own result (e.g. split outputs a ZIP of pages). - * - *

A structured {@code report} may be returned alongside (or instead of) files — see {@link - * ToolResult}. For per-file dispatch (single-input endpoints called once per input), the first - * non-null report wins. - */ - private ToolResult executeStep( - String endpointPath, Map parameters, List inputFiles) - throws IOException { - List files = new ArrayList<>(); - JsonNode report = null; - if (toolMetadataService.isMultiInput(endpointPath)) { - ToolResult r = callEndpoint(endpointPath, parameters, inputFiles); - files.addAll(r.files()); - report = r.report(); - } else { - for (Resource file : inputFiles) { - ToolResult r = callEndpoint(endpointPath, parameters, List.of(file)); - files.addAll(r.files()); - if (report == null) { - report = r.report(); - } - } - } - return new ToolResult(files, report); - } - - /** - * Call an endpoint and return its result files and optional report. - * - *

    - *
  • JSON body (Content-Type: application/json) → the entire body is the report, no files - * are returned. - *
  • File body (PDF etc.) → the file is returned; if an {@link - * AiToolResponseHeaders#TOOL_REPORT} header is present, its (minified JSON) value is - * parsed as the report. - *
  • ZIP responses declared by the tool metadata service are unpacked so callers always see - * a flat list of result files. - *
- */ - private ToolResult callEndpoint( - String endpointPath, Map parameters, List files) - throws IOException { - MultiValueMap body = new LinkedMultiValueMap<>(); - for (Resource file : files) { - body.add("fileInput", file); - } - for (Map.Entry entry : parameters.entrySet()) { - if (entry.getValue() instanceof List list) { - if (containsStructuredElements(list)) { - // Endpoints binding lists of structured objects (e.g. /security/redact's - // redactions, /general/edit-text's edits) parse a single JSON string field via - // a property editor. Pre-serialize the whole list so binding succeeds. - body.add(entry.getKey(), objectMapper.writeValueAsString(list)); - } else { - for (Object item : list) { - body.add(entry.getKey(), item); - } - } - } else { - body.add(entry.getKey(), entry.getValue()); - } - } - ResponseEntity response = internalApiClient.post(endpointPath, body); - if (!HttpStatus.OK.equals(response.getStatusCode()) || response.getBody() == null) { - throw new IOException( - "Tool returned HTTP " + response.getStatusCode() + " for " + endpointPath); - } - Resource resource = response.getBody(); - HttpHeaders headers = response.getHeaders(); - MediaType contentType = headers.getContentType(); - - // JSON-only response — the whole body is the structured report, no result file. - if (contentType != null && MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) { - try (java.io.InputStream is = resource.getInputStream()) { - JsonNode report = objectMapper.readTree(is); - return new ToolResult(List.of(), report); - } - } - - JsonNode report = parseReportHeader(headers, endpointPath); - if (toolMetadataService.shouldUnpackZipResponse(endpointPath)) { - return new ToolResult(ZipExtractionUtils.extractZip(resource, tempFileManager), report); - } - return new ToolResult(List.of(resource), report); - } - - /** - * Parse the optional {@link AiToolResponseHeaders#TOOL_REPORT} header into a {@link JsonNode}, - * or return null. + * Adapt the AI workflow's {@link ProgressListener} to the engine's {@link + * PolicyProgressListener}: each step start maps to an {@code EXECUTING_TOOL} progress event + * carrying the tool path and 1-based step position, preserving the event shape the frontend + * already renders. */ - private JsonNode parseReportHeader(HttpHeaders headers, String endpointPath) { - String raw = headers.getFirst(AiToolResponseHeaders.TOOL_REPORT); - if (raw == null || raw.isBlank()) { - return null; - } - try { - return objectMapper.readTree(raw); - } catch (JacksonException e) { - log.warn( - "Ignoring malformed {} header from {}: {}", - AiToolResponseHeaders.TOOL_REPORT, - endpointPath, - e.getMessage()); - return null; - } - } - - private static boolean containsStructuredElements(List list) { - for (Object item : list) { - if (item instanceof Map || item instanceof List) { - return true; + private static PolicyProgressListener stepProgress(ProgressListener listener) { + return new PolicyProgressListener() { + @Override + public void onStepStart(int stepIndex, int stepCount, String operation) { + listener.onProgress( + AiWorkflowProgressEvent.executingTool(operation, stepIndex, stepCount)); } - } - return false; + }; } private List toResources(Map filesById) throws IOException { diff --git a/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyEngineTest.java b/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyEngineTest.java new file mode 100644 index 0000000000..bf6442b423 --- /dev/null +++ b/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyEngineTest.java @@ -0,0 +1,255 @@ +package stirling.software.proprietary.policy.engine; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.InputStream; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.Resource; +import org.springframework.http.ResponseEntity; + +import stirling.software.common.model.ApplicationProperties; +import stirling.software.common.service.FileStorage; +import stirling.software.common.service.FileStorage.StoredFile; +import stirling.software.common.service.InternalApiClient; +import stirling.software.common.service.JobOwnershipService; +import stirling.software.common.service.JobQueue; +import stirling.software.common.service.ResourceMonitor; +import stirling.software.common.service.TaskManager; +import stirling.software.common.service.ToolMetadataService; +import stirling.software.common.util.TempFileManager; +import stirling.software.common.util.TempFileRegistry; +import stirling.software.proprietary.policy.model.OutputSpec; +import stirling.software.proprietary.policy.model.PipelineDefinition; +import stirling.software.proprietary.policy.model.PipelineStep; +import stirling.software.proprietary.policy.model.Policy; +import stirling.software.proprietary.policy.model.PolicyInputs; +import stirling.software.proprietary.policy.model.PolicyRun; +import stirling.software.proprietary.policy.model.PolicyRunStatus; +import stirling.software.proprietary.policy.model.TriggerConfig; +import stirling.software.proprietary.policy.output.InlineOutputSink; +import stirling.software.proprietary.policy.progress.PolicyProgressListener; + +import tools.jackson.databind.json.JsonMapper; + +/** + * Tests for {@link PolicyEngine}: async submission runs the pipeline on a virtual thread, registers + * outputs and progress with {@link TaskManager}, and surfaces terminal state via {@link + * PolicyRunRegistry}. The step executor and inline sink are real (with mocked collaborators) so the + * full run path is exercised. + */ +@ExtendWith(MockitoExtension.class) +class PolicyEngineTest { + + private static final String ROTATE = "/api/v1/general/rotate-pdf"; + private static final String COMPRESS = "/api/v1/misc/compress-pdf"; + + @Mock private InternalApiClient internalApiClient; + @Mock private ToolMetadataService toolMetadataService; + @Mock private TaskManager taskManager; + @Mock private FileStorage fileStorage; + @Mock private JobOwnershipService jobOwnershipService; + @Mock private ResourceMonitor resourceMonitor; + @Mock private JobQueue jobQueue; + + @TempDir Path tempDir; + + private PolicyRunRegistry registry; + private PolicyEngine engine; + + @BeforeEach + void setUp() { + ApplicationProperties props = new ApplicationProperties(); + props.getSystem().getTempFileManagement().setBaseTmpDir(tempDir.toString()); + props.getSystem().getTempFileManagement().setPrefix("policy-engine-test-"); + TempFileManager tempFileManager = new TempFileManager(new TempFileRegistry(), props); + PolicyExecutor executor = + new PolicyExecutor( + internalApiClient, + toolMetadataService, + tempFileManager, + JsonMapper.builder().build()); + registry = new PolicyRunRegistry(30); + InlineOutputSink sink = new InlineOutputSink(fileStorage); + engine = + new PolicyEngine( + executor, + taskManager, + registry, + fileStorage, + jobOwnershipService, + List.of(sink), + resourceMonitor, + jobQueue); + + // Identity scoping: the run id is the generated UUID unchanged. Lenient because the + // resume/cancel tests do not submit a run. + lenient() + .when(jobOwnershipService.createScopedJobKey(anyString())) + .thenAnswer(inv -> inv.getArgument(0)); + // Default to running immediately; the queueing test overrides this. + lenient().when(resourceMonitor.shouldQueueJob(anyInt())).thenReturn(false); + } + + @Test + void submitRunsPipelineToCompletionAndRegistersOutputs() throws Exception { + when(toolMetadataService.isMultiInput(anyString())).thenReturn(false); + when(toolMetadataService.shouldUnpackZipResponse(anyString())).thenReturn(false); + stubEndpoint(ROTATE, pdf("rotated", "rotated.pdf")); + stubEndpoint(COMPRESS, pdf("compressed", "compressed.pdf")); + int[] counter = {0}; + when(fileStorage.storeInputStream(any(InputStream.class), anyString())) + .thenAnswer( + inv -> { + InputStream is = inv.getArgument(0); + long size = is.readAllBytes().length; + return new StoredFile("file-" + ++counter[0], size); + }); + + PolicyRunHandle handle = + engine.submit( + definition( + new PipelineStep(ROTATE, Map.of()), + new PipelineStep(COMPRESS, Map.of())), + PolicyInputs.of(List.of(pdf("input", "input.pdf"))), + PolicyProgressListener.NOOP); + + // The completion future resolves with the final run state, no polling needed. + String runId = handle.runId(); + PolicyRun run = handle.completion().get(10, TimeUnit.SECONDS); + assertEquals(PolicyRunStatus.COMPLETED, run.getStatus()); + assertEquals(1, run.getOutputs().size()); + assertEquals("compressed.pdf", run.getOutputs().get(0).getFileName()); + + // The run self-registers its results and completion with the job system. + verify(taskManager).createTask(runId); + verify(taskManager).setMultipleFileResults(eq(runId), any()); + verify(taskManager).setComplete(runId); + // Progress notes were written for each step. + verify(taskManager, atLeastOnce()).addNote(eq(runId), anyString()); + } + + @Test + void submitFailsRunWhenAToolErrors() throws Exception { + when(toolMetadataService.isMultiInput(ROTATE)).thenReturn(false); + when(internalApiClient.post(eq(ROTATE), any())).thenThrow(new RuntimeException("boom")); + + PolicyRunHandle handle = + engine.submit( + definition(new PipelineStep(ROTATE, Map.of())), + PolicyInputs.of(List.of(pdf("input", "input.pdf"))), + PolicyProgressListener.NOOP); + + String runId = handle.runId(); + PolicyRun run = handle.completion().get(10, TimeUnit.SECONDS); + assertEquals(PolicyRunStatus.FAILED, run.getStatus()); + verify(taskManager).setError(eq(runId), anyString()); + verify(taskManager, never()).setComplete(runId); + } + + @Test + void runPolicyExecutesThePolicysPipeline() throws Exception { + when(toolMetadataService.isMultiInput(anyString())).thenReturn(false); + when(toolMetadataService.shouldUnpackZipResponse(anyString())).thenReturn(false); + stubEndpoint(ROTATE, pdf("rotated", "rotated.pdf")); + int[] counter = {0}; + when(fileStorage.storeInputStream(any(InputStream.class), anyString())) + .thenAnswer( + inv -> { + InputStream is = inv.getArgument(0); + return new StoredFile("file-" + ++counter[0], is.readAllBytes().length); + }); + + Policy policy = + new Policy( + "p1", + "rotate", + "owner", + true, + TriggerConfig.manual(), + List.of(new PipelineStep(ROTATE, Map.of())), + OutputSpec.inline()); + + PolicyRunHandle handle = + engine.runPolicy( + policy, + PolicyInputs.of(List.of(pdf("input", "input.pdf"))), + PolicyProgressListener.NOOP); + + PolicyRun run = handle.completion().get(10, TimeUnit.SECONDS); + assertEquals(PolicyRunStatus.COMPLETED, run.getStatus()); + verify(internalApiClient).post(eq(ROTATE), any()); + } + + @Test + void runIsQueuedUnderResourcePressure() { + when(resourceMonitor.shouldQueueJob(anyInt())).thenReturn(true); + // Returning an already-completed future keeps the run parked: the queued work (which would + // start the run) is never executed by this mock, so it stays PENDING. + doReturn(CompletableFuture.completedFuture(null)) + .when(jobQueue) + .queueJob(anyString(), anyInt(), any(), anyLong()); + + PolicyRunHandle handle = + engine.submit( + definition(new PipelineStep(ROTATE, Map.of())), + PolicyInputs.of(List.of(pdf("input", "input.pdf"))), + PolicyProgressListener.NOOP); + + verify(jobQueue).queueJob(eq(handle.runId()), anyInt(), any(), anyLong()); + assertEquals(PolicyRunStatus.PENDING, registry.get(handle.runId()).getStatus()); + } + + @Test + void resumeIsNotYetImplemented() { + assertThrows(UnsupportedOperationException.class, () -> engine.resume("any", List.of())); + } + + @Test + void cancelUnknownRunReturnsFalse() { + assertFalse(engine.cancel("does-not-exist")); + } + + // --- helpers --- + + private static PipelineDefinition definition(PipelineStep... steps) { + return new PipelineDefinition("test", List.of(steps), OutputSpec.inline()); + } + + private void stubEndpoint(String endpoint, Resource body) { + when(internalApiClient.post(eq(endpoint), any())).thenReturn(ResponseEntity.ok(body)); + } + + private static ByteArrayResource pdf(String content, String filename) { + return new ByteArrayResource(content.getBytes()) { + @Override + public String getFilename() { + return filename; + } + }; + } +} diff --git a/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyExecutorTest.java b/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyExecutorTest.java new file mode 100644 index 0000000000..d682c35cd7 --- /dev/null +++ b/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyExecutorTest.java @@ -0,0 +1,378 @@ +package stirling.software.proprietary.policy.engine; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.Resource; +import org.springframework.http.ResponseEntity; +import org.springframework.util.MultiValueMap; + +import stirling.software.common.model.ApplicationProperties; +import stirling.software.common.service.InternalApiClient; +import stirling.software.common.service.InternalApiTimeoutException; +import stirling.software.common.service.ToolMetadataService; +import stirling.software.common.util.TempFileManager; +import stirling.software.common.util.TempFileRegistry; +import stirling.software.proprietary.policy.model.OutputSpec; +import stirling.software.proprietary.policy.model.PipelineDefinition; +import stirling.software.proprietary.policy.model.PipelineStep; +import stirling.software.proprietary.policy.model.PolicyInputs; +import stirling.software.proprietary.policy.progress.PolicyProgressListener; + +import tools.jackson.databind.ObjectMapper; +import tools.jackson.databind.json.JsonMapper; + +/** + * Unit tests for {@link PolicyExecutor}, the shared pipeline step loop. Covers file chaining across + * steps, multi-input vs per-file dispatch, ZIP unpacking, structured-list parameter encoding, + * progress callbacks, and timeout propagation. External collaborators are mocked; {@link + * TempFileManager} is real so ZIP extraction exercises real code. + */ +@ExtendWith(MockitoExtension.class) +class PolicyExecutorTest { + + private static final String ROTATE = "/api/v1/general/rotate-pdf"; + private static final String COMPRESS = "/api/v1/misc/compress-pdf"; + private static final String SPLIT = "/api/v1/general/split-pages"; + private static final String MERGE = "/api/v1/general/merge-pdfs"; + + @Mock private InternalApiClient internalApiClient; + @Mock private ToolMetadataService toolMetadataService; + + @TempDir Path tempDir; + + private TempFileManager tempFileManager; + private PolicyExecutor executor; + + @BeforeEach + void setUp() { + ApplicationProperties props = new ApplicationProperties(); + props.getSystem().getTempFileManagement().setBaseTmpDir(tempDir.toString()); + props.getSystem().getTempFileManagement().setPrefix("policy-test-"); + tempFileManager = new TempFileManager(new TempFileRegistry(), props); + ObjectMapper objectMapper = JsonMapper.builder().build(); + executor = + new PolicyExecutor( + internalApiClient, toolMetadataService, tempFileManager, objectMapper); + } + + @Test + void executesStepsSequentiallyChainingOutputToInput() throws IOException { + when(toolMetadataService.isMultiInput(anyString())).thenReturn(false); + when(toolMetadataService.shouldUnpackZipResponse(anyString())).thenReturn(false); + stubEndpoint(ROTATE, pdf("rotated", "rotated.pdf")); + stubEndpoint(COMPRESS, pdf("compressed", "compressed.pdf")); + + List steps = new ArrayList<>(); + PolicyProgressListener listener = + new PolicyProgressListener() { + @Override + public void onStepStart(int stepIndex, int stepCount, String operation) { + steps.add(stepIndex); + } + }; + + PolicyExecutionResult result = + executor.execute( + definition( + new PipelineStep(ROTATE, Map.of()), + new PipelineStep(COMPRESS, Map.of())), + PolicyInputs.of(List.of(pdf("input", "input.pdf"))), + listener); + + assertEquals(1, result.files().size()); + assertEquals("compressed.pdf", result.files().get(0).getFilename()); + verify(internalApiClient, times(1)).post(eq(ROTATE), any()); + verify(internalApiClient, times(1)).post(eq(COMPRESS), any()); + // Progress fired once per step, in order. + assertEquals(List.of(1, 2), steps); + } + + @Test + void multiInputEndpointIsCalledOnceWithAllFiles() throws IOException { + when(toolMetadataService.isMultiInput(MERGE)).thenReturn(true); + when(toolMetadataService.shouldUnpackZipResponse(MERGE)).thenReturn(false); + stubEndpoint(MERGE, pdf("merged", "merged.pdf")); + + PolicyExecutionResult result = + executor.execute( + definition(new PipelineStep(MERGE, Map.of())), + PolicyInputs.of(List.of(pdf("a", "a.pdf"), pdf("b", "b.pdf"))), + PolicyProgressListener.NOOP); + + assertEquals(1, result.files().size()); + verify(internalApiClient, times(1)).post(eq(MERGE), any()); + } + + @Test + void singleInputEndpointIsCalledOncePerFile() throws IOException { + when(toolMetadataService.isMultiInput(ROTATE)).thenReturn(false); + when(toolMetadataService.shouldUnpackZipResponse(ROTATE)).thenReturn(false); + stubEndpoint(ROTATE, pdf("rotated", "rotated.pdf")); + + PolicyExecutionResult result = + executor.execute( + definition(new PipelineStep(ROTATE, Map.of())), + PolicyInputs.of(List.of(pdf("a", "a.pdf"), pdf("b", "b.pdf"))), + PolicyProgressListener.NOOP); + + assertEquals(2, result.files().size()); + verify(internalApiClient, times(2)).post(eq(ROTATE), any()); + } + + @Test + void zipResponseIsUnpackedIntoIndividualFiles() throws IOException { + when(toolMetadataService.isMultiInput(SPLIT)).thenReturn(false); + when(toolMetadataService.shouldUnpackZipResponse(SPLIT)).thenReturn(true); + stubEndpoint( + SPLIT, + zip( + "doc.zip", + List.of(new Entry("page-1.pdf", "one"), new Entry("page-2.pdf", "two")))); + + PolicyExecutionResult result = + executor.execute( + definition(new PipelineStep(SPLIT, Map.of())), + PolicyInputs.of(List.of(pdf("doc", "doc.pdf"))), + PolicyProgressListener.NOOP); + + assertEquals(2, result.files().size()); + assertEquals("page-1.pdf", result.files().get(0).getFilename()); + assertEquals("page-2.pdf", result.files().get(1).getFilename()); + } + + @Test + void structuredListParameterIsJsonEncodedAsSingleField() throws IOException { + String editText = "/api/v1/general/edit-text"; + when(toolMetadataService.isMultiInput(editText)).thenReturn(false); + when(toolMetadataService.shouldUnpackZipResponse(editText)).thenReturn(false); + stubEndpoint(editText, pdf("edited", "edited.pdf")); + + // LinkedHashMap so the serialized key order is deterministic for the assertion below. + Map edit = new LinkedHashMap<>(); + edit.put("find", "foo"); + edit.put("replace", "bar"); + Map params = new LinkedHashMap<>(); + params.put("edits", List.of(edit)); + params.put("useRegex", false); + + executor.execute( + definition(new PipelineStep(editText, params)), + PolicyInputs.of(List.of(pdf("in", "in.pdf"))), + PolicyProgressListener.NOOP); + + @SuppressWarnings("unchecked") + ArgumentCaptor> bodyCaptor = + ArgumentCaptor.forClass(MultiValueMap.class); + verify(internalApiClient).post(eq(editText), bodyCaptor.capture()); + MultiValueMap body = bodyCaptor.getValue(); + + List edits = body.get("edits"); + assertNotNull(edits); + assertEquals(1, edits.size()); + assertEquals("[{\"find\":\"foo\",\"replace\":\"bar\"}]", edits.get(0)); + } + + @Test + void supportingFilesAreBoundToTheirNamedFields() throws IOException { + String addStamp = "/api/v1/misc/add-stamp-to-pdf"; + when(toolMetadataService.isMultiInput(addStamp)).thenReturn(false); + when(toolMetadataService.shouldUnpackZipResponse(addStamp)).thenReturn(false); + stubEndpoint(addStamp, pdf("stamped", "stamped.pdf")); + + PipelineStep step = + new PipelineStep(addStamp, Map.of("opacity", 0.5), Map.of("stampImage", "logo")); + PolicyInputs inputs = + new PolicyInputs( + List.of(pdf("doc", "doc.pdf")), + Map.of("logo", List.of(pdf("logo-bytes", "logo.png")))); + + executor.execute( + new PipelineDefinition("stamp", List.of(step), OutputSpec.inline()), + inputs, + PolicyProgressListener.NOOP); + + @SuppressWarnings("unchecked") + ArgumentCaptor> bodyCaptor = + ArgumentCaptor.forClass(MultiValueMap.class); + verify(internalApiClient).post(eq(addStamp), bodyCaptor.capture()); + MultiValueMap body = bodyCaptor.getValue(); + // The document goes to fileInput; the supporting image is bound to its named field and is + // not part of the document stream. + assertEquals(1, body.get("fileInput").size()); + assertNotNull(body.get("stampImage")); + assertEquals(1, body.get("stampImage").size()); + } + + @Test + void missingSupportingFileFailsTheStep() { + String addStamp = "/api/v1/misc/add-stamp-to-pdf"; + when(toolMetadataService.isMultiInput(addStamp)).thenReturn(false); + PipelineStep step = new PipelineStep(addStamp, Map.of(), Map.of("stampImage", "logo")); + + IOException ex = + assertThrows( + IOException.class, + () -> + executor.execute( + new PipelineDefinition( + "stamp", List.of(step), OutputSpec.inline()), + PolicyInputs.of(List.of(pdf("doc", "doc.pdf"))), + PolicyProgressListener.NOOP)); + assertTrue(ex.getMessage().contains("logo")); + } + + @Test + void documentOfAnUnacceptedTypeFailsTheStep() { + String compress = "/api/v1/misc/compress-pdf"; + when(toolMetadataService.getExtensionTypes(false, compress)).thenReturn(List.of("pdf")); + + IOException ex = + assertThrows( + IOException.class, + () -> + executor.execute( + definition(new PipelineStep(compress, Map.of())), + PolicyInputs.of(List.of(pdf("img", "image.png"))), + PolicyProgressListener.NOOP)); + assertTrue(ex.getMessage().contains("image.png")); + // Type check happens before any dispatch. + verify(internalApiClient, never()).post(anyString(), any()); + } + + @Test + void documentOfAnAcceptedTypeProceeds() throws IOException { + String compress = "/api/v1/misc/compress-pdf"; + when(toolMetadataService.getExtensionTypes(false, compress)).thenReturn(List.of("pdf")); + when(toolMetadataService.isMultiInput(compress)).thenReturn(false); + when(toolMetadataService.shouldUnpackZipResponse(compress)).thenReturn(false); + stubEndpoint(compress, pdf("compressed", "compressed.pdf")); + + PolicyExecutionResult result = + executor.execute( + definition(new PipelineStep(compress, Map.of())), + PolicyInputs.of(List.of(pdf("doc", "doc.pdf"))), + PolicyProgressListener.NOOP); + + assertEquals(1, result.files().size()); + verify(internalApiClient, times(1)).post(eq(compress), any()); + } + + @Test + void filterOperationWithEmptyResultDropsTheFile() throws IOException { + String filter = "/api/v1/filter/filter-page-count"; + when(toolMetadataService.isMultiInput(filter)).thenReturn(false); + stubEndpoint(filter, pdf("", "filtered.pdf")); // empty body => filtered out + + PolicyExecutionResult result = + executor.execute( + definition(new PipelineStep(filter, Map.of())), + PolicyInputs.of(List.of(pdf("doc", "doc.pdf"))), + PolicyProgressListener.NOOP); + + assertEquals(0, result.files().size()); + } + + @Test + void timeoutFromAStepPropagates() { + when(toolMetadataService.isMultiInput(ROTATE)).thenReturn(false); + when(internalApiClient.post(eq(ROTATE), any())) + .thenThrow( + new InternalApiTimeoutException( + ROTATE, + java.time.Duration.ofSeconds(300), + new IOException("Read timed out"))); + + assertThrows( + InternalApiTimeoutException.class, + () -> + executor.execute( + definition(new PipelineStep(ROTATE, Map.of())), + PolicyInputs.of(List.of(pdf("in", "in.pdf"))), + PolicyProgressListener.NOOP)); + } + + @Test + void emptyPipelineIsRejected() { + assertThrows( + IllegalArgumentException.class, + () -> + executor.execute( + new PipelineDefinition("empty", List.of(), OutputSpec.inline()), + PolicyInputs.of(List.of(pdf("in", "in.pdf"))), + PolicyProgressListener.NOOP)); + } + + // --- helpers --- + + private static PipelineDefinition definition(PipelineStep... steps) { + return new PipelineDefinition("test", List.of(steps), OutputSpec.inline()); + } + + private void stubEndpoint(String endpoint, Resource body) { + when(internalApiClient.post(eq(endpoint), any())).thenReturn(ResponseEntity.ok(body)); + } + + private static ByteArrayResource pdf(String content, String filename) { + return new ByteArrayResource(content.getBytes()) { + @Override + public String getFilename() { + return filename; + } + }; + } + + private static ByteArrayResource zip(String filename, List entries) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ZipOutputStream zos = new ZipOutputStream(baos)) { + for (Entry entry : entries) { + zos.putNextEntry(new ZipEntry(entry.name())); + zos.write(entry.content().getBytes()); + zos.closeEntry(); + } + } + byte[] zipBytes = baos.toByteArray(); + return new ByteArrayResource(zipBytes) { + @Override + public String getFilename() { + return filename; + } + + @Override + public InputStream getInputStream() { + return new ByteArrayInputStream(zipBytes); + } + }; + } + + private record Entry(String name, String content) {} +} diff --git a/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyRunRegistryTest.java b/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyRunRegistryTest.java new file mode 100644 index 0000000000..823d53edcb --- /dev/null +++ b/app/proprietary/src/test/java/stirling/software/proprietary/policy/engine/PolicyRunRegistryTest.java @@ -0,0 +1,101 @@ +package stirling.software.proprietary.policy.engine; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Instant; +import java.util.List; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import stirling.software.proprietary.policy.model.PipelineDefinition; +import stirling.software.proprietary.policy.model.PolicyRun; +import stirling.software.proprietary.policy.model.WaitState; + +/** + * Tests for {@link PolicyRunRegistry} eviction: terminal runs expire, active/paused runs persist. + */ +class PolicyRunRegistryTest { + + private PolicyRunRegistry registry; + + @BeforeEach + void setUp() { + registry = new PolicyRunRegistry(30); + } + + @AfterEach + void tearDown() { + registry.shutdown(); + } + + @Test + void evictsTerminalRunsPastTheCutoff() { + PolicyRun completed = register("completed"); + completed.complete(List.of()); + PolicyRun failed = register("failed"); + failed.fail("boom"); + PolicyRun cancelled = register("cancelled"); + cancelled.cancel(); + + // A cutoff in the future means every terminal run finished "before" it. + int removed = registry.evictExpired(Instant.now().plusSeconds(60)); + + assertEquals(3, removed); + assertNull(registry.get("completed")); + assertNull(registry.get("failed")); + assertNull(registry.get("cancelled")); + } + + @Test + void retainsActiveAndPausedRunsRegardlessOfAge() { + register("pending"); // PENDING: never started + PolicyRun running = register("running"); + running.markRunning(); + PolicyRun waiting = register("waiting"); + waiting.waitForInput(new WaitState("needs a signature", 1, List.of())); + + int removed = registry.evictExpired(Instant.now().plusSeconds(60)); + + assertEquals(0, removed); + assertNotNull(registry.get("pending")); + assertNotNull(registry.get("running")); + assertNotNull(registry.get("waiting")); + } + + @Test + void keepsTerminalRunsStillWithinTheExpiryWindow() { + PolicyRun completed = register("recent"); + completed.complete(List.of()); + + // A cutoff in the past means the run was updated "after" it: too young to evict. + int removed = registry.evictExpired(Instant.now().minusSeconds(60)); + + assertEquals(0, removed); + assertNotNull(registry.get("recent")); + } + + @Test + void evictionLeavesUnrelatedRunsInPlace() { + PolicyRun completed = register("done"); + completed.complete(List.of()); + PolicyRun running = register("busy"); + running.markRunning(); + + registry.evictExpired(Instant.now().plusSeconds(60)); + + assertNull(registry.get("done")); + assertNotNull(registry.get("busy")); + assertTrue(registry.all().stream().anyMatch(r -> r.getRunId().equals("busy"))); + } + + private PolicyRun register(String runId) { + PolicyRun run = new PolicyRun(runId, new PipelineDefinition(runId, List.of(), null)); + registry.register(run); + return run; + } +} diff --git a/app/proprietary/src/test/java/stirling/software/proprietary/policy/store/InProcessPolicyStoreTest.java b/app/proprietary/src/test/java/stirling/software/proprietary/policy/store/InProcessPolicyStoreTest.java new file mode 100644 index 0000000000..2e97019182 --- /dev/null +++ b/app/proprietary/src/test/java/stirling/software/proprietary/policy/store/InProcessPolicyStoreTest.java @@ -0,0 +1,87 @@ +package stirling.software.proprietary.policy.store; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import stirling.software.proprietary.policy.model.OutputSpec; +import stirling.software.proprietary.policy.model.PipelineStep; +import stirling.software.proprietary.policy.model.Policy; +import stirling.software.proprietary.policy.model.TriggerConfig; + +/** Tests for {@link InProcessPolicyStore}: id assignment, upsert, trigger-type lookup, delete. */ +class InProcessPolicyStoreTest { + + private PolicyStore store; + + @BeforeEach + void setUp() { + store = new InProcessPolicyStore(); + } + + @Test + void savedPolicyGetsAnIdAndIsRetrievable() { + Policy saved = store.save(policy(null, "compress", "manual", true)); + + assertNotNull(saved.id()); + assertFalse(saved.id().isBlank()); + assertEquals(saved, store.get(saved.id()).orElseThrow()); + } + + @Test + void savingWithAnExistingIdUpdatesInPlace() { + Policy created = store.save(policy(null, "before", "manual", true)); + + store.save( + new Policy( + created.id(), + "after", + "owner", + true, + TriggerConfig.manual(), + List.of(), + OutputSpec.inline())); + + assertEquals(1, store.all().size()); + assertEquals("after", store.get(created.id()).orElseThrow().name()); + } + + @Test + void findByTriggerTypeReturnsOnlyEnabledMatches() { + store.save(policy(null, "watch", "folder", true)); + store.save(policy(null, "watch-disabled", "folder", false)); + store.save(policy(null, "nightly", "schedule", true)); + + List folder = store.findByTriggerType("folder"); + + assertEquals(1, folder.size()); + assertEquals("watch", folder.get(0).name()); + } + + @Test + void deleteRemovesThePolicy() { + Policy saved = store.save(policy(null, "p", "manual", true)); + + assertTrue(store.delete(saved.id())); + assertTrue(store.get(saved.id()).isEmpty()); + assertFalse(store.delete(saved.id())); + } + + private static Policy policy(String id, String name, String triggerType, boolean enabled) { + return new Policy( + id, + name, + "owner", + enabled, + new TriggerConfig(triggerType, Map.of()), + List.of(new PipelineStep("/api/v1/misc/compress-pdf", Map.of())), + OutputSpec.inline()); + } +} diff --git a/app/proprietary/src/test/java/stirling/software/proprietary/policy/store/JpaPolicyStoreTest.java b/app/proprietary/src/test/java/stirling/software/proprietary/policy/store/JpaPolicyStoreTest.java new file mode 100644 index 0000000000..2a8212e2d5 --- /dev/null +++ b/app/proprietary/src/test/java/stirling/software/proprietary/policy/store/JpaPolicyStoreTest.java @@ -0,0 +1,129 @@ +package stirling.software.proprietary.policy.store; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import stirling.software.proprietary.policy.model.OutputSpec; +import stirling.software.proprietary.policy.model.PipelineStep; +import stirling.software.proprietary.policy.model.Policy; +import stirling.software.proprietary.policy.model.TriggerConfig; + +import tools.jackson.databind.ObjectMapper; +import tools.jackson.databind.json.JsonMapper; + +/** + * Tests for {@link JpaPolicyStore}'s entity mapping and query delegation. The repository is mocked; + * real Hibernate/H2 persistence is exercised at application boot (this module's convention is + * Mockito unit tests for store/service logic). + */ +@ExtendWith(MockitoExtension.class) +class JpaPolicyStoreTest { + + @Mock private PolicyRepository repository; + + private final ObjectMapper objectMapper = JsonMapper.builder().build(); + private JpaPolicyStore store; + + @BeforeEach + void setUp() { + store = new JpaPolicyStore(repository, objectMapper); + } + + @Test + void saveAssignsAnIdAndPersistsThePolicyAsJson() { + Policy saved = + store.save( + new Policy( + null, + "compress incoming", + "alice", + true, + new TriggerConfig("folder", Map.of("path", "/in")), + List.of(new PipelineStep("/api/v1/misc/compress-pdf", Map.of())), + OutputSpec.inline())); + + assertNotNull(saved.id()); + ArgumentCaptor captor = ArgumentCaptor.forClass(PolicyEntity.class); + verify(repository).save(captor.capture()); + PolicyEntity entity = captor.getValue(); + assertEquals(saved.id(), entity.getId()); + assertEquals("folder", entity.getTriggerType()); + assertTrue(entity.isEnabled()); + // The stored JSON round-trips back to an equal policy. + assertEquals(saved, objectMapper.readValue(entity.getPolicyJson(), Policy.class)); + } + + @Test + void getDeserializesThePolicyFromJson() { + Policy policy = + new Policy( + "p1", + "rotate", + "alice", + true, + TriggerConfig.manual(), + List.of( + new PipelineStep( + "/api/v1/general/rotate-pdf", Map.of("angle", 90))), + OutputSpec.inline()); + when(repository.findById("p1")).thenReturn(Optional.of(entityFor(policy))); + + assertEquals(policy, store.get("p1").orElseThrow()); + } + + @Test + void findByTriggerTypeUsesTheEnabledQuery() { + Policy policy = + new Policy( + "p1", + "watch", + "alice", + true, + new TriggerConfig("folder", Map.of()), + List.of(new PipelineStep("/api/v1/misc/compress-pdf", Map.of())), + OutputSpec.inline()); + when(repository.findByTriggerTypeAndEnabledTrue("folder")) + .thenReturn(List.of(entityFor(policy))); + + List folder = store.findByTriggerType("folder"); + + assertEquals(1, folder.size()); + assertEquals("p1", folder.get(0).id()); + } + + @Test + void deleteReturnsWhetherThePolicyExisted() { + when(repository.existsById("p1")).thenReturn(true); + assertTrue(store.delete("p1")); + verify(repository).deleteById("p1"); + + when(repository.existsById("missing")).thenReturn(false); + assertFalse(store.delete("missing")); + } + + private PolicyEntity entityFor(Policy policy) { + PolicyEntity entity = new PolicyEntity(); + entity.setId(policy.id()); + entity.setName(policy.name()); + entity.setOwner(policy.owner()); + entity.setEnabled(policy.enabled()); + entity.setTriggerType(policy.trigger().type()); + entity.setPolicyJson(objectMapper.writeValueAsString(policy)); + return entity; + } +} diff --git a/app/proprietary/src/test/java/stirling/software/proprietary/service/AiWorkflowServiceTest.java b/app/proprietary/src/test/java/stirling/software/proprietary/service/AiWorkflowServiceTest.java index 5b73ea5cc6..81d1ccdfa4 100644 --- a/app/proprietary/src/test/java/stirling/software/proprietary/service/AiWorkflowServiceTest.java +++ b/app/proprietary/src/test/java/stirling/software/proprietary/service/AiWorkflowServiceTest.java @@ -57,6 +57,7 @@ import stirling.software.proprietary.model.api.ai.AiWorkflowOutcome; import stirling.software.proprietary.model.api.ai.AiWorkflowRequest; import stirling.software.proprietary.model.api.ai.AiWorkflowResponse; +import stirling.software.proprietary.policy.engine.PolicyExecutor; import tools.jackson.databind.ObjectMapper; import tools.jackson.databind.json.JsonMapper; @@ -107,18 +108,20 @@ void setUp() throws IOException { .when(fileIdStrategy.idFor(any(MultipartFile.class))) .thenAnswer(inv -> ((MultipartFile) inv.getArgument(0)).getOriginalFilename()); + PolicyExecutor policyExecutor = + new PolicyExecutor( + internalApiClient, toolMetadataService, tempFileManager, objectMapper); service = new AiWorkflowService( pdfDocumentFactory, aiEngineClient, pdfContentExtractor, objectMapper, - internalApiClient, fileStorage, - toolMetadataService, tempFileManager, fileIdStrategy, endpointResolver, + policyExecutor, null, new ApplicationProperties()); when(endpointResolver.getEnabledEndpointUrls()).thenReturn(List.of());