Skip to content

Commit bc24abe

Browse files
committed
Adding MassTransit Connector for ServiceControl
1 parent 5660411 commit bc24abe

File tree

58 files changed

+1799
-6
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1799
-6
lines changed

menu/menu.yaml

+18-6
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,22 @@
120120

121121
- Name: Service Platform
122122
Topics:
123-
- Title: About
123+
- Title: Introduction
124124
Url: platform
125-
- Title: Platform installation
125+
- Title: Installation
126126
Url: platform/installation
127127
- Title: Platform Sample installation
128128
Url: platform/platform-sample-package
129-
- Title: Connecting endpoints
130-
Url: platform/connecting
131-
- Title: Connection schema
132-
Url: platform/json-schema
129+
- Title: Connecting NServiceBus
130+
Articles:
131+
- Title: Using the NuGet package
132+
Url: platform/connecting
133+
- Title: Configuration schema
134+
Url: platform/json-schema
135+
- Title: Connecting MassTransit
136+
Articles:
137+
- Title: Using MassTransit Connector
138+
Url: platform/masstransit
133139
- Title: Contributing
134140
Url: platform/contributing
135141
- Title: Release notifications
@@ -1565,6 +1571,12 @@
15651571
Url: servicecontrol/contracts
15661572
- Title: Transport adapter
15671573
Url: servicecontrol/transport-adapter
1574+
- Title: MassTransit
1575+
Articles:
1576+
- Title: Connector Settings
1577+
Url: servicecontrol/masstransit
1578+
- Title: Docker Deployment
1579+
Url: servicecontrol/masstransit/docker-deployment
15681580
- Title: Migrations
15691581
Articles:
15701582
- Title: Overview

platform/architecture-overview-diagram-masstransit.drawio

+178
Large diffs are not rendered by default.

platform/architecture-overview-diagram-masstransit.svg

+4
Loading

platform/masstransit-overview.pdn

8.38 MB
Binary file not shown.

platform/masstransit-overview.png

3.82 MB
Loading

platform/masstransit-servicepulse.gif

306 KB
Loading

platform/masstransit.md

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
---
2+
title: The Particular Platform for MassTransit
3+
summary: How the Particular Platform works with MassTransit endpoints
4+
reviewed: 2024-09-04
5+
component: ServiceControl
6+
---
7+
8+
The Particular Service Platform provides recoverability features for MassTransit endpoints on RabbitMQ, Azure Service Bus, and Amazon SQS.
9+
10+
It auto-detects and ingests messages from the [error](https://masstransit.io/documentation/concepts/exceptions#error-pipe) and [dead-letter](https://masstransit.io/documentation/concepts/exceptions#dead-letter-pipe) queues for all endpoints running in your system. The platform provides an aggregated view of the information necessary to detect, diagnose, and fix problems causing the failures as well as the ability to schedule failed messages for re-processing.
11+
12+
![MassTransit Fault Management](masstransit-overview.png "width=715")
13+
*MassTransit error queue auto-discovery and management*
14+
15+
### Managing failures
16+
17+
After the ingestion, failing messages are available via ServicePulse which is the UI for the platform. It enables navigating the list of failures, displaying details of the failed messsage (including exception details), as well as scheduling message for reprocessing.
18+
19+
![Managing failures with ServicePulse](masstransit-servicepulse.gif)
20+
21+
In addition, ServicePulse offers more advanced features such as [retry redirects](/servicepulse/redirect.md) and [failure message editting](/servicepulse/intro-editing-messages.md).
22+
23+
<div class="text-center inline-download hidden-xs"><a id='masstransit-sample' target="_blank" href='https://github.com/particular/MassTransitShowcaseDemo/' class="btn btn-primary btn-lg"><span class="glyphicon glyphicon-download-alt" aria-hidden="true"></span> See it in action</a>
24+
</div>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<OutputType>Exe</OutputType>
6+
<LangVersion>12.0</LangVersion>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<ProjectReference Include="..\Helper\Helper.csproj" />
12+
<ProjectReference Include="..\Messages\Messages.csproj" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<PackageReference Include="MassTransit.RabbitMQ" Version="8.2.5" />
17+
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
18+
</ItemGroup>
19+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
namespace Billing
2+
{
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Hosting;
6+
7+
class ConsoleBackgroundService(SimulationEffects state) : BackgroundService
8+
{
9+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
10+
{
11+
while (true)
12+
{
13+
Console.Clear();
14+
await Console.Out.WriteLineAsync("""
15+
Billing Endpoint
16+
Press I to increase the simulated failure rate
17+
Press D to decrease the simulated failure rate
18+
Press R to reset simulation
19+
Press CTRL+C to quit
20+
21+
""");
22+
23+
state.WriteState(Console.Out);
24+
25+
while (!Console.KeyAvailable)
26+
{
27+
await Task.Delay(15, stoppingToken);
28+
}
29+
30+
var input = Console.ReadKey(true);
31+
32+
switch (input.Key)
33+
{
34+
case ConsoleKey.I:
35+
state.IncreaseFailureRate();
36+
break;
37+
case ConsoleKey.D:
38+
state.DecreaseFailureRate();
39+
break;
40+
case ConsoleKey.R:
41+
state.Reset();
42+
break;
43+
}
44+
}
45+
}
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace Billing;
2+
3+
using System.Threading.Tasks;
4+
using Helper;
5+
using MassTransit;
6+
using Messages;
7+
8+
public class BillOrderConsumer(SimulationEffects simulationEffects) : IConsumer<OrderPlaced>
9+
{
10+
public async Task Consume(ConsumeContext<OrderPlaced> context)
11+
{
12+
await simulationEffects.SimulatedMessageProcessing(context.CancellationToken);
13+
14+
var orderBilled = new OrderBilled
15+
{
16+
OrderId = context.Message.OrderId
17+
};
18+
19+
await context.Publish(orderBilled);
20+
21+
await ConsoleHelper.WriteMessageProcessed(context.SentTime ?? DateTime.UtcNow);
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
namespace Billing;
2+
3+
using Microsoft.Extensions.Hosting;
4+
using MassTransit;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using System.Reflection;
7+
using System.Text;
8+
using Microsoft.Extensions.Logging;
9+
10+
class Program
11+
{
12+
public static IHostBuilder CreateHostBuilder(string[] args)
13+
{
14+
Console.OutputEncoding = Encoding.UTF8;
15+
var host = Host.CreateDefaultBuilder(args)
16+
.ConfigureLogging(cfg => cfg.SetMinimumLevel(LogLevel.Warning))
17+
.ConfigureServices((hostContext, services) =>
18+
{
19+
services.AddMassTransit(x =>
20+
{
21+
x.SetupTransport(args);
22+
x.AddConsumers(Assembly.GetExecutingAssembly());
23+
});
24+
25+
services.AddSingleton<SimulationEffects>();
26+
services.AddHostedService<ConsoleBackgroundService>();
27+
});
28+
29+
return host;
30+
}
31+
32+
static async Task Main(string[] args)
33+
{
34+
Console.Title = "Failure rate (Billing)";
35+
36+
var host = CreateHostBuilder(args).Build();
37+
await host.RunAsync();
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
{
2+
"profiles": {
3+
"RabbitMQ": {
4+
"commandName": "Project",
5+
"commandLineArgs": "--rabbitmq",
6+
"launchBrowser": false,
7+
"environmentVariables": {
8+
"ASPNETCORE_ENVIRONMENT": "Development"
9+
}
10+
},
11+
"Azure Service Bus": {
12+
"commandName": "Project",
13+
"commandLineArgs": "--azureservicebus",
14+
"launchBrowser": false,
15+
"environmentVariables": {
16+
"ASPNETCORE_ENVIRONMENT": "Development"
17+
}
18+
},
19+
"AmazonSQS": {
20+
"commandName": "Project",
21+
"commandLineArgs": "--amazonsqs",
22+
"launchBrowser": false,
23+
"environmentVariables": {
24+
"ASPNETCORE_ENVIRONMENT": "Development"
25+
}
26+
}
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
namespace Billing;
2+
3+
public class SimulationEffects
4+
{
5+
public void IncreaseFailureRate() => failureRate = Math.Min(1, failureRate + FailureRateIncrement);
6+
7+
public void DecreaseFailureRate() => failureRate = Math.Max(0, failureRate - FailureRateIncrement);
8+
9+
public void WriteState(TextWriter output) => output.WriteLine("Failure rate: {0:P0}", failureRate);
10+
11+
public async Task SimulatedMessageProcessing(CancellationToken cancellationToken = default)
12+
{
13+
await Task.Delay(200, cancellationToken);
14+
15+
if (Random.Shared.NextDouble() < failureRate)
16+
{
17+
throw new Exception("BOOM! A failure occurred");
18+
}
19+
}
20+
21+
double failureRate = 0.5;
22+
const double FailureRateIncrement = 0.1;
23+
24+
public void Reset()
25+
{
26+
failureRate = 0;
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<OutputType>Exe</OutputType>
6+
<LangVersion>12.0</LangVersion>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="MassTransit.RabbitMQ" Version="8.2.5" />
12+
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<ProjectReference Include="..\Helper\Helper.csproj" />
17+
<ProjectReference Include="..\Messages\Messages.csproj" />
18+
</ItemGroup>
19+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
namespace ClientUI
2+
{
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Hosting;
6+
7+
class ConsoleBackgroundService(SimulatedCustomers simulatedCustomers) : BackgroundService
8+
{
9+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
10+
{
11+
while (true)
12+
{
13+
Console.Clear();
14+
await Console.Out.WriteLineAsync("""
15+
Simulating customers placing orders on a website
16+
Press CTRL+C to quit
17+
18+
""");
19+
20+
simulatedCustomers.WriteState(Console.Out);
21+
22+
while (!Console.KeyAvailable)
23+
{
24+
await Task.Delay(15, stoppingToken);
25+
}
26+
27+
var input = Console.ReadKey(true);
28+
29+
switch (input.Key)
30+
{
31+
case ConsoleKey.I:
32+
simulatedCustomers.IncreaseTraffic();
33+
break;
34+
case ConsoleKey.D:
35+
simulatedCustomers.DecreaseTraffic();
36+
break;
37+
}
38+
}
39+
}
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
namespace ClientUI;
2+
3+
using Microsoft.Extensions.Hosting;
4+
using MassTransit;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using System.Reflection;
7+
using System.Text;
8+
using Microsoft.Extensions.Logging;
9+
10+
class Program
11+
{
12+
public static IHostBuilder CreateHostBuilder(string[] args)
13+
{
14+
Console.OutputEncoding = Encoding.UTF8;
15+
var host = Host.CreateDefaultBuilder(args)
16+
.ConfigureLogging(cfg => cfg.SetMinimumLevel(LogLevel.Warning))
17+
.ConfigureServices((_, services) =>
18+
{
19+
services.AddMassTransit(x =>
20+
{
21+
x.SetupTransport(args);
22+
x.AddConsumers(Assembly.GetExecutingAssembly());
23+
});
24+
25+
services.AddSingleton<SimulatedCustomers>();
26+
services.AddSingleton<ConsoleBackgroundService>();
27+
28+
services.AddHostedService(p => p.GetRequiredService<SimulatedCustomers>());
29+
services.AddHostedService(p => p.GetRequiredService<ConsoleBackgroundService>());
30+
});
31+
32+
return host;
33+
}
34+
35+
static async Task Main(string[] args)
36+
{
37+
Console.Title = "Load (ClientUI)";
38+
39+
var host = CreateHostBuilder(args).Build();
40+
await host.RunAsync();
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
{
2+
"profiles": {
3+
"RabbitMQ": {
4+
"commandName": "Project",
5+
"commandLineArgs": "--rabbitmq",
6+
"launchBrowser": false,
7+
"environmentVariables": {
8+
"ASPNETCORE_ENVIRONMENT": "Development"
9+
}
10+
},
11+
"Azure Service Bus": {
12+
"commandName": "Project",
13+
"commandLineArgs": "--azureservicebus",
14+
"launchBrowser": false,
15+
"environmentVariables": {
16+
"ASPNETCORE_ENVIRONMENT": "Development"
17+
}
18+
},
19+
"AmazonSQS": {
20+
"commandName": "Project",
21+
"commandLineArgs": "--amazonsqs",
22+
"launchBrowser": false,
23+
"environmentVariables": {
24+
"ASPNETCORE_ENVIRONMENT": "Development"
25+
}
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)