Skip to content

Commit eab6754

Browse files
committed
Merge branch '1.0.x'
2 parents 01f0f53 + 765abb7 commit eab6754

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public static Payload create(ByteBuf data) {
100100

101101
public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
102102
try {
103-
return create(data.nioBuffer(), metadata == null ? null : metadata.nioBuffer());
103+
return create(toBytes(data), metadata != null ? toBytes(metadata) : null);
104104
} finally {
105105
data.release();
106106
if (metadata != null) {
@@ -110,7 +110,16 @@ public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
110110
}
111111

112112
public static Payload create(Payload payload) {
113-
return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null);
113+
return create(
114+
toBytes(payload.data()), payload.hasMetadata() ? toBytes(payload.metadata()) : null);
115+
}
116+
117+
private static byte[] toBytes(ByteBuf byteBuf) {
118+
byte[] bytes = new byte[byteBuf.readableBytes()];
119+
byteBuf.markReaderIndex();
120+
byteBuf.readBytes(bytes);
121+
byteBuf.resetReaderIndex();
122+
return bytes;
114123
}
115124

116125
@Override

rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,14 @@ public void ensuresThatSetupPayloadCanBeRetained() {
139139
@Test
140140
public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions() {
141141
Payload setupPayload = ByteBufPayload.create("TestData", "TestMetadata");
142-
143142
Assertions.assertThat(setupPayload.refCnt()).isOne();
144143

144+
// Keep the data and metadata around so we can try changing them independently
145+
ByteBuf dataBuf = setupPayload.data();
146+
ByteBuf metadataBuf = setupPayload.metadata();
147+
dataBuf.retain();
148+
metadataBuf.retain();
149+
145150
TestClientTransport testClientTransport = new TestClientTransport();
146151
Mono<RSocket> connectionMono =
147152
RSocketConnector.create().setupPayload(setupPayload).connect(testClientTransport);
@@ -168,6 +173,15 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
168173
.expectComplete()
169174
.verify(Duration.ofMillis(100));
170175

176+
// Changing the original data and metadata should not impact the SetupPayload
177+
dataBuf.writerIndex(dataBuf.readerIndex());
178+
dataBuf.writeChar('d');
179+
dataBuf.release();
180+
181+
metadataBuf.writerIndex(metadataBuf.readerIndex());
182+
metadataBuf.writeChar('m');
183+
metadataBuf.release();
184+
171185
Assertions.assertThat(testClientTransport.testConnection().getSent())
172186
.hasSize(1)
173187
.allMatch(
@@ -176,7 +190,11 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
176190
return payload.getDataUtf8().equals("TestData")
177191
&& payload.getMetadataUtf8().equals("TestMetadata");
178192
})
179-
.allMatch(ReferenceCounted::release);
193+
.allMatch(
194+
byteBuf -> {
195+
System.out.println("calling release " + byteBuf.refCnt());
196+
return byteBuf.release();
197+
});
180198
Assertions.assertThat(setupPayload.refCnt()).isZero();
181199
}
182200

0 commit comments

Comments
 (0)