Skip to content

Commit ff03df0

Browse files
author
deadBranch
committed
+workers
1 parent 5180661 commit ff03df0

10 files changed

+227
-228
lines changed

.idea/workspace.xml

+185-213
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

CommandHandlers/GenTokenCommandHandler.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
void GenTokenCommandHandler::Handle(char *bytes, size_t size, TcpSession &sessionContext) {
66
char* data = bytes + 1;
77
size_t dataLength = size - 1;
8-
auto ptr = tokenMap.genToken(bytes, dataLength);
9-
cout << ptr->token << endl;
8+
auto ptr = tokenMap.genToken(data, dataLength);
9+
cout << "Generated: " << ptr->token << endl;
1010
shared_ptr<Packet> infoPtr = ptr->tokenInfo();
1111
sessionContext.SendPacket<shared_ptr<Packet>>(infoPtr.get(), move(infoPtr));
12+
sessionContext.myWorker->pushRemoveTask(move(ptr));
1213
}

CommandHandlers/GetTokenCommandHandler.cpp

+1-4
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,9 @@ void GetTokenCommandHandler::Handle(char *bytes, size_t size, TcpSession &sessio
1414
//cout << "GetToken" << endl;
1515
char* token = bytes + 1;
1616
//cout << token << endl;
17-
18-
cout << "get_token:" << std::this_thread::get_id()<< endl;
19-
2017
//Handle
2118
auto ptr = tokenMap.try_get(token);
22-
if(ptr && ptr->isEnabled) {
19+
if(ptr && ptr->timeToDie) {
2320
// cout << "token exists" << endl;
2421
sessionContext.SendPacket<decltype(ptr)>(ptr->data, move(ptr));
2522
}

Definitions.h

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#define NETWORK_ENDIANNESS __le
55
#define MAP_SIZE_EXPONENT 26 //the number of the elements = 2^mapSizeExponent; range: 0 < MAP_SIZE_EXPONENT < 32
66
#define TOKEN_LENGTH 32 //the length of the entire token
7+
#define TOKEN_TTL 20 //time to live of a token
78
#define DATA_LENGTH 48 //the maximum length of the token content
89

910
#endif //TOKENWIZARD_DEFINES_H

TcpSession.cpp

-3
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ TcpSession::~TcpSession() {
1515

1616
void TcpSession::BeginCommunication() {
1717
auto self = shared_from_this();
18-
myWorker->workerService->post([]() {
19-
cout << "direct post: " << std::this_thread::get_id()<< endl;
20-
});
2118
myWorker->workerService->post([this, self]()
2219
{
2320
read_loop();

TcpSession.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
extern HandlerSelector handlerSelector;
1414

1515
class TcpSession: public std::enable_shared_from_this<TcpSession> {
16-
Worker* myWorker;
1716
tcp::socket mSocket;
1817
enum {
1918
maxLength = 65536
@@ -63,6 +62,7 @@ class TcpSession: public std::enable_shared_from_this<TcpSession> {
6362

6463

6564
public:
65+
Worker* myWorker;
6666
template<class T>
6767
void SendPacket(Packet *packet, T &&smart_pointer) {
6868
auto self = shared_from_this();

Token.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44
#include "BaseHeader.h"
55
#include "Int32Encoder.h"
66
#include "RandomFiller.h"
7+
#include "ServerResponses.h"
78

89
template <size_t tokenLength>
910
class Token {
1011
public:
1112
Packet* data;
1213
char token[tokenLength+7];
13-
bool isEnabled = true;
14+
time_t timeToDie;
1415
Token(uint32_t val, Packet* _dataPacket): data(_dataPacket) {
16+
timeToDie = time(0) + TOKEN_TTL;
1517
token[tokenLength+6] = '\0';
1618
Int32Encoder::encode64Based(val, token);
1719
RandomFiller<tokenLength>::fillString(token+6);

TokenServer.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ void TokenServer::acceptLoop() {
66
mAcceptor.async_accept(mCurrSocket,
77
[this, pSelectedWorker](boost::system::error_code ec)
88
{
9-
cout << "acceptor:" << std::this_thread::get_id()<< endl;
109
if (!ec)
1110
{
1211
std::make_shared<TcpSession>(std::move(mCurrSocket), pSelectedWorker)->BeginCommunication();

UnorderedTokenMap.h

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class UnorderedTokenMap {
2626
cg_shared_ptr<Token<tokenLength>> genToken(const char* data, size_t len) {
2727
Packet* dataPacket = new Packet((char)ServerResponse::tokenExists, len);
2828
memcpy(dataPacket->data()+5, data, len);
29+
char* d = dataPacket->data()+5;
2930
dataPacket->serialize();
3031

3132
while(true) {

Worker.h

+32-3
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,45 @@
22
#define TOKENWIZARD_WORKER_H
33

44
#include <thread>
5+
#include <queue>
56
#include "BaseHeader.h"
7+
#include "Token.h"
8+
69
using namespace boost::asio;
710

811
class Worker {
9-
boost::asio::io_service::work* fakeWork;
12+
typedef cg_shared_ptr<Token<TOKEN_LENGTH>> TokenPtr;
13+
boost::asio::io_service::work* gcWork;
14+
15+
boost::posix_time::seconds interval; // 1 second
16+
boost::asio::deadline_timer* timer;
17+
time_t currTime;
18+
void gcTick() {
19+
currTime = time(0);
20+
size_t currSize = removeQueue.size();
21+
for(size_t i = 0; i < currSize; ++i) {
22+
auto ptr = removeQueue.front();
23+
if(ptr->timeToDie < currTime)
24+
removeQueue.pop();
25+
else
26+
break;
27+
}
28+
//std::cout << "gcTick" << std::endl;
29+
timer->expires_at(timer->expires_at() + interval);
30+
timer->async_wait(std::bind(&Worker::gcTick, this));
31+
}
1032
public:
33+
void pushRemoveTask(TokenPtr&& tokenPtr) {
34+
removeQueue.push(move(tokenPtr));
35+
}
36+
queue<TokenPtr> removeQueue;
1137
io_service* workerService;
12-
Worker() {
38+
Worker(): interval(1) {
39+
currTime = time(0);
1340
workerService = new io_service();
14-
fakeWork = new io_service::work(*workerService);
41+
gcWork = new io_service::work(*workerService);
42+
timer = new deadline_timer(*workerService, interval);
43+
timer->async_wait(std::bind(&Worker::gcTick, this));
1544
thread([this]() {
1645
workerService->run();
1746
}).detach();

0 commit comments

Comments
 (0)