Skip to content

Commit cc96620

Browse files
authored
Merge pull request #1 from Particular/core-v5
Core v5
2 parents 4285d8b + c753a7d commit cc96620

9 files changed

+272
-7
lines changed

packaging/nuget/nservicebus.metrics.servicecontrol.msmq.nuspec

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
<copyright>$copyright$</copyright>
1414
<tags>$tags$</tags>
1515
<dependencies>
16-
<dependency id="NServiceBus.Metrics" version="[2.0.0,3.0.0)" />
16+
<dependency id="NServiceBus.Metrics.ServiceControl" version="[1.3.0-rc0002,2.0.0)" />
17+
<dependency id="NServiceBus" version="[5.2,6)" />
1718
</dependencies>
1819
</metadata>
1920
<files>

src/NServiceBus.Metrics.ServiceControl.Msmq.sln

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.Metrics.Service
77
EndProject
88
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.Metrics.ServiceControl.Msmq.Tests", "NServiceBus.Metrics.ServiceControl..Msmq.Tests\NServiceBus.Metrics.ServiceControl.Msmq.Tests.csproj", "{09E52424-06AF-4948-92D8-560491309181}"
99
EndProject
10+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SampleEndpoint", "SampleEndpoint\SampleEndpoint.csproj", "{76FADEEE-3788-4F59-902A-A970167CE11F}"
11+
EndProject
1012
Global
1113
GlobalSection(SolutionConfigurationPlatforms) = preSolution
1214
Debug|Any CPU = Debug|Any CPU
@@ -21,6 +23,10 @@ Global
2123
{09E52424-06AF-4948-92D8-560491309181}.Debug|Any CPU.Build.0 = Debug|Any CPU
2224
{09E52424-06AF-4948-92D8-560491309181}.Release|Any CPU.ActiveCfg = Release|Any CPU
2325
{09E52424-06AF-4948-92D8-560491309181}.Release|Any CPU.Build.0 = Release|Any CPU
26+
{76FADEEE-3788-4F59-902A-A970167CE11F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
27+
{76FADEEE-3788-4F59-902A-A970167CE11F}.Debug|Any CPU.Build.0 = Debug|Any CPU
28+
{76FADEEE-3788-4F59-902A-A970167CE11F}.Release|Any CPU.ActiveCfg = Release|Any CPU
29+
{76FADEEE-3788-4F59-902A-A970167CE11F}.Release|Any CPU.Build.0 = Release|Any CPU
2430
EndGlobalSection
2531
GlobalSection(SolutionProperties) = preSolution
2632
HideSolutionNode = FALSE

src/NServiceBus.Metrics.ServiceControl.Msmq/Class1.cs

-6
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
using System;
2+
using System.Messaging;
3+
using System.Runtime.InteropServices;
4+
5+
static class MsmqExtensions
6+
{
7+
/// <remarks>
8+
/// Source: http://functionalflow.co.uk/blog/2008/08/27/counting-the-number-of-messages-in-a-message-queue-in/
9+
/// </remarks>
10+
public static int GetCount(this MessageQueue self)
11+
{
12+
var props = new Win32.MQMGMTPROPS {cProp = 1};
13+
try
14+
{
15+
props.aPropID = Marshal.AllocHGlobal(sizeof(int));
16+
Marshal.WriteInt32(props.aPropID, Win32.PROPID_MGMT_QUEUE_MESSAGE_COUNT);
17+
18+
props.aPropVar = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(Win32.MQPROPVariant)));
19+
Marshal.StructureToPtr(new Win32.MQPROPVariant {vt = Win32.VT_NULL}, props.aPropVar, false);
20+
21+
props.status = Marshal.AllocHGlobal(sizeof(int));
22+
Marshal.WriteInt32(props.status, 0);
23+
24+
var result = Win32.MQMgmtGetInfo(null, "queue=" + self.FormatName, ref props);
25+
if (result != 0)
26+
{
27+
throw new InvalidOperationException($"Unable to retrieve queue information (error: {result:x8}");
28+
}
29+
30+
if (Marshal.ReadInt32(props.status) != 0)
31+
{
32+
return -1;
33+
}
34+
35+
var propVar = (Win32.MQPROPVariant)Marshal.PtrToStructure(props.aPropVar, typeof(Win32.MQPROPVariant));
36+
37+
return propVar.vt != Win32.VT_UI4
38+
? 0
39+
: Convert.ToInt32(propVar.ulVal);
40+
}
41+
finally
42+
{
43+
Marshal.FreeHGlobal(props.aPropID);
44+
Marshal.FreeHGlobal(props.aPropVar);
45+
Marshal.FreeHGlobal(props.status);
46+
}
47+
}
48+
49+
static class Win32
50+
{
51+
[DllImport("mqrt.dll")]
52+
internal static extern uint MQMgmtGetInfo([MarshalAs(UnmanagedType.BStr)] string computerName, [MarshalAs(UnmanagedType.BStr)] string objectName, ref MQMGMTPROPS mgmtProps);
53+
54+
public const byte VT_NULL = 1;
55+
public const byte VT_UI4 = 19;
56+
public const int PROPID_MGMT_QUEUE_MESSAGE_COUNT = 7;
57+
58+
//size must be 16
59+
[StructLayout(LayoutKind.Sequential)]
60+
internal struct MQPROPVariant
61+
{
62+
public byte vt; //0
63+
public byte spacer; //1
64+
public short spacer2; //2
65+
public int spacer3; //4
66+
public uint ulVal; //8
67+
public int spacer4; //12
68+
}
69+
70+
//size must be 16 in x86 and 28 in x64
71+
[StructLayout(LayoutKind.Sequential)]
72+
internal struct MQMGMTPROPS
73+
{
74+
public uint cProp;
75+
public IntPtr aPropID;
76+
public IntPtr aPropVar;
77+
public IntPtr status;
78+
}
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.Messaging;
2+
using NServiceBus;
3+
using NServiceBus.Metrics.ServiceControl;
4+
5+
class MsmqNativeQueueLengthReporter
6+
{
7+
IReportNativeQueueLength nativeQueueLengthReporter;
8+
readonly Configure configure;
9+
string localAddress;
10+
MessageQueue messageQueue;
11+
12+
public MsmqNativeQueueLengthReporter(IReportNativeQueueLength nativeQueueLengthReporter, Configure configure)
13+
{
14+
this.nativeQueueLengthReporter = nativeQueueLengthReporter;
15+
this.configure = configure;
16+
}
17+
18+
public void Warmup()
19+
{
20+
localAddress = configure.LocalAddress.ToString();
21+
messageQueue = new MessageQueue($@".\private$\{configure.LocalAddress.Queue}", QueueAccessMode.Peek);
22+
}
23+
24+
public void ReportNativeQueueLength()
25+
{
26+
nativeQueueLengthReporter.ReportQueueLength(localAddress, messageQueue.GetCount());
27+
}
28+
}

src/NServiceBus.Metrics.ServiceControl.Msmq/NServiceBus.Metrics.ServiceControl.Msmq.csproj

+9
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,16 @@
1010
<DebugSymbols>true</DebugSymbols>
1111
</PropertyGroup>
1212
<ItemGroup>
13+
<PackageReference Include="NServiceBus.Metrics.ServiceControl" Version="1.3.0-rc0002" />
14+
<PackageReference Include="NServiceBus" Version="5.0.0" />
1315
<PackageReference Include="GitVersionTask" Version="3.6.5" />
1416
<PackageReference Include="NuGetPackager" Version="0.6.5" />
1517
</ItemGroup>
18+
<ItemGroup>
19+
<Reference Include="System.Messaging">
20+
<HintPath>..\..\..\..\Program Files (x86)\Reference Assemblies\Microsoft\Framework\.NETFramework\v4.5\System.Messaging.dll</HintPath>
21+
</Reference>
22+
</ItemGroup>
23+
24+
1625
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using NServiceBus;
5+
using NServiceBus.Features;
6+
using NServiceBus.Logging;
7+
using NServiceBus.Transports;
8+
9+
class ReportMsmqNativeQueueLength : Feature
10+
{
11+
public ReportMsmqNativeQueueLength()
12+
{
13+
EnableByDefault();
14+
DependsOn("ServiceControlMonitoring");
15+
Prerequisite(ctx => ctx.Settings.Get<TransportDefinition>() is MsmqTransport, "MSMQ Transport not configured");
16+
}
17+
18+
protected override void Setup(FeatureConfigurationContext context)
19+
{
20+
context.Container.ConfigureComponent<MsmqNativeQueueLengthReporter>(DependencyLifecycle.SingleInstance);
21+
context.Container.ConfigureComponent<PeriodicallyReportQueueLength>(DependencyLifecycle.SingleInstance);
22+
23+
RegisterStartupTask<PeriodicallyReportQueueLength>();
24+
}
25+
26+
class PeriodicallyReportQueueLength : FeatureStartupTask
27+
{
28+
TimeSpan delayBetweenReports = TimeSpan.FromSeconds(1);
29+
CancellationTokenSource cancellationTokenSource;
30+
Task task;
31+
MsmqNativeQueueLengthReporter reporter;
32+
33+
public PeriodicallyReportQueueLength(MsmqNativeQueueLengthReporter reporter)
34+
{
35+
this.reporter = reporter;
36+
}
37+
38+
protected override void OnStart()
39+
{
40+
cancellationTokenSource = new CancellationTokenSource();
41+
task = Task.Run(async () =>
42+
{
43+
reporter.Warmup();
44+
while (!cancellationTokenSource.IsCancellationRequested)
45+
{
46+
try
47+
{
48+
await Task.Delay(delayBetweenReports, cancellationTokenSource.Token).ConfigureAwait(false);
49+
reporter.ReportNativeQueueLength();
50+
}
51+
catch (TaskCanceledException)
52+
{
53+
// Ignore cancellation. It means we are shutting down
54+
}
55+
catch (Exception ex)
56+
{
57+
Log.Warn("Error reporting MSMQ native queue length", ex);
58+
}
59+
}
60+
});
61+
}
62+
63+
protected override void OnStop()
64+
{
65+
cancellationTokenSource.Cancel();
66+
task.GetAwaiter().GetResult();
67+
}
68+
69+
static ILog Log = LogManager.GetLogger<PeriodicallyReportQueueLength>();
70+
}
71+
}

src/SampleEndpoint/Program.cs

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
using System;
2+
3+
namespace SampleEndpoint
4+
{
5+
using System.Threading;
6+
using NServiceBus;
7+
using NServiceBus.Config;
8+
using NServiceBus.Config.ConfigurationSource;
9+
using NServiceBus.Metrics.ServiceControl;
10+
11+
class Program
12+
{
13+
static void Main(string[] args)
14+
{
15+
var config = new BusConfiguration();
16+
config.EndpointName("SomeName");
17+
config.SendMetricDataToServiceControl("Particular.Monitoring");
18+
config.UsePersistence<InMemoryPersistence>();
19+
20+
using (var bus = Bus.Create(config).Start())
21+
{
22+
Console.WriteLine("Bus Started");
23+
24+
while (Console.ReadKey(true).Key != ConsoleKey.Escape)
25+
{
26+
bus.SendLocal(new SomeMessage());
27+
}
28+
29+
Console.WriteLine("Bus Stopped");
30+
}
31+
}
32+
}
33+
34+
class SomeMessage : ICommand
35+
{
36+
37+
}
38+
39+
class SomeMessageHandler : IHandleMessages<SomeMessage>
40+
{
41+
public void Handle(SomeMessage message)
42+
{
43+
Thread.Sleep(TimeSpan.FromMilliseconds(500));
44+
}
45+
}
46+
47+
48+
class AuditConfigProvider : IProvideConfiguration<AuditConfig>
49+
{
50+
51+
public AuditConfig GetConfiguration() => new AuditConfig
52+
{
53+
QueueName = "audit"
54+
};
55+
}
56+
57+
class ErrorConfigProvider : IProvideConfiguration<MessageForwardingInCaseOfFaultConfig>
58+
{
59+
public MessageForwardingInCaseOfFaultConfig GetConfiguration() => new MessageForwardingInCaseOfFaultConfig
60+
{
61+
ErrorQueue = "error"
62+
};
63+
}
64+
}
+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net452</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<ProjectReference Include="..\NServiceBus.Metrics.ServiceControl.Msmq\NServiceBus.Metrics.ServiceControl.Msmq.csproj" />
10+
</ItemGroup>
11+
12+
</Project>

0 commit comments

Comments
 (0)