1
- using System ;
1
+ using System ;
2
2
using NServiceBus . Unicast . Distributor ;
3
3
using System . Messaging ;
4
4
using NServiceBus . Utils ;
@@ -12,7 +12,7 @@ namespace NServiceBus.Distributor.MsmqWorkerAvailabilityManager
12
12
public class MsmqWorkerAvailabilityManager : IWorkerAvailabilityManager
13
13
{
14
14
MessageQueue storageQueue ;
15
-
15
+ object lockobject = new object ( ) ;
16
16
/// <summary>
17
17
/// Sets the path to the queue that will be used for storing
18
18
/// worker availability.
@@ -29,11 +29,13 @@ public class MsmqWorkerAvailabilityManager : IWorkerAvailabilityManager
29
29
/// </param>
30
30
public void ClearAvailabilityForWorker ( Address address )
31
31
{
32
- var existing = storageQueue . GetAllMessages ( ) ;
33
-
34
- foreach ( var m in existing )
35
- if ( MsmqUtilities . GetIndependentAddressForQueue ( m . ResponseQueue ) == address )
36
- storageQueue . ReceiveById ( m . Id , MessageQueueTransactionType . Automatic ) ;
32
+ lock ( lockobject )
33
+ {
34
+ var existing = storageQueue . GetAllMessages ( ) ;
35
+ foreach ( var m in existing )
36
+ if ( MsmqUtilities . GetIndependentAddressForQueue ( m . ResponseQueue ) == address )
37
+ storageQueue . ReceiveById ( m . Id , MessageQueueTransactionType . Automatic ) ;
38
+ }
37
39
}
38
40
39
41
/// <summary>
@@ -44,14 +46,17 @@ public Address PopAvailableWorker()
44
46
{
45
47
try
46
48
{
47
- var m = storageQueue . Receive ( TimeSpan . Zero , MessageQueueTransactionType . Automatic ) ;
48
-
49
- if ( m == null )
49
+ Message m ;
50
+ lock ( lockobject )
51
+ m = storageQueue . Receive ( TimeSpan . Zero , MessageQueueTransactionType . Automatic ) ;
52
+
53
+ if ( m == null )
50
54
return null ;
51
-
55
+
52
56
return MsmqUtilities . GetIndependentAddressForQueue ( m . ResponseQueue ) ;
57
+
53
58
}
54
- catch ( Exception )
59
+ catch ( Exception )
55
60
{
56
61
return null ;
57
62
}
@@ -65,9 +70,9 @@ public void Start()
65
70
var path = MsmqUtilities . GetFullPath ( StorageQueueAddress ) ;
66
71
67
72
storageQueue = new MessageQueue ( path ) ;
68
-
69
- if ( ! storageQueue . Transactional )
70
- throw new Exception ( "Queue must be transactional." ) ;
73
+
74
+ if ( ! storageQueue . Transactional )
75
+ throw new Exception ( "Queue must be transactional." ) ;
71
76
}
72
77
73
78
/// <summary>
@@ -79,11 +84,14 @@ public void Start()
79
84
/// <param name="capacity">The number of messages that this worker is ready to process</param>
80
85
public void WorkerAvailable ( Address address , int capacity )
81
86
{
82
- for ( var i = 0 ; i < capacity ; i ++ )
83
- storageQueue . Send ( new Message
84
- {
85
- ResponseQueue = new MessageQueue ( MsmqUtilities . GetFullPath ( address ) )
86
- } , MessageQueueTransactionType . Automatic ) ;
87
+ lock ( lockobject )
88
+ {
89
+ for ( var i = 0 ; i < capacity ; i ++ )
90
+ storageQueue . Send ( new Message
91
+ {
92
+ ResponseQueue = new MessageQueue ( MsmqUtilities . GetFullPath ( address ) )
93
+ } , MessageQueueTransactionType . Automatic ) ;
94
+ }
87
95
}
88
96
}
89
- }
97
+ }
0 commit comments