Skip to content

Commit 770c110

Browse files
committed
model with validation
1 parent 00cd677 commit 770c110

File tree

5 files changed

+358
-25
lines changed

5 files changed

+358
-25
lines changed

src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ sealed class ConfirmsAwareChannel : IDisposable
1212
{
1313
public ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology, bool usePublisherConfirms)
1414
{
15-
channel = connection.CreateModel();
15+
channel = new ModelWithValidation(connection.CreateModel());
1616
channel.BasicReturn += Channel_BasicReturn;
1717

1818
this.routingTopology = routingTopology;
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using RabbitMQ.Client;
5+
using RabbitMQ.Client.Events;
6+
7+
namespace NServiceBus.Transport.RabbitMQ
8+
{
9+
class ModelWithValidation : IModel
10+
{
11+
IModel model;
12+
13+
public ModelWithValidation(IModel model)
14+
{
15+
this.model = model;
16+
}
17+
18+
public void Dispose()
19+
{
20+
model.Dispose();
21+
}
22+
23+
public void Abort()
24+
{
25+
model.Abort();
26+
}
27+
28+
public void Abort(ushort replyCode, string replyText)
29+
{
30+
model.Abort(replyCode, replyText);
31+
}
32+
33+
public void BasicAck(ulong deliveryTag, bool multiple)
34+
{
35+
model.BasicAck(deliveryTag, multiple);
36+
}
37+
38+
public void BasicCancel(string consumerTag)
39+
{
40+
model.BasicCancel(consumerTag);
41+
}
42+
43+
public void BasicCancelNoWait(string consumerTag)
44+
{
45+
model.BasicCancelNoWait(consumerTag);
46+
}
47+
48+
public string BasicConsume(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments,
49+
IBasicConsumer consumer)
50+
{
51+
return model.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer);
52+
}
53+
54+
public BasicGetResult BasicGet(string queue, bool autoAck)
55+
{
56+
return model.BasicGet(queue, autoAck);
57+
}
58+
59+
public void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
60+
{
61+
model.BasicNack(deliveryTag, multiple, requeue);
62+
}
63+
64+
public void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties,
65+
ReadOnlyMemory<byte> body)
66+
{
67+
model.BasicPublish(exchange, routingKey, mandatory, basicProperties, body);
68+
}
69+
70+
public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
71+
{
72+
model.BasicQos(prefetchSize, prefetchCount, global);
73+
}
74+
75+
public void BasicRecover(bool requeue)
76+
{
77+
model.BasicRecover(requeue);
78+
}
79+
80+
public void BasicRecoverAsync(bool requeue)
81+
{
82+
model.BasicRecoverAsync(requeue);
83+
}
84+
85+
public void BasicReject(ulong deliveryTag, bool requeue)
86+
{
87+
model.BasicReject(deliveryTag, requeue);
88+
}
89+
90+
public void Close()
91+
{
92+
model.Close();
93+
}
94+
95+
public void Close(ushort replyCode, string replyText)
96+
{
97+
model.Close(replyCode, replyText);
98+
}
99+
100+
public void ConfirmSelect()
101+
{
102+
model.ConfirmSelect();
103+
}
104+
105+
public IBasicPublishBatch CreateBasicPublishBatch()
106+
{
107+
return model.CreateBasicPublishBatch();
108+
}
109+
110+
public IBasicProperties CreateBasicProperties()
111+
{
112+
return model.CreateBasicProperties();
113+
}
114+
115+
public void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments)
116+
{
117+
ThrowIfShortStringIsTooLong(destination, nameof(destination));
118+
ThrowIfShortStringIsTooLong(source, nameof(source));
119+
ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey));
120+
121+
model.ExchangeBind(destination, source, routingKey, arguments);
122+
}
123+
124+
public void ExchangeBindNoWait(string destination, string source, string routingKey, IDictionary<string, object> arguments)
125+
{
126+
ThrowIfShortStringIsTooLong(destination, nameof(destination));
127+
ThrowIfShortStringIsTooLong(source, nameof(source));
128+
ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey));
129+
130+
model.ExchangeBindNoWait(destination, source, routingKey, arguments);
131+
}
132+
133+
public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments)
134+
{
135+
ThrowIfShortStringIsTooLong(exchange, nameof(exchange));
136+
ThrowIfShortStringIsTooLong(type, nameof(type));
137+
138+
model.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
139+
}
140+
141+
public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments)
142+
{
143+
ThrowIfShortStringIsTooLong(exchange, nameof(exchange));
144+
ThrowIfShortStringIsTooLong(type, nameof(type));
145+
146+
model.ExchangeDeclareNoWait(exchange, type, durable, autoDelete, arguments);
147+
}
148+
149+
public void ExchangeDeclarePassive(string exchange)
150+
{
151+
ThrowIfShortStringIsTooLong(exchange, nameof(exchange));
152+
153+
model.ExchangeDeclarePassive(exchange);
154+
}
155+
156+
public void ExchangeDelete(string exchange, bool ifUnused)
157+
{
158+
model.ExchangeDelete(exchange, ifUnused);
159+
}
160+
161+
public void ExchangeDeleteNoWait(string exchange, bool ifUnused)
162+
{
163+
model.ExchangeDeleteNoWait(exchange, ifUnused);
164+
}
165+
166+
public void ExchangeUnbind(string destination, string source, string routingKey, IDictionary<string, object> arguments)
167+
{
168+
model.ExchangeUnbind(destination, source, routingKey, arguments);
169+
}
170+
171+
public void ExchangeUnbindNoWait(string destination, string source, string routingKey, IDictionary<string, object> arguments)
172+
{
173+
model.ExchangeUnbindNoWait(destination, source, routingKey, arguments);
174+
}
175+
176+
public void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
177+
{
178+
ThrowIfShortStringIsTooLong(queue, nameof(queue));
179+
ThrowIfShortStringIsTooLong(exchange, nameof(exchange));
180+
ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey));
181+
182+
model.QueueBind(queue, exchange, routingKey, arguments);
183+
}
184+
185+
public void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
186+
{
187+
ThrowIfShortStringIsTooLong(queue, nameof(queue));
188+
ThrowIfShortStringIsTooLong(exchange, nameof(exchange));
189+
ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey));
190+
191+
model.QueueBindNoWait(queue, exchange, routingKey, arguments);
192+
}
193+
194+
public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
195+
{
196+
ThrowIfShortStringIsTooLong(queue, nameof(queue));
197+
198+
return model.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
199+
}
200+
201+
public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
202+
{
203+
ThrowIfShortStringIsTooLong(queue, nameof(queue));
204+
205+
model.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
206+
}
207+
208+
public QueueDeclareOk QueueDeclarePassive(string queue)
209+
{
210+
ThrowIfShortStringIsTooLong(queue, nameof(queue));
211+
212+
return model.QueueDeclarePassive(queue);
213+
}
214+
215+
public uint MessageCount(string queue)
216+
{
217+
return model.MessageCount(queue);
218+
}
219+
220+
public uint ConsumerCount(string queue)
221+
{
222+
return model.ConsumerCount(queue);
223+
}
224+
225+
public uint QueueDelete(string queue, bool ifUnused, bool ifEmpty)
226+
{
227+
return model.QueueDelete(queue, ifUnused, ifEmpty);
228+
}
229+
230+
public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty)
231+
{
232+
model.QueueDeleteNoWait(queue, ifUnused, ifEmpty);
233+
}
234+
235+
public uint QueuePurge(string queue)
236+
{
237+
return model.QueuePurge(queue);
238+
}
239+
240+
public void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
241+
{
242+
model.QueueUnbind(queue, exchange, routingKey, arguments);
243+
}
244+
245+
public void TxCommit()
246+
{
247+
model.TxCommit();
248+
}
249+
250+
public void TxRollback()
251+
{
252+
model.TxRollback();
253+
}
254+
255+
public void TxSelect()
256+
{
257+
model.TxSelect();
258+
}
259+
260+
public bool WaitForConfirms()
261+
{
262+
return model.WaitForConfirms();
263+
}
264+
265+
public bool WaitForConfirms(TimeSpan timeout)
266+
{
267+
return model.WaitForConfirms(timeout);
268+
}
269+
270+
public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
271+
{
272+
return model.WaitForConfirms(timeout, out timedOut);
273+
}
274+
275+
public void WaitForConfirmsOrDie()
276+
{
277+
model.WaitForConfirmsOrDie();
278+
}
279+
280+
public void WaitForConfirmsOrDie(TimeSpan timeout)
281+
{
282+
model.WaitForConfirmsOrDie(timeout);
283+
}
284+
285+
public int ChannelNumber => model.ChannelNumber;
286+
287+
public ShutdownEventArgs CloseReason => model.CloseReason;
288+
289+
public IBasicConsumer DefaultConsumer
290+
{
291+
get => model.DefaultConsumer;
292+
set => model.DefaultConsumer = value;
293+
}
294+
295+
public bool IsClosed => model.IsClosed;
296+
297+
public bool IsOpen => model.IsOpen;
298+
299+
public ulong NextPublishSeqNo => model.NextPublishSeqNo;
300+
301+
public TimeSpan ContinuationTimeout
302+
{
303+
get => model.ContinuationTimeout;
304+
set => model.ContinuationTimeout = value;
305+
}
306+
307+
public event EventHandler<BasicAckEventArgs> BasicAcks
308+
{
309+
add => model.BasicAcks += value;
310+
remove => model.BasicAcks -= value;
311+
}
312+
313+
public event EventHandler<BasicNackEventArgs> BasicNacks
314+
{
315+
add => model.BasicNacks += value;
316+
remove => model.BasicNacks -= value;
317+
}
318+
319+
public event EventHandler<EventArgs> BasicRecoverOk
320+
{
321+
add => model.BasicRecoverOk += value;
322+
remove => model.BasicRecoverOk -= value;
323+
}
324+
325+
public event EventHandler<BasicReturnEventArgs> BasicReturn
326+
{
327+
add => model.BasicReturn += value;
328+
remove => model.BasicReturn -= value;
329+
}
330+
331+
public event EventHandler<CallbackExceptionEventArgs> CallbackException
332+
{
333+
add => model.CallbackException += value;
334+
remove => model.CallbackException -= value;
335+
}
336+
337+
public event EventHandler<FlowControlEventArgs> FlowControl
338+
{
339+
add => model.FlowControl += value;
340+
remove => model.FlowControl -= value;
341+
}
342+
343+
public event EventHandler<ShutdownEventArgs> ModelShutdown
344+
{
345+
add => model.ModelShutdown += value;
346+
remove => model.ModelShutdown -= value;
347+
}
348+
349+
public static void ThrowIfShortStringIsTooLong(string name, string argumentName)
350+
{
351+
if (Encoding.UTF8.GetByteCount(name) > 255)
352+
{
353+
throw new ArgumentOutOfRangeException(argumentName, name, "Value exceeds the maximum allowed length of 255 bytes.");
354+
}
355+
}
356+
}
357+
}

src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ public void Initialize(IModel channel, IEnumerable<string> receivingAddresses, I
6767
{
6868
foreach (var address in receivingAddresses.Concat(sendingAddresses))
6969
{
70-
NameValidator.ThrowIfNameIsTooLong(address);
71-
7270
channel.QueueDeclare(address, useDurableExchanges, false, false, null);
7371
CreateExchange(channel, address);
7472
channel.QueueBind(address, address, string.Empty);
@@ -121,8 +119,6 @@ void MarkTypeConfigured(Type eventType)
121119

122120
void CreateExchange(IModel channel, string exchangeName)
123121
{
124-
NameValidator.ThrowIfNameIsTooLong(exchangeName);
125-
126122
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, useDurableExchanges);
127123
}
128124

0 commit comments

Comments
 (0)