Skip to content

Commit 0e464f7

Browse files
EvenLjjliujianjun.ljj
and
liujianjun.ljj
authored
fix triple POJO stream parent interface method not cache stream call type issue (#1481)
Co-authored-by: liujianjun.ljj <[email protected]>
1 parent db4f464 commit 0e464f7

File tree

5 files changed

+78
-2
lines changed

5 files changed

+78
-2
lines changed

remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public static MethodDescriptor.MethodType mapGrpcCallType(String callType) {
9696

9797
public static Map<String, String> cacheStreamCallType(Class proxyClass) {
9898
Map<String, String> methodCallType = new ConcurrentHashMap<>();
99-
Method[] declaredMethods = proxyClass.getDeclaredMethods();
99+
Method[] declaredMethods = proxyClass.getMethods();
100100
for (Method method : declaredMethods) {
101101
String streamType = mapStreamType(method);
102102
if (StringUtils.isNotBlank(streamType)) {

test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import com.alipay.sofa.rpc.transport.SofaStreamObserver;
2020

21-
public interface HelloService {
21+
public interface HelloService extends ParentService {
2222

2323
String CMD_TRIGGER_STREAM_FINISH = "finish";
2424

test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java

+11
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,15 @@ public void sayHelloServerStream(ClientRequest clientRequest, SofaStreamObserver
7878
}
7979
}
8080

81+
@Override
82+
public ServerResponse sayHello(ClientRequest clientRequest) {
83+
return new ServerResponse(clientRequest.getMsg(), clientRequest.getCount());
84+
}
85+
86+
@Override
87+
public void parentSayHelloServerStream(ClientRequest clientRequest,
88+
SofaStreamObserver<ServerResponse> sofaStreamObserver) {
89+
sayHelloServerStream(clientRequest, sofaStreamObserver);
90+
}
91+
8192
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.alipay.sofa.rpc.test.triple.stream;
18+
19+
import com.alipay.sofa.rpc.transport.SofaStreamObserver;
20+
21+
/**
22+
* @author Even
23+
* @date 2025/3/4 23:49
24+
*/
25+
public interface ParentService {
26+
27+
ServerResponse sayHello(ClientRequest clientRequest);
28+
29+
void parentSayHelloServerStream(ClientRequest clientRequest, SofaStreamObserver<ServerResponse> sofaStreamObserver);
30+
31+
}

test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleGenericStreamTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public static void beforeClass() throws InterruptedException {
7373
.setProtocol("tri")
7474
.setDirectUrl("triple://127.0.0.1:50066");
7575
helloServiceRef = consumerConfig.refer();
76+
Thread.sleep(5000);
7677
}
7778

7879
@AfterClass
@@ -84,6 +85,39 @@ public static void afterClass() {
8485
RpcInvokeContext.removeContext();
8586
}
8687

88+
@Test
89+
public void testTripleParentCall() throws InterruptedException {
90+
ClientRequest clientRequest = new ClientRequest("hello world", 5);
91+
ServerResponse serverResponse = helloServiceRef.sayHello(clientRequest);
92+
Assert.assertEquals("hello world", serverResponse.getMsg());
93+
94+
CountDownLatch countDownLatch = new CountDownLatch(6);
95+
AtomicBoolean receivedFinish = new AtomicBoolean(false);
96+
List<ServerResponse> list = new ArrayList<>();
97+
helloServiceRef.parentSayHelloServerStream(clientRequest, new SofaStreamObserver<ServerResponse>() {
98+
@Override
99+
public void onNext(ServerResponse message) {
100+
list.add(message);
101+
countDownLatch.countDown();
102+
}
103+
104+
@Override
105+
public void onCompleted() {
106+
receivedFinish.set(true);
107+
countDownLatch.countDown();
108+
}
109+
110+
@Override
111+
public void onError(Throwable throwable) {
112+
countDownLatch.countDown();
113+
}
114+
});
115+
116+
Assert.assertTrue(countDownLatch.await(20, TimeUnit.SECONDS));
117+
Assert.assertEquals(5, list.size());
118+
Assert.assertTrue(receivedFinish.get());
119+
}
120+
87121
@Test
88122
public void testTripleBiStreamFinish() throws InterruptedException {
89123
testTripleBiStream(false);

0 commit comments

Comments
 (0)