From 9884802f9705856e21903eddbe35ddb02e2766b9 Mon Sep 17 00:00:00 2001 From: Gooddbird <1753009868@qq.com> Date: Sat, 2 Jul 2022 18:01:01 +0800 Subject: [PATCH] fix bug that async rpc call timeout isn't useful --- conf/test_http_server.xml | 4 ++-- testcases/test_http_server.cc | 2 +- testcases/test_rpc_server1.cc | 6 ++++- tinyrpc/comm/error_code.h | 1 + tinyrpc/net/reactor.cc | 5 ++++ tinyrpc/net/tcp/io_thread.cc | 24 ++++++++++++++++--- tinyrpc/net/tcp/io_thread.h | 6 ++++- tinyrpc/net/tcp/tcp_client.cc | 2 +- tinyrpc/net/timer.cc | 13 ++++++---- .../net/tinypb/tinypb_rpc_async_channel.cc | 14 +++++++---- tinyrpc/net/tinypb/tinypb_rpc_channel.cc | 5 ++-- tinyrpc/net/tinypb/tinypb_rpc_channel.h | 5 ++-- 12 files changed, 65 insertions(+), 22 deletions(-) diff --git a/conf/test_http_server.xml b/conf/test_http_server.xml index 8d76c66..b2f58ef 100644 --- a/conf/test_http_server.xml +++ b/conf/test_http_server.xml @@ -32,7 +32,7 @@ 75 - 10 + 2 3 @@ -47,4 +47,4 @@ HTTP - \ No newline at end of file + diff --git a/testcases/test_http_server.cc b/testcases/test_http_server.cc index 63a7c7a..b0b4487 100644 --- a/testcases/test_http_server.cc +++ b/testcases/test_http_server.cc @@ -169,7 +169,7 @@ class AsyncRPCTestServlet: public tinyrpc::HttpServlet { QueryService_Stub stub(async_channel.get()); tinyrpc::TinyPbRpcController rpc_controller; - rpc_controller.SetTimeout(5000); + rpc_controller.SetTimeout(2000); AppDebugLog << "AsyncRPCTestServlet begin to call RPC async"; stub.query_age(&rpc_controller, &rpc_req, &rpc_res, NULL); diff --git a/testcases/test_rpc_server1.cc b/testcases/test_rpc_server1.cc index 04d561a..84ffba2 100644 --- a/testcases/test_rpc_server1.cc +++ b/testcases/test_rpc_server1.cc @@ -100,6 +100,10 @@ class QueryServiceImpl : public QueryService { ::google::protobuf::Closure* done) { AppInfoLog << "QueryServiceImpl.query_age, req={"<< request->ShortDebugString() << "}"; + AppInfoLog << "QueryServiceImpl.query_age, sleep 6 s begin"; + sleep(6); + AppInfoLog << "QueryServiceImpl.query_age, sleep 6 s end"; + response->set_ret_code(0); response->set_res_info("OK"); response->set_req_no(request->req_no()); @@ -110,7 +114,7 @@ class QueryServiceImpl : public QueryService { done->Run(); } - // AppInfoLog << "QueryServiceImpl.query_age, req={"<< request->ShortDebugString() << "}, res={" << response->ShortDebugString() << "}"; + AppInfoLog << "QueryServiceImpl.query_age, req={"<< request->ShortDebugString() << "}, res={" << response->ShortDebugString() << "}"; } }; diff --git a/tinyrpc/comm/error_code.h b/tinyrpc/comm/error_code.h index 7ede9dd..1a98b14 100644 --- a/tinyrpc/comm/error_code.h +++ b/tinyrpc/comm/error_code.h @@ -27,6 +27,7 @@ const int ERROR_SERVICE_NOT_FOUND = SYS_ERROR_PREFIX(0008); // not found serv const int ERROR_METHOD_NOT_FOUND = SYS_ERROR_PREFIX(0009); // not found method const int ERROR_PARSE_SERVICE_NAME = SYS_ERROR_PREFIX(0010); // not found service name +const int ERROR_ASYNC_RPC_CALL_SINGLE_IOTHREAD = SYS_ERROR_PREFIX(0011); // not supoort async rpc call when only have single iothread } // namespace tinyrpc diff --git a/tinyrpc/net/reactor.cc b/tinyrpc/net/reactor.cc index 39d63ad..caf744e 100644 --- a/tinyrpc/net/reactor.cc +++ b/tinyrpc/net/reactor.cc @@ -265,6 +265,11 @@ void Reactor::loop() { ErrorLog << "socket [" << fd << "] occur other unknow event:[" << one_event.events << "], need unregister this socket"; delEventInLoopThread(fd); } else { + // if timer event, direct excute + if (fd == m_timer_fd) { + read_cb(); + continue; + } if (one_event.events & EPOLLIN) { // DebugLog << "socket [" << fd << "] occur read event"; Mutex::Lock lock(m_mutex); diff --git a/tinyrpc/net/tcp/io_thread.cc b/tinyrpc/net/tcp/io_thread.cc index 6f79bf8..bab486b 100644 --- a/tinyrpc/net/tcp/io_thread.cc +++ b/tinyrpc/net/tcp/io_thread.cc @@ -8,6 +8,7 @@ #include "tinyrpc/net/tcp/tcp_server.h" #include "tinyrpc/net/tcp/tcp_connection_time_wheel.h" #include "tinyrpc/coroutine/coroutine.h" +#include "tinyrpc/coroutine/coroutine_pool.h" #include "tinyrpc/comm/config.h" @@ -149,17 +150,34 @@ void IOThreadPool::addTaskByIndex(int index, std::function cb) { } } -void IOThreadPool::addCoroutineRandomThread(Coroutine::ptr cor, bool self /* = false*/) { +void IOThreadPool::addCoroutineToRandomThread(Coroutine::ptr cor, bool self /* = false*/) { srand(time(0)); int i = 0; while (1) { - i = rand() % (m_size - 1); + i = rand() % (m_size); if (!self && m_io_threads[i]->getPthreadId() == t_cur_io_thread->getPthreadId()) { - continue; + i++; + if (i == m_size) { + i -= 2; + } } break; } m_io_threads[i]->getReactor()->addCoroutine(cor, true); + // if (m_io_threads[m_index]->getPthreadId() == t_cur_io_thread->getPthreadId()) { + // m_index++; + // if (m_index == m_size || m_index == -1) { + // m_index = 0; + // } + // } +} + + +Coroutine::ptr IOThreadPool::addCoroutineToRandomThread(std::function cb, bool self/* = false*/) { + Coroutine::ptr cor = GetCoroutinePool()->getCoroutineInstanse(); + cor->setCallBack(cb); + addCoroutineToRandomThread(cor, self); + return cor; } diff --git a/tinyrpc/net/tcp/io_thread.h b/tinyrpc/net/tcp/io_thread.h index 0553c62..60e5751 100644 --- a/tinyrpc/net/tcp/io_thread.h +++ b/tinyrpc/net/tcp/io_thread.h @@ -70,9 +70,13 @@ class IOThreadPool { void addTaskByIndex(int index, std::function cb); + void addCoroutineToRandomThread(Coroutine::ptr cor, bool self = false); + // add a coroutine to random thread in io thread pool // self = false, means random thread cann't be current thread - void addCoroutineRandomThread(Coroutine::ptr cor, bool self = false); + // please free cor, or causes memory leak + // call returnCoroutine(cor) to free coroutine + Coroutine::ptr addCoroutineToRandomThread(std::function cb, bool self = false); private: int m_size {0}; diff --git a/tinyrpc/net/tcp/tcp_client.cc b/tinyrpc/net/tcp/tcp_client.cc index 97952c7..c641dd1 100644 --- a/tinyrpc/net/tcp/tcp_client.cc +++ b/tinyrpc/net/tcp/tcp_client.cc @@ -116,7 +116,7 @@ int TcpClient::sendAndRecvTinyPb(const std::string& msg_no, TinyPbStruct::pb_ptr } while (!m_connection->getResPackageData(msg_no, res)) { - + DebugLog << "redo getResPackageData"; m_connection->input(); if (m_connection->getOverTimerFlag()) { diff --git a/tinyrpc/net/timer.cc b/tinyrpc/net/timer.cc index ff0ff05..97fb412 100644 --- a/tinyrpc/net/timer.cc +++ b/tinyrpc/net/timer.cc @@ -57,7 +57,7 @@ void Timer::addTimerEvent(TimerEvent::ptr event, bool need_reset /*=true*/) { } m_pending_events.emplace(event->m_arrive_time, event); if (is_reset && need_reset) { - // DebugLog << "need reset timer"; + DebugLog << "need reset timer"; resetArriveTime(); } // DebugLog << "add timer event succ"; @@ -114,17 +114,16 @@ void Timer::onTimer() { int64_t now = getNowMs(); auto it = m_pending_events.begin(); std::vector tmps; - std::vector> tasks; + std::vector>> tasks; for (it = m_pending_events.begin(); it != m_pending_events.end(); ++it) { if ((*it).first <= now && !((*it).second->m_is_cancled)) { tmps.push_back((*it).second); - tasks.push_back((*it).second->m_task); + tasks.push_back(std::make_pair((*it).second->m_arrive_time, (*it).second->m_task)); } else { break; } } - m_reactor->addTask(tasks); m_pending_events.erase(m_pending_events.begin(), it); for (auto i = tmps.begin(); i != tmps.end(); ++i) { if ((*i)->m_is_repeated) { @@ -134,6 +133,12 @@ void Timer::onTimer() { } resetArriveTime(); + + // m_reactor->addTask(tasks); + for (auto i : tasks) { + // DebugLog << "excute timeevent:" << i.first; + i.second(); + } } } diff --git a/tinyrpc/net/tinypb/tinypb_rpc_async_channel.cc b/tinyrpc/net/tinypb/tinypb_rpc_async_channel.cc index 35df9da..a6fa2f9 100644 --- a/tinyrpc/net/tinypb/tinypb_rpc_async_channel.cc +++ b/tinyrpc/net/tinypb/tinypb_rpc_async_channel.cc @@ -39,6 +39,14 @@ void TinyPbRpcAsyncChannel::CallMethod(const google::protobuf::MethodDescriptor* google::protobuf::Message* response, google::protobuf::Closure* done) { + if (GetServer()->getIOThreadPool()->getIOThreadPoolSize() <= 1) { + ErrorLog << "Error! must have at least 2 iothread when call TinyPbRpcAsyncChannel"; + TinyPbRpcController* rpc_controller = dynamic_cast(controller); + rpc_controller->SetError(ERROR_ASYNC_RPC_CALL_SINGLE_IOTHREAD, "Error! must have at least 2 iothread when call TinyPbRpcAsyncChannel"); + m_promise.set_value(true); + return; + } + std::shared_ptr s_ptr = shared_from_this(); std::shared_ptr method_ptr; method_ptr.reset(method); @@ -61,11 +69,7 @@ void TinyPbRpcAsyncChannel::CallMethod(const google::protobuf::MethodDescriptor* DebugLog << "excute rpc call method by this thread finish"; s_ptr->m_promise.set_value(true); }; - - m_cor = GetCoroutinePool()->getCoroutineInstanse(); - m_cor->setCallBack(cb); - - GetServer()->getIOThreadPool()->addCoroutineRandomThread(m_cor, false); + m_cor = GetServer()->getIOThreadPool()->addCoroutineToRandomThread(cb, false); } std::future TinyPbRpcAsyncChannel::getFuture() { diff --git a/tinyrpc/net/tinypb/tinypb_rpc_channel.cc b/tinyrpc/net/tinypb/tinypb_rpc_channel.cc index 02f463b..0416842 100644 --- a/tinyrpc/net/tinypb/tinypb_rpc_channel.cc +++ b/tinyrpc/net/tinypb/tinypb_rpc_channel.cc @@ -14,8 +14,8 @@ namespace tinyrpc { -TinyPbRpcChannel::TinyPbRpcChannel(NetAddress::ptr addr) { - m_client = std::make_shared(addr); +TinyPbRpcChannel::TinyPbRpcChannel(NetAddress::ptr addr) : m_addr(addr) { + } void TinyPbRpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method, @@ -24,6 +24,7 @@ void TinyPbRpcChannel::CallMethod(const google::protobuf::MethodDescriptor* meth google::protobuf::Message* response, google::protobuf::Closure* done) { + m_client = std::make_shared(m_addr); TinyPbStruct pb_struct; TinyPbRpcController* rpc_controller = dynamic_cast(controller); rpc_controller->SetLocalAddr(m_client->getLocalAddr()); diff --git a/tinyrpc/net/tinypb/tinypb_rpc_channel.h b/tinyrpc/net/tinypb/tinypb_rpc_channel.h index fdf24ec..97f881d 100644 --- a/tinyrpc/net/tinypb/tinypb_rpc_channel.h +++ b/tinyrpc/net/tinypb/tinypb_rpc_channel.h @@ -3,8 +3,8 @@ #include #include -#include "../net_address.h" -#include "../tcp/tcp_client.h" +#include "tinyrpc/net/net_address.h" +#include "tinyrpc/net//tcp/tcp_client.h" namespace tinyrpc { @@ -22,6 +22,7 @@ void CallMethod(const google::protobuf::MethodDescriptor* method, google::protobuf::Closure* done); private: + NetAddress::ptr m_addr; TcpClient::ptr m_client; };