@paulhenri-l
Contributor

@paulhenri-l paulhenri-l commented Nov 13, 2025

This PR adds flexibility to how and where workflows are queued and executed in world-postgres.

Queue Driver Abstraction

Previously, the queue implementation was hardcoded into the world. I've now extracted a QueueDriver interface that allows users to:

  • Use the default without any config (pg-boss)
  • Or implement their own custom queue drivers (useful if they already have a queue)
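
A minimal sketch of what such a driver contract might look like. The method names `pushStep()`, `pushFlow()`, and `start()` are the ones mentioned later in this thread; the message shape, the signatures, and the in-memory driver are assumptions for illustration, not the PR's actual code.

```typescript
// Hypothetical message shape; the real payload type is not shown in this thread.
interface QueueMessage {
  queueName: string;
  messageId: string;
  data: unknown;
}

// Assumed contract: enqueue steps and flows, and start consuming jobs.
interface QueueDriver {
  pushStep(message: QueueMessage): Promise<void>;
  pushFlow(message: QueueMessage): Promise<void>;
  start(): Promise<void>;
}

// Toy in-memory driver, useful only to show how small a custom driver can be.
class InMemoryQueueDriver implements QueueDriver {
  readonly steps: QueueMessage[] = [];
  readonly flows: QueueMessage[] = [];
  started = false;

  async pushStep(message: QueueMessage): Promise<void> {
    this.steps.push(message);
  }

  async pushFlow(message: QueueMessage): Promise<void> {
    this.flows.push(message);
  }

  async start(): Promise<void> {
    this.started = true;
  }
}
```

A user who already runs a queue would replace the arrays with calls into that queue and keep the same three methods.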

Proxy Strategies

Workflow code execution was done using the embedded world. I've added two execution strategies to handle this from within the world.

  • HTTP proxy: queue workers call /.well-known/workflow/v1/flow and /step endpoints
  • Function proxy: workers invoke the workflows/steps directly
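
As a rough sketch, the two strategies could share one small interface. Only the endpoint path prefix and the `X-Workflow-Secret` header appear in this PR; `ExecutionProxy`, `createHttpProxy`, `createFunctionProxy`, the `Send` transport type, and the assumption that the step endpoint lives at `/.well-known/workflow/v1/step` are hypothetical.

```typescript
type JobKind = "flow" | "step";

// Hypothetical common interface for both execution strategies.
interface ExecutionProxy {
  run(kind: JobKind, payload: unknown): Promise<unknown>;
}

// Minimal fetch-like transport, injected so the sketch has no runtime dependency.
type Send = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string }
) => Promise<{ status: number }>;

// Assumes both endpoints share the /.well-known/workflow/v1/ prefix.
function endpointFor(baseUrl: string, kind: JobKind): string {
  return `${baseUrl.replace(/\/+$/, "")}/.well-known/workflow/v1/${kind}`;
}

// HTTP strategy: the worker posts the job to the generated route.
function createHttpProxy(baseUrl: string, securityToken: string, send: Send): ExecutionProxy {
  return {
    async run(kind, payload) {
      const res = await send(endpointFor(baseUrl, kind), {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          "X-Workflow-Secret": securityToken,
        },
        body: JSON.stringify(payload),
      });
      return res.status;
    },
  };
}

// Function strategy: the worker calls the exported entrypoints in-process.
function createFunctionProxy(
  handlers: Record<JobKind, (payload: unknown) => Promise<unknown>>
): ExecutionProxy {
  return { run: (kind, payload) => handlers[kind](payload) };
}
```

Because both factories return the same interface, a queue worker would not need to know which strategy it is running with.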

Notes

  • To make the function proxy possible I had to add an export to the steps.js and workflows.js files
  • I need guidance on how 503/Retry timeout responses are expected to be handled. I think requeuing the job with a proper delay is what's needed.
  • I may have missed other implementation details
  • I updated the README with some examples
  • The name "proxy" sounds a bit strange to me. In the end it's just a strategy pattern so maybe Strategy would be a better name
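
On the 503 question above, one possible way to pick the requeue delay is sketched here. The policy is an assumption, not something decided in this PR: honor a `Retry-After` header when one is present, otherwise fall back to capped exponential backoff. `retryDelaySeconds` and `MAX_DELAY_SECONDS` are hypothetical names.

```typescript
// Assumed upper bound so a bad header cannot park a job indefinitely.
const MAX_DELAY_SECONDS = 300;

function retryDelaySeconds(
  retryAfter: string | null,
  attempt: number,
  now: Date = new Date()
): number {
  if (retryAfter !== null) {
    const value = retryAfter.trim();
    // Retry-After may be a number of seconds...
    if (/^\d+$/.test(value)) {
      return Math.min(Number(value), MAX_DELAY_SECONDS);
    }
    // ...or an HTTP date.
    const at = Date.parse(value);
    if (!Number.isNaN(at)) {
      const seconds = Math.ceil((at - now.getTime()) / 1000);
      return Math.min(Math.max(seconds, 0), MAX_DELAY_SECONDS);
    }
  }
  // No usable header: 1s, 2s, 4s, ... for attempts 1, 2, 3, ...
  return Math.min(2 ** Math.max(attempt - 1, 0), MAX_DELAY_SECONDS);
}
```

A worker could pass the result as the delay when re-enqueueing the job; how that maps onto the queue's own delay option is left open here.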

@changeset-bot

changeset-bot bot commented Nov 13, 2025

🦋 Changeset detected

Latest commit: 376b4bd

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 10 packages
Name Type
@workflow/world-postgres Patch
@workflow/sveltekit Patch
@workflow/builders Patch
workflow Patch
@workflow/cli Patch
@workflow/next Patch
@workflow/nitro Patch
@workflow/ai Patch
@workflow/world-testing Patch
@workflow/nuxt Patch

@vercel
Contributor

vercel bot commented Nov 13, 2025

@paulhenri-l is attempting to deploy a commit to the Vercel Labs Team on Vercel.

A member of the Team first needs to authorize it.

@paulhenri-l paulhenri-l force-pushed the world-postgres branch 3 times, most recently from 239af80 to b35cfcb Compare November 17, 2025 22:54
@paulhenri-l paulhenri-l marked this pull request as ready for review November 17, 2025 23:29
@pranaygp
Collaborator

pranaygp commented Nov 18, 2025

@paulhenri-l are you able to rebase on main by chance? I just added some postgres e2e tests in #356 that we should be able to run against the postgres queue driver changes

handler
) => {
return async (req) => {
const secret = req.headers.get('X-Workflow-Secret');
Contributor

The request body is parsed before the security token is validated, creating a vulnerability where attackers can consume server resources by sending invalid payloads without authorization.

📝 Patch Details
diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts
index 85d38ef..7734792 100644
--- a/packages/world-postgres/src/queue.ts
+++ b/packages/world-postgres/src/queue.ts
@@ -60,7 +60,6 @@ export function createQueue(
   ) => {
     return async (req) => {
       const secret = req.headers.get('X-Workflow-Secret');
-      const [message, payload] = await parse(req);
 
       if (!secret || securityToken !== secret) {
         return Response.json(
@@ -69,6 +68,8 @@ export function createQueue(
         );
       }
 
+      const [message, payload] = await parse(req);
+
       if (!isValidQueueName(message.queueName)) {
         return Response.json(
           { error: `Invalid queue name: ${message.queueName}` },

Analysis

Request body parsed before authorization check enables DoS vulnerability

What fails: The createQueueHandler function in packages/world-postgres/src/queue.ts parses and deserializes the request body before validating the security token, allowing attackers to consume server resources by sending invalid payloads without proper authorization.

How to reproduce:

# Send a request with an invalid or missing X-Workflow-Secret header
# The parse() function will still execute its expensive operations:
# - JSON parsing and schema validation
# - Base64 decoding  
# - JsonTransport deserialization
curl -X POST http://localhost:3000/queue \
  -H "Content-Type: application/json" \
  -d '{"queueName":"__wkf_workflow_test","data":"...","attempt":1,"messageId":"msg_123","id":"test"}'

Result: The request body is fully parsed despite failing authentication. An attacker can send many requests with large or complex payloads without valid credentials, consuming CPU and memory resources on the server.

Expected behavior: According to OWASP DoS Cheat Sheet, "using validation that is cheap in resources first" is a key mitigation. Authentication verification should occur before expensive operations like full body parsing and deserialization. Per security best practices, "access control should come before extensive validation" to prevent DoS attacks and resource exhaustion.

Impact:

  • DoS attacks: Attackers can exhaust server resources (CPU, memory) by sending large or complex payloads without valid credentials
  • Resource exhaustion: Each unauthorized request still processes CPU-intensive operations
  • Information disclosure: Different error responses during parsing vs. authorization could leak information

Fix applied: Moved the parse(req) call after the security token validation check, ensuring unauthorized requests fail fast without consuming parsing resources.
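
A self-contained sketch of the same ordering, for readers who want to try it outside the package. `createGuardedHandler`, `IncomingRequest`, and `HandlerResult` are hypothetical names; the real handler uses its own `parse()` helper and `Response.json`, which are replaced here by `JSON.parse` and a plain result object.

```typescript
// Structural subset of the standard Request, so no web typings are required.
interface IncomingRequest {
  headers: { get(name: string): string | null };
  text(): Promise<string>;
}

interface HandlerResult {
  status: number;
  body: unknown;
}

function createGuardedHandler(
  securityToken: string,
  handle: (message: unknown) => Promise<unknown>
) {
  return async (req: IncomingRequest): Promise<HandlerResult> => {
    // Cheap check first: the body is not touched until the secret matches.
    const secret = req.headers.get("X-Workflow-Secret");
    if (!secret || secret !== securityToken) {
      return { status: 401, body: { error: "Unauthorized" } };
    }

    // Only authorized callers reach the expensive parsing step.
    let message: unknown;
    try {
      message = JSON.parse(await req.text());
    } catch {
      return { status: 400, body: { error: "Invalid payload" } };
    }
    return { status: 200, body: await handle(message) };
  };
}
```

With this ordering an unauthenticated request costs one header lookup and one string comparison, regardless of payload size.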

Comment on lines 40 to 48
import { createWorld, createPgBossQueue } from "@workflow/world-postgres";

const world = createWorld({
  connectionString: "postgres://username:password@localhost:5432/database",
  jobPrefix: "myapp", // optional
  queueConcurrency: 10, // optional
  securityToken: "your-secret-token-here",
  queueFactory: createPgBossHttpProxyQueue({
    jobPrefix: "my-app",
    queueConcurrency: 10,
  })
Contributor

Suggested change
- import { createWorld, createPgBossQueue } from "@workflow/world-postgres";
- const world = createWorld({
-   connectionString: "postgres://username:password@localhost:5432/database",
-   jobPrefix: "myapp", // optional
-   queueConcurrency: 10, // optional
-   securityToken: "your-secret-token-here",
-   queueFactory: createPgBossHttpProxyQueue({
-     jobPrefix: "my-app",
-     queueConcurrency: 10,
-   })
+ import { createWorld, createPgBossHttpProxyQueue } from "@workflow/world-postgres";
+ const world = createWorld({
+   connectionString: "postgres://username:password@localhost:5432/database",
+   securityToken: "your-secret-token-here",
+   queueFactory: () => createPgBossHttpProxyQueue({
+     jobPrefix: "my-app",
+     queueConcurrency: 10,
+   })

The README code example for programmatic usage has an incorrect import (createPgBossQueue instead of createPgBossHttpProxyQueue) and passes the queue instance directly to queueFactory instead of wrapping it in a function, which violates the expected type signature () => QueueDriver.

Analysis

README contains incorrect code example for queueFactory configuration

What fails: The code example in packages/world-postgres/README.md lines 39-49 has two issues:

  1. Incorrect import statement: imports createPgBossQueue instead of createPgBossHttpProxyQueue
  2. Incorrect API usage: passes the result of createPgBossHttpProxyQueue({...}) directly to queueFactory instead of wrapping it in a function

How to reproduce: Copy the code example from packages/world-postgres/README.md lines 39-49 into a TypeScript project and attempt to use it:

  • TypeScript type checking will fail because queueFactory expects () => QueueDriver (a function), but receives QueueDriver (an instance)
  • The code will also fail at runtime when createWorld() attempts to invoke opts.queueFactory() on a non-function value

What happened vs expected:

  • Current (broken):

    import { createWorld, createPgBossQueue } from "@workflow/world-postgres";
    queueFactory: createPgBossHttpProxyQueue({...})
  • Expected:

    import { createWorld, createPgBossHttpProxyQueue } from "@workflow/world-postgres";
    queueFactory: () => createPgBossHttpProxyQueue({...})

The type definition in packages/world-postgres/src/config.ts shows queueFactory?: () => QueueDriver, and the implementation in packages/world-postgres/src/index.ts (line 18) calls opts.queueFactory() - confirming it must be a function. The correct pattern is demonstrated elsewhere in the same README at lines 118-122 and line 168.
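
To illustrate why the wrapper function matters, here is a small stand-in. Only the `queueFactory?: () => QueueDriver` shape is taken from the comment above; the `QueueDriver` members, `WorldConfig`, `createDriver`, and this `createWorld` are simplified, hypothetical substitutes for the package's real code.

```typescript
// Simplified stand-in for the driver type.
interface QueueDriver {
  start(): Promise<void>;
}

interface WorldConfig {
  securityToken: string;
  queueFactory?: () => QueueDriver;
}

let driversCreated = 0;

// Hypothetical driver constructor that counts how often it runs.
function createDriver(): QueueDriver {
  driversCreated += 1;
  return { start: async () => {} };
}

function createWorld(opts: WorldConfig): { queue: QueueDriver } {
  // The world decides when the driver is built; the default is illustrative.
  const factory = opts.queueFactory ?? createDriver;
  return { queue: factory() };
}

// OK: a function is passed, so nothing is constructed until createWorld runs.
const world = createWorld({
  securityToken: "token",
  queueFactory: () => createDriver(),
});

// Rejected by the type checker: this passes an instance, not a factory.
// createWorld({ securityToken: "token", queueFactory: createDriver() });
```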

Signed-off-by: paulhenri-l <[email protected]>
@Schniz
Collaborator

Schniz commented Dec 1, 2025

Hey! Sorry for not getting to this sooner :pray_parrot:

I'm very happy to see this contribution, but I have hesitations regarding it. As I understand, the queue driver is basically a way to configure Queue, right?

so, essentially it does the same as the following:

export function createWorld(): World {
  return { ...createPostgres(...), ...myQueueImplementation(...) }
}

I might be completely wrong here, so I'm happy to be corrected!

but if I'm right: I wonder what is the value of adding another named abstraction -- and maybe what we should aim for is better documentation/support/building blocks for "custom worlds colocated in user code"?

I do think there's immense value in generating a "worker" build for workflow/step execution to avoid meaningless conversions from/to HTTP for the environments that don't need it. Initially the Postgres implementation did it because

  1. it was easy and a smaller diff
  2. it's easier to demo with single commands like pnpm dev

but the notion of a runnable worker (running with bull.js, graphile, sidekiq, faktory, SNS or whatever) makes total sense to me.

But I still (respectfully!) question the value of a new "queue driver" abstraction and I hope we can discuss it here.

Again, apologies for chiming in late -- feels incredibly bad on my part especially given you've put a lot of effort on this PR (and got a following with #467) and I'm acting like kind of a buzzkill.

cc @pranaygp @VaguelySerious regarding this

@pranaygp
Collaborator

pranaygp commented Dec 1, 2025

@Schniz yeah +1 here on not introducing a "queue driver" abstraction - it defeats the point of world already. composing worlds like you described makes more sense. we can try to marshall typescript/types for world if we want with something like PartialWorld and CompleteWorld but I'm ok not overengineering it for now if we don't need to
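
For what it's worth, the `PartialWorld` / `CompleteWorld` idea could be typed roughly as below. Every member name is hypothetical (the real `World` interface is larger and is not shown in this thread), and `composeWorld` is an illustrative helper rather than an existing export.

```typescript
// Hypothetical slices of a world; the real interfaces have more members.
interface StorageApi {
  getRun(id: string): Promise<unknown>;
}

interface QueueApi {
  queue(name: string, payload: unknown): Promise<void>;
}

type CompleteWorld = StorageApi & QueueApi;
type PartialWorld = Partial<CompleteWorld>;

// Merge two partial worlds; the result type keeps track of what is present.
function composeWorld<A extends PartialWorld, B extends PartialWorld>(a: A, b: B): A & B {
  return { ...a, ...b };
}

const storage = { getRun: async (id: string): Promise<unknown> => ({ id }) };
const customQueue = { queue: async (_name: string, _payload: unknown): Promise<void> => {} };

// Compiles only when the composed pieces cover every CompleteWorld member.
const composed: CompleteWorld = composeWorld(storage, customQueue);
```

Dropping either piece from the `composeWorld` call turns the last line into a type error, which is the check a `CompleteWorld` type would provide.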

also I agree on not going to http if we don't need to, for things like postgres/graphile world. however, in my mind, we can just reuse the existing world abstraction to do this correctly? i.e. we still create the routes/bundles, but they can either not be served or respond 4XX to any incoming http request. Then, the world setup or createStepHandler etc. can start the worker on the queue and have it actually execute the bundle directly, like @paulhenri-l and I discussed

I left another comment on the graphile PR btw: #467 (comment)

@paulhenri-l
Contributor Author

paulhenri-l commented Dec 2, 2025

Hey @Schniz np, I can see that the whole team is quite buzzy!

The main idea behind the queue driver is to have a default provided by workflow while still not locking the developer in a specific solution. "We provide the queue but you can bring your own"

My answer to this was the queue driver abstraction, which allows users to opt out of workflow's way and integrate it however they want within their stack.

It may be my OOP background that pushed me into structuring it like this. It is true that all in all it's the equivalent of:

export function createWorld(): World {
  return { ...createPostgres(...), ...myQueueImplementation(...) }
}

The only difference is which side we want to push the complexity to: workflow or the user. I think the interface of the queue abstraction is easier to work with than the world queue, and I think #467 shows that.

There is also the fact that createQueueHandler doesn't really need to change if you change the underlying queue, but currently the user will have to reimplement it, whereas the queue abstraction offers a more focused pushStep(), pushFlow() and start() API.

That is mostly DX. I designed it for me as an end user of workflow and wanted the smallest API possible to have control over queuing and execution (I self-host on a VPS and already have a queue running) while leaving as much code as possible in workflow's codebase. But it's for you, the maintainers, to decide where to draw the line between the project's complexity and DX.

> and maybe what we should aim for is better documentation/support/building blocks for "custom worlds colocated in user code"?

That would make sense, honestly I didn't think of doing it the way you described before you mentioned it.

> I do think there's immense value in generating a "worker" build for workflow/step execution to avoid meaningless conversions from/to HTTP for the environments that don't need it. Initially the Postgres implementation did it because

This was also one of the main reasons I wanted to work on this: I didn't want my http server to handle "background" tasks. I dealt with this by editing the builders to make sure the entrypoints can be imported and used (via the proxy abstraction). A proper worker-only build would be much better and would avoid having to deal with the generated routes and any proxy abstraction.

> we can just reuse the existing world abstraction to do this correctly? i.e. we still create the routes/bundles, but they can either not be served or respond 4XX to any incoming http request.

@pranaygp I think that's more or less what's happening here: the bundles have a new export and the handler requires a security token. Without it the route will return a 401

@pranaygp @Schniz I'm available if any of you want to discuss this in more detail over a video call

Collaborator

pranaygp commented Dec 2, 2025

thanks for the detailed response - I’ll try to deep review the code once again and see if I can get it running myself locally to share more thoughts too
