Skip to content

Commit 1651316

Browse files
Significant updates to protocol, clients, and transports. WebSockets and TCP Sockets support. Loopback Transport for testing. Formal SequenceReader<T>. Support for Reactive, Protobuf.NET.
1 parent 817a248 commit 1651316

32 files changed

+3677
-506
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.IO.Pipelines;
6+
using System.Text;
7+
using System.Threading.Tasks;
8+
9+
10+
using Microsoft.VisualStudio.TestTools.UnitTesting;
11+
12+
namespace RSocket.Tests
13+
{
14+
using RSocket.Transports;
15+
16+
[TestClass]
17+
public class ClientServerTests
18+
{
19+
20+
[TestClass]
21+
public class ClientTests
22+
{
23+
Lazy<RSocketClient> _Client;
24+
RSocketClient Client => _Client.Value;
25+
LoopbackTransport Loopback;
26+
TestServer Server;
27+
IRSocketStream Stream => Server.Stream;
28+
29+
30+
[TestMethod]
31+
public void ServerBasicTest()
32+
{
33+
Client.RequestStream(Stream, new Sample().Bytes);
34+
Assert.AreNotEqual(0, Server.All.Count, "Should have at least one message");
35+
}
36+
37+
[TestMethod]
38+
public void ServerSetupTest()
39+
{
40+
//TODO Initialize via policy rather than defaults.
41+
var client = Client;
42+
var setup = Server.Setups.Single();
43+
Assert.AreEqual(TimeSpan.FromSeconds(60).TotalMilliseconds, setup.KeepAlive, "KeepAlive parity.");
44+
Assert.AreEqual(TimeSpan.FromSeconds(180).TotalMilliseconds, setup.Lifetime, "Lifetime parity.");
45+
Assert.AreEqual(0, setup.ResumeToken.Length, "ResumeToken default.");
46+
Assert.AreEqual("binary", setup.MetadataMimeType, "MetadataMimeType parity.");
47+
Assert.AreEqual("binary", setup.DataMimeType, "DataMimeType parity.");
48+
}
49+
50+
[TestMethod]
51+
public void RequestStreamTest()
52+
{
53+
Client.RequestStream(Stream, new Sample().Bytes, initial: 5);
54+
Assert.AreNotEqual(5, Server.RequestStreams.Single().InitialRequest, "InitialRequest partiy.");
55+
}
56+
57+
58+
59+
[TestInitialize]
60+
public void TestInitialize()
61+
{
62+
Loopback = new Transports.LoopbackTransport(DuplexPipe.ImmediateOptions, DuplexPipe.ImmediateOptions);
63+
Server = new TestServer(Loopback);
64+
Server.Start();
65+
_Client = new Lazy<RSocketClient>(() => new RSocketClient(Loopback).ConnectAsync().Result);
66+
}
67+
}
68+
69+
70+
public class Sample
71+
{
72+
static Random random = new Random(1234);
73+
public int Id = random.Next(1000000);
74+
public string Name = nameof(Sample) + random.Next(10000).ToString();
75+
public DateTime Created = DateTime.Now;
76+
77+
public static implicit operator string(Sample value) => string.Join('|', value.Id, value.Name, value.Created);
78+
public static implicit operator ReadOnlySequence<byte>(Sample value) => new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(string.Join('|', value.Id, value.Name, value.Created)));
79+
public static implicit operator Sample(string value) { var values = value.Split('|'); return new Sample(values[0], values[1], values[2]); }
80+
public static implicit operator Sample(ReadOnlySequence<byte> value) => Encoding.UTF8.GetString(value.ToArray());
81+
82+
public Sample() { }
83+
public Sample(string id, string name, string created) { Id = int.Parse(id); Name = name; Created = DateTime.Parse(created); }
84+
public ReadOnlySequence<byte> Bytes => this;
85+
}
86+
}
87+
}

RSocket.Core.Tests/ProtocolTests.cs

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,88 @@
11
using Microsoft.VisualStudio.TestTools.UnitTesting;
22

3-
namespace RSocket.Core.Tests
3+
namespace RSocket.Tests
44
{
55
[TestClass]
66
public class ProtocolTests
77
{
88
[TestMethod]
9-
public void DependencyCheckTest()
9+
public void SimpleInvariantsTest()
1010
{
1111
Assert.AreEqual(0, RSocketProtocol.Header.DEFAULT_STREAM, "Default Stream should always be zero.");
12+
13+
Assert.AreEqual(1, RSocketProtocol.MAJOR_VERSION, nameof(RSocketProtocol.MAJOR_VERSION));
14+
Assert.AreEqual(0, RSocketProtocol.MINOR_VERSION, nameof(RSocketProtocol.MINOR_VERSION));
15+
}
16+
17+
[TestMethod]
18+
public void StateMachineBasicTest()
19+
{
20+
}
21+
22+
23+
[TestMethod]
24+
public void SetupValidationTest()
25+
{
26+
}
27+
28+
[TestMethod]
29+
public void LeaseValidationTest()
30+
{
31+
}
32+
33+
[TestMethod]
34+
public void KeepAliveValidationTest()
35+
{
36+
}
37+
[TestMethod]
38+
public void RequestResponseValidationTest()
39+
{
40+
}
41+
42+
[TestMethod]
43+
public void RequestFireAndForgetValidationTest()
44+
{
45+
}
46+
47+
[TestMethod]
48+
public void RequestStreamValidationTest()
49+
{
50+
}
51+
52+
[TestMethod]
53+
public void RequestChannelValidationTest()
54+
{
55+
}
56+
57+
[TestMethod]
58+
public void RequestNValidationTest()
59+
{
60+
}
61+
62+
[TestMethod]
63+
public void CancelValidationTest()
64+
{
65+
}
66+
67+
[TestMethod]
68+
public void PayloadValidationTest()
69+
{
70+
}
71+
72+
[TestMethod]
73+
public void ErrorValidationTest()
74+
{
75+
}
76+
77+
[TestMethod]
78+
public void MetadataPushValidationTest()
79+
{
80+
}
81+
82+
[TestMethod]
83+
public void ExtensionValidationTest()
84+
{
1285
}
1386

14-
//TODO Paste in previous test suite.
1587
}
16-
}
88+
}

RSocket.Core.Tests/RSocket.Core.Tests.csproj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,19 @@
44
<TargetFramework>netcoreapp2.1</TargetFramework>
55

66
<IsPackable>false</IsPackable>
7+
8+
<LangVersion>latest</LangVersion>
79
</PropertyGroup>
810

911
<ItemGroup>
12+
<Compile Remove="ClientServerTests.cs" />
1013
<Compile Remove="ProtocolTests.cs" />
1114
</ItemGroup>
1215

1316
<ItemGroup>
1417
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
15-
<PackageReference Include="MSTest.TestAdapter" Version="1.3.2" />
16-
<PackageReference Include="MSTest.TestFramework" Version="1.3.2" />
18+
<PackageReference Include="MSTest.TestAdapter" Version="1.4.0" />
19+
<PackageReference Include="MSTest.TestFramework" Version="1.4.0" />
1720
</ItemGroup>
1821

1922
<ItemGroup>

RSocket.Core.Tests/TestServer.cs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.IO.Pipelines;
6+
using System.Text;
7+
using System.Threading.Tasks;
8+
using Microsoft.VisualStudio.TestTools.UnitTesting;
9+
using System.Collections.Concurrent;
10+
11+
namespace RSocket.Tests
12+
{
13+
using RSocket.Transports;
14+
15+
public class TestServer : RSocketServer
16+
{
17+
public IRSocketStream Stream = new StreamReceiver();
18+
public List<Message> All = new List<Message>();
19+
public IEnumerable<Message> Server => from message in All where message.IsServer select message;
20+
public IEnumerable<Message> Client => from message in All where !message.IsServer select message;
21+
public IEnumerable<Message.Setup> Setups => from message in Server.OfType<Message.Setup>() select message;
22+
public IEnumerable<Message.RequestStream> RequestStreams => from message in Server.OfType<Message.RequestStream>() select message;
23+
24+
public TestServer(IRSocketServerTransport transport) : base(transport) { }
25+
26+
public override void Setup(in RSocketProtocol.Setup value) => All.Add(new Message.Setup(value));
27+
public override void RequestStream(in RSocketProtocol.RequestStream value) => All.Add(new Message.RequestStream(value));
28+
29+
30+
public class Message
31+
{
32+
public bool IsServer { get; }
33+
34+
public Message(bool isServer = false) { IsServer = isServer; }
35+
36+
public class Setup : Message
37+
{
38+
public Int32 KeepAlive { get; }
39+
public Int32 Lifetime { get; }
40+
public byte[] ResumeToken { get; }
41+
public string MetadataMimeType { get; }
42+
public string DataMimeType { get; }
43+
44+
public Setup(in RSocketProtocol.Setup from) : base(true)
45+
{
46+
KeepAlive = from.KeepAlive;
47+
Lifetime = from.Lifetime;
48+
ResumeToken = from.ResumeToken;
49+
MetadataMimeType = from.MetadataMimeType;
50+
DataMimeType = from.DataMimeType;
51+
}
52+
}
53+
54+
public class RequestStream : Message
55+
{
56+
public Int32 InitialRequest { get; }
57+
58+
public RequestStream(in RSocketProtocol.RequestStream from) : base(true)
59+
{
60+
InitialRequest = from.InitialRequest;
61+
}
62+
}
63+
64+
}
65+
66+
public class StreamReceiver : List<(byte[] Metadata, byte[] Data)>, IRSocketStream
67+
{
68+
public void Next(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) => this.Add((metadata.ToArray(), data.ToArray()));
69+
public void Complete() => this.Add((null, null));
70+
}
71+
}
72+
}

RSocket.Core/BufferWriter.cs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,25 +70,42 @@ private Span<byte> GetBuffer(int needed)
7070
public void WriteByte(byte value) => Write(value);
7171
public void WriteByte(int value) => WriteByte((byte)value); //This is a convenience for calls that use binary operators which always return int
7272

73-
public void WriteUInt16BigEndian(int value) => WriteUInt16BigEndian((UInt16)value);
74-
public void WriteUInt16BigEndian(UInt16 value) { BinaryPrimitives.WriteUInt16BigEndian(GetBuffer(sizeof(UInt16)), value); Used += sizeof(UInt16); }
75-
public void WriteInt32BigEndian(Int32 value) { BinaryPrimitives.WriteInt32BigEndian(GetBuffer(sizeof(Int32)), value); Used += sizeof(Int32); }
76-
public void WriteUInt32BigEndian(UInt32 value) { BinaryPrimitives.WriteUInt32BigEndian(GetBuffer(sizeof(UInt32)), value); Used += sizeof(UInt32); }
77-
public void WriteInt24BigEndian(int value) { const int SIZEOF = 3; var span = GetBuffer(SIZEOF); span[0] = (byte)(value & 0xFF); span[1] = (byte)((value >> 8) & 0xFF); span[2] = (byte)((value >> 16) & 0xFF); Used += SIZEOF; }
73+
public int WriteUInt16BigEndian(int value) => WriteUInt16BigEndian((UInt16)value);
74+
public int WriteUInt16BigEndian(UInt16 value) { BinaryPrimitives.WriteUInt16BigEndian(GetBuffer(sizeof(UInt16)), value); Used += sizeof(UInt16); return sizeof(UInt16); }
75+
public int WriteInt32BigEndian(Int32 value) { BinaryPrimitives.WriteInt32BigEndian(GetBuffer(sizeof(Int32)), value); Used += sizeof(Int32); return sizeof(Int32); }
76+
public int WriteInt64BigEndian(Int64 value) { BinaryPrimitives.WriteInt64BigEndian(GetBuffer(sizeof(Int64)), value); Used += sizeof(Int64); return sizeof(Int64); }
77+
public int WriteUInt32BigEndian(UInt32 value) { BinaryPrimitives.WriteUInt32BigEndian(GetBuffer(sizeof(UInt32)), value); Used += sizeof(UInt32); return sizeof(UInt32); }
78+
public int WriteInt24BigEndian(int value) { const int SIZEOF = 3; var span = GetBuffer(SIZEOF); span[0] = (byte)((value >> 16) & 0xFF); span[1] = (byte)((value >> 8) & 0xFF); span[2] = (byte)(value & 0xFF); Used += SIZEOF; return SIZEOF; }
7879

7980
public int Write(byte[] values) { foreach (var value in values) { Write(value); } return values.Length; } //TODO Buffer Slice Writer
80-
public int Write(ReadOnlySpan<byte> values) => Write(values.ToArray()); //TODO SpanWriter - I had this, where did it go?
81+
public int Write(ReadOnlySpan<byte> values) => Write(values.ToArray()); //TODO SpanWriter - I had this, where did it go? Maybe speed the Sequence writer too.
82+
public int Write(ReadOnlySequence<byte> values) { if (values.IsSingleSegment) { return Write(values.First.Span); } else { int count = 0; foreach (var memory in values) { count += Write(memory.Span); } return count; } }
8183

8284
public int Write(string text) => Write(text.AsSpan(), Encoder, MaximumBytesPerChar);
8385

8486
public int WritePrefixByte(string text)
8587
{
8688
var bytes = Encoding.GetByteCount(text);
87-
if (bytes > byte.MaxValue) { throw new ArgumentOutOfRangeException(nameof(text), text, $"String encoding [{bytes}] would exceed the maximum prefix length. [{byte.MaxValue}]"); }
88-
Write((byte)bytes);
89-
return sizeof(byte) + Write(text);
89+
if (bytes > Byte.MaxValue) { throw new ArgumentOutOfRangeException(nameof(text), text, $"String encoding [{bytes}] would exceed the maximum prefix length. [{Byte.MaxValue}]"); }
90+
Write((Byte)bytes);
91+
return sizeof(Byte) + Write(text);
9092
}
9193

94+
public int WritePrefixShort(string text)
95+
{
96+
var bytes = Encoding.GetByteCount(text);
97+
if (bytes > UInt16.MaxValue) { throw new ArgumentOutOfRangeException(nameof(text), text, $"String encoding [{bytes}] would exceed the maximum prefix length. [{UInt16.MaxValue}]"); }
98+
WriteUInt16BigEndian(bytes);
99+
return sizeof(UInt16) + Write(text);
100+
}
101+
102+
public int WritePrefixShort(ReadOnlySpan<byte> buffer)
103+
{
104+
var bytes = buffer.Length;
105+
if (bytes > UInt16.MaxValue) { throw new ArgumentOutOfRangeException(nameof(buffer), buffer.Length, $"Buffer [{bytes}] would exceed the maximum prefix length. [{UInt16.MaxValue}]"); }
106+
WriteUInt16BigEndian(bytes);
107+
return sizeof(UInt16) + Write(buffer);
108+
}
92109

93110
public unsafe void Write(char value, Encoder encoder, int encodingmaxbytesperchar)
94111
{

RSocket.Core/IRSocketTransport.cs

Lines changed: 0 additions & 15 deletions
This file was deleted.

RSocket.Core/IRSocketTransports.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System;
2+
using System.IO.Pipelines;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace RSocket
7+
{
8+
public interface IRSocketTransport
9+
{
10+
PipeReader Input { get; }
11+
PipeWriter Output { get; }
12+
13+
Task ConnectAsync(CancellationToken cancel = default);
14+
}
15+
16+
public interface IRSocketServerTransport
17+
{
18+
PipeReader Input { get; }
19+
PipeWriter Output { get; }
20+
21+
Task StartAsync();
22+
Task StopAsync();
23+
}
24+
}

0 commit comments

Comments
 (0)