title | summary | component | reviewed | related | |
---|---|---|---|---|---|
Steps, Stages and Connectors |
The pipeline is composed of a number of Stages that communicate via Connectors |
Core |
2024-02-16 |
|
Each pipeline is composed of steps. A step is an identifiable value in the pipeline used to programmatically define order of execution. Each step represents a behavior which will be executed at the given place within the pipeline. Add additional behavior to the pipeline by registering a new step or replace the behavior of an existing step with the custom implementation.
partial: stages
partial: incoming
- Operation specific processing: There is a dedicated stage for each context operation (e.g. Send, Publish, Subscribe, ...). Behaviors can use one of the following contexts:
IOutgoingSendContext
,IOutgoingPublishContext
,IOutgoingReplyContext
,ISubscribeContext
,IUnsubscribeContext
. Subscribe and Unsubscribe are not shown on the diagram below. - Outgoing Logical Message: Behaviors on this stage have access to the message which should be sent. Use
IOutgoingLogicalMessageContext
in a behavior to enlist in this stage. - Outgoing Physical Message: Enables to access the serialized message. This stage provides an
IOutgoingPhysicalMessageContext
instance to its behaviors. - Routing: Allows the selected routing strategies for outgoing messages to be manipulated. If the outgoing pipeline was initiated by the incoming pipeline, this stage collects all outgoing operations (except for immediate dispatch messages).
This stage provides an
IRoutingContext
instance to its behaviors. - Batch Dispatch: Forwards all collected outgoing operations to the dispatch stage once message processing has been completed. This stage provides access to the collection of transport operations that are to be dispatched. This stage provides an
IBatchDispatchContext
instance to its behaviors. - Dispatch: Provides access to outgoing dispatch operations before they are handed off to the transport. This stage provides an
IDispatchContext
instance to its behaviors.
graph LR
UC((Initiating User Code))
subgraph Outgoing Pipeline
Outgoing{Outgoing}
OP[Outgoing<br>Publish]
OS[Outgoing<br>Send]
OR[Outgoing<br>Reply]
OLM[Outgoing<br>Logical<br>Message]
OPM[Outgoing<br>Physical<br>Message]
Routing[Routing]
end
subgraph Dispatch Phase
Dispatch{Dispatch}
Transport((Transport))
BD[Batch<br>Dispatch]
ID[Immediate<br>Dispatch]
end
UC --> Outgoing
Outgoing --> OP
Outgoing --> OS
Outgoing --> OR
OP --> OLM
OS --> OLM
OR --> OLM
OLM --> OPM
OPM --> Routing
Routing --> Dispatch
ID--> Transport
Dispatch --> BD
Dispatch --> ID
BD --> Transport
partial: recoverability
partial: optional
Pipeline contexts have an extension bag which can be used to create, read, update or delete custom state with a key identifier. For example, this can be used to set metadata- or pipeline-specific state in an incoming behavior that can be used in later pipeline stages if needed. State stored via the extension bag will not be available once the extension bag runs out of scope at the end of the pipeline.
State set during a forked pipeline will not be available to the forking pipeline. For example, state changes during the outgoing pipeline will not be available in the incoming pipeline. If state has to be propagated, have the forking pipeline set a context object that the forked pipeline later can get and modify as needed.
snippet: SetContextBetweenIncomingAndOutgoing
Stage connectors connect from the current stage (e.g. IOutgoingLogicalMessageContext
) to another stage (e.g. IOutgoingPhysicalMessageContext
). In order to override an existing stage, inherit from StageConnector<TFromContext, TToContext>
and then replace an existing stage connector. Most pipeline extensions can be done by inheriting from Behavior<TContext>
. It is rarely necessary to replace existing stage connectors. When implementing a stage connector, ensure that all required data is passed along for the next stage.
snippet: CustomStageConnector
graph LR
subgraph Root pipeline
A[TFromContext] --- B{Fork Connector}
B --- C[TFromContext]
end
subgraph Forked pipeline
B --> D[TForkContext]
end
Fork connectors fork from a current stage (e.g. IIncomingPhysicalMessageContext
) to another independent pipeline (e.g. IAuditContext
). In order to override an existing fork connector inherit from ForkConnector<TFromContext, TForkContext>
and then replace an existing fork connector.
snippet: CustomForkConnector
graph LR
subgraph Root stage
A[TFromContext] --- B{StageFork<br/>Connector}
B --- C[TToContext]
end
subgraph Forked pipeline
B --> D[TForkContext]
end
Stage fork connectors are essentially a combination of a stage connector and a fork connector. They have the ability to connect from the current stage (e.g. ITransportReceiveContext
) to another stage (e.g. IIncomingPhysicalMessageContext
) and fork to another independent pipeline (e.g. IBatchedDispatchContext
). In order to override an existing stage fork connector inherit from StageForkConnector<TFromContext, TToContext, TForkContext
and then replace an existing stage fork connector.
snippet: CustomStageForkConnector