Skip to content

Commit 929047d

Browse files
author
Gábor Zavarkó
committed
Change the type of Exchange and Routing key to ROM
1 parent c69bf00 commit 929047d

25 files changed

+100
-59
lines changed

projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
1818
_autoResetEvent = autoResetEvent;
1919
}
2020

21-
public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
21+
public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
2222
{
2323
if (Interlocked.Increment(ref _current) == Count)
2424
{
@@ -28,7 +28,7 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel
2828
return Task.CompletedTask;
2929
}
3030

31-
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
31+
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey,
3232
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
3333
{
3434
if (Interlocked.Increment(ref _current) == Count)

projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Threading;
1+
using System.Text;
2+
using System.Threading;
23
using BenchmarkDotNet.Attributes;
34
using RabbitMQ.Client;
45
using RabbitMQ.Client.ConsumerDispatching;
@@ -15,9 +16,10 @@ public class ConsumerDispatcherBase
1516
private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent);
1617
protected readonly string _consumerTag = "ConsumerTag";
1718
protected readonly ulong _deliveryTag = 500UL;
18-
protected readonly string _exchange = "Exchange";
19-
protected readonly string _routingKey = "RoutingKey";
19+
protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange");
20+
protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey");
2021
protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties();
22+
protected readonly byte[] _method = new byte[512];
2123
protected readonly byte[] _body = new byte[512];
2224
}
2325

@@ -41,7 +43,7 @@ public void AsyncConsumerDispatcher()
4143
{
4244
for (int i = 0; i < Count; i++)
4345
{
44-
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
46+
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
4547
}
4648
_autoResetEvent.Wait();
4749
_autoResetEvent.Reset();
@@ -59,7 +61,7 @@ public void ConsumerDispatcher()
5961
{
6062
for (int i = 0; i < Count; i++)
6163
{
62-
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
64+
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
6365
}
6466
_autoResetEvent.Wait();
6567
_autoResetEvent.Reset();

projects/Benchmarks/WireFormatting/MethodSerialization.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public override void SetUp()
4545
}
4646

4747
[Benchmark]
48-
public object BasicDeliverRead() => new BasicDeliver(_buffer.Span)._consumerTag; // return one property to not box when returning an object instead
48+
public object BasicDeliverRead() => new BasicDeliver(_buffer)._consumerTag; // return one property to not box when returning an object instead
4949

5050
[Benchmark]
5151
public int BasicPublishWrite() => _basicPublish.WriteTo(_buffer.Span);

projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
110110
public virtual Task HandleBasicDeliver(string consumerTag,
111111
ulong deliveryTag,
112112
bool redelivered,
113-
string exchange,
114-
string routingKey,
113+
ReadOnlyMemory<byte> exchange,
114+
ReadOnlyMemory<byte> routingKey,
115115
in ReadOnlyBasicProperties properties,
116116
ReadOnlyMemory<byte> body)
117117
{
@@ -165,7 +165,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
165165
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
166166
}
167167

168-
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
168+
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
169169
{
170170
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
171171
}

projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ public virtual void HandleBasicConsumeOk(string consumerTag)
150150
public virtual void HandleBasicDeliver(string consumerTag,
151151
ulong deliveryTag,
152152
bool redelivered,
153-
string exchange,
154-
string routingKey,
153+
ReadOnlyMemory<byte> exchange,
154+
ReadOnlyMemory<byte> routingKey,
155155
in ReadOnlyBasicProperties properties,
156156
ReadOnlyMemory<byte> body)
157157
{

projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public interface IAsyncBasicConsumer
4949
Task HandleBasicDeliver(string consumerTag,
5050
ulong deliveryTag,
5151
bool redelivered,
52-
string exchange,
53-
string routingKey,
52+
ReadOnlyMemory<byte> exchange,
53+
ReadOnlyMemory<byte> routingKey,
5454
in ReadOnlyBasicProperties properties,
5555
ReadOnlyMemory<byte> body);
5656

projects/RabbitMQ.Client/client/api/IBasicConsumer.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ public interface IBasicConsumer
9292
void HandleBasicDeliver(string consumerTag,
9393
ulong deliveryTag,
9494
bool redelivered,
95-
string exchange,
96-
string routingKey,
95+
ReadOnlyMemory<byte> exchange,
96+
ReadOnlyMemory<byte> routingKey,
9797
in ReadOnlyBasicProperties properties,
9898
ReadOnlyMemory<byte> body);
9999

projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public override async Task HandleBasicConsumeOk(string consumerTag)
7171
}
7272

7373
///<summary>Fires the Received event.</summary>
74-
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
74+
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
7575
{
7676
// No need to call base, it's empty.
7777
return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));

projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public BasicDeliverEventArgs()
4747
public BasicDeliverEventArgs(string consumerTag,
4848
ulong deliveryTag,
4949
bool redelivered,
50-
string exchange,
51-
string routingKey,
50+
ReadOnlyMemory<byte> exchange,
51+
ReadOnlyMemory<byte> routingKey,
5252
in ReadOnlyBasicProperties properties,
5353
ReadOnlyMemory<byte> body)
5454
{
@@ -77,13 +77,13 @@ public BasicDeliverEventArgs(string consumerTag,
7777

7878
///<summary>The exchange the message was originally published
7979
///to.</summary>
80-
public string Exchange { get; set; }
80+
public ReadOnlyMemory<byte> Exchange { get; set; }
8181

8282
///<summary>The AMQP "redelivered" flag.</summary>
8383
public bool Redelivered { get; set; }
8484

8585
///<summary>The routing key used when the message was
8686
///originally published.</summary>
87-
public string RoutingKey { get; set; }
87+
public ReadOnlyMemory<byte> RoutingKey { get; set; }
8888
}
8989
}

projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public override void HandleBasicConsumeOk(string consumerTag)
8484
/// Accessing the body at a later point is unsafe as its memory can
8585
/// be already released.
8686
/// </remarks>
87-
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
87+
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
8888
{
8989
base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
9090
Received?.Invoke(

projects/RabbitMQ.Client/client/framing/BasicDeliver.cs

+8-8
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,16 @@ namespace RabbitMQ.Client.Framing.Impl
4141
public readonly string _consumerTag;
4242
public readonly ulong _deliveryTag;
4343
public readonly bool _redelivered;
44-
public readonly string _exchange;
45-
public readonly string _routingKey;
44+
public readonly ReadOnlyMemory<byte> _exchange;
45+
public readonly ReadOnlyMemory<byte> _routingKey;
4646

47-
public BasicDeliver(ReadOnlySpan<byte> span)
47+
public BasicDeliver(ReadOnlyMemory<byte> data)
4848
{
49-
int offset = WireFormatting.ReadShortstr(span, out _consumerTag);
50-
offset += WireFormatting.ReadLonglong(span.Slice(offset), out _deliveryTag);
51-
offset += WireFormatting.ReadBits(span.Slice(offset), out _redelivered);
52-
offset += WireFormatting.ReadShortstr(span.Slice(offset), out _exchange);
53-
WireFormatting.ReadShortstr(span.Slice(offset), out _routingKey);
49+
int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag);
50+
offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag);
51+
offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered);
52+
offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange);
53+
WireFormatting.ReadShortMemory(data.Slice(offset), out _routingKey);
5454
}
5555

5656
public ProtocolCommandId ProtocolCommandId => ProtocolCommandId.BasicDeliver;

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -600,8 +600,7 @@ protected void HandleBasicConsumeOk(in IncomingCommand cmd)
600600

601601
protected void HandleBasicDeliver(in IncomingCommand cmd)
602602
{
603-
var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes.Span);
604-
cmd.ReturnMethodBuffer();
603+
var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes);
605604
var header = new ReadOnlyBasicProperties(cmd.HeaderBytes.Span);
606605
cmd.ReturnHeaderBuffer();
607606

@@ -613,6 +612,7 @@ protected void HandleBasicDeliver(in IncomingCommand cmd)
613612
method._routingKey,
614613
header,
615614
cmd.Body,
615+
cmd.TakeoverMethod(),
616616
cmd.TakeoverBody());
617617
}
618618

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs

+5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ protected override async Task ProcessChannelAsync()
3939
}
4040
finally
4141
{
42+
if (work.RentedMethodArray != null)
43+
{
44+
ArrayPool<byte>.Shared.Return(work.RentedMethodArray);
45+
}
46+
4247
if (work.RentedArray != null)
4348
{
4449
ArrayPool<byte>.Shared.Return(work.RentedArray);

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ protected override async Task ProcessChannelAsync()
4949
}
5050
finally
5151
{
52+
if (work.RentedMethodArray != null)
53+
{
54+
ArrayPool<byte>.Shared.Return(work.RentedMethodArray);
55+
}
56+
5257
if (work.RentedArray != null)
5358
{
5459
ArrayPool<byte>.Shared.Return(work.RentedArray);

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

+7-5
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag)
5454
}
5555

5656
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
57-
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
57+
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedMethodArray, byte[] rentedArray)
5858
{
5959
if (!IsShutdown)
6060
{
61-
_writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray));
61+
_writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray));
6262
}
6363
}
6464

@@ -108,10 +108,11 @@ protected readonly struct WorkStruct
108108
public readonly string? ConsumerTag;
109109
public readonly ulong DeliveryTag;
110110
public readonly bool Redelivered;
111-
public readonly string? Exchange;
112-
public readonly string? RoutingKey;
111+
public readonly ReadOnlyMemory<byte> Exchange;
112+
public readonly ReadOnlyMemory<byte> RoutingKey;
113113
public readonly ReadOnlyBasicProperties BasicProperties;
114114
public readonly ReadOnlyMemory<byte> Body;
115+
public readonly byte[]? RentedMethodArray;
115116
public readonly byte[]? RentedArray;
116117
public readonly ShutdownEventArgs? Reason;
117118
public readonly WorkType WorkType;
@@ -133,7 +134,7 @@ public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
133134
}
134135

135136
public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
136-
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
137+
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedMethodArray, byte[] rentedArray)
137138
{
138139
WorkType = WorkType.Deliver;
139140
Consumer = consumer;
@@ -144,6 +145,7 @@ public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag
144145
RoutingKey = routingKey;
145146
BasicProperties = basicProperties;
146147
Body = body;
148+
RentedMethodArray = rentedMethodArray;
147149
RentedArray = rentedArray;
148150
Reason = default;
149151
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
3737
ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicConsumeOk)} for tag {consumerTag}");
3838
}
3939

40-
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties,
40+
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties,
4141
ReadOnlyMemory<byte> body)
4242
{
4343
ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicDeliver)} for tag {consumerTag}");
@@ -66,7 +66,7 @@ Task IAsyncBasicConsumer.HandleBasicConsumeOk(string consumerTag)
6666
return Task.CompletedTask;
6767
}
6868

69-
Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties,
69+
Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties,
7070
ReadOnlyMemory<byte> body)
7171
{
7272
((IBasicConsumer)this).HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ internal interface IConsumerDispatcher
4848
void HandleBasicDeliver(string consumerTag,
4949
ulong deliveryTag,
5050
bool redelivered,
51-
string exchange,
52-
string routingKey,
51+
ReadOnlyMemory<byte> exchange,
52+
ReadOnlyMemory<byte> routingKey,
5353
in ReadOnlyBasicProperties basicProperties,
5454
ReadOnlyMemory<byte> body,
55+
byte[] rentedMethodArray,
5556
byte[] rentedArray);
5657

5758
void HandleBasicCancelOk(string consumerTag);

projects/RabbitMQ.Client/client/impl/IncomingCommand.cs

+5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ public IncomingCommand(ProtocolCommandId commandId, ReadOnlyMemory<byte> methodB
3232
_rentedBodyArray = rentedBodyArray;
3333
}
3434

35+
public byte[] TakeoverMethod()
36+
{
37+
return _rentedMethodBytes;
38+
}
39+
3540
public byte[] TakeoverBody()
3641
{
3742
return _rentedBodyArray;

projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs

+21
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,27 @@ public static int ReadShortstr(ReadOnlySpan<byte> span, out string value)
192192
return ThrowArgumentOutOfRangeException(span.Length, byteCount + 1);
193193
}
194194

195+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
196+
public static int ReadShortMemory(ReadOnlyMemory<byte> data, out ReadOnlyMemory<byte> value)
197+
{
198+
int byteCount = data.Span[0];
199+
if (byteCount == 0)
200+
{
201+
value = default;
202+
return 1;
203+
}
204+
205+
// equals data.Length >= byteCount + 1
206+
if (data.Length > byteCount)
207+
{
208+
value = data.Slice(1, byteCount);
209+
return 1 + byteCount;
210+
}
211+
212+
value = default;
213+
return ThrowArgumentOutOfRangeException(data.Length, byteCount + 1);
214+
}
215+
195216
[MethodImpl(MethodImplOptions.AggressiveInlining)]
196217
public static int ReadBits(ReadOnlySpan<byte> span, out bool val)
197218
{

0 commit comments

Comments
 (0)