Skip to content

Commit 1ae27f9

Browse files
author
deadBranch
committed
packet buffering realized
1 parent ff03df0 commit 1ae27f9

16 files changed

+492
-340
lines changed

.idea/tokenwizard.iml

+11-10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/workspace.xml

+244-242
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

BaseHeader.h

+4
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@ using std::endl;
2626
#include <boost/shared_ptr.hpp>
2727
#include <boost/make_shared.hpp>
2828
#include <boost/asio.hpp>
29+
#include <alien.pm/tokenwizard/serialization/PacketBuffer.h>
2930

3031
#include "concurrent-guard/cg_shared_ptr.h"
32+
#include "Definitions.h"
3133

3234
using boost::make_shared;
3335
using boost::asio::ip::tcp;
3436

37+
typedef PacketBuffer<PACKET_BUFFER_SIZE> PBuff;
38+
3539

3640
#endif //TOKENWIZARD_BASEHEADER_H

CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ file(GLOB SOURCE_FILES
1818
"*/*.cpp"
1919
)
2020

21-
add_executable(tokenwizard ${SOURCE_FILES} ${Boost_LIBRARIES} HandlerSelector.h CommandHandler.h CommandHandlers/GetTokenCommandHandler.cpp CommandHandlers/GetTokenCommandHandler.h CommandHandlers/GenTokenCommandHandler.cpp CommandHandlers/GenTokenCommandHandler.h Worker.h)
21+
add_executable(tokenwizard ${SOURCE_FILES} ${Boost_LIBRARIES} HandlerSelector.h CommandHandler.h CommandHandlers/GetTokenCommandHandler.cpp CommandHandlers/GetTokenCommandHandler.h CommandHandlers/GenTokenCommandHandler.cpp CommandHandlers/GenTokenCommandHandler.h Worker.h serialization/PacketBuffer.h serialization/StaticPacket.h)
2222
TARGET_LINK_LIBRARIES (tokenwizard ${Boost_LIBRARIES})
+4-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <alien.pm/tokenwizard/ClientCommands.h>
12
#include "GenTokenCommandHandler.h"
23
#include "../UnorderedTokenMap.h"
34
#include "../TcpSession.h"
@@ -7,7 +8,8 @@ void GenTokenCommandHandler::Handle(char *bytes, size_t size, TcpSession &sessio
78
size_t dataLength = size - 1;
89
auto ptr = tokenMap.genToken(data, dataLength);
910
cout << "Generated: " << ptr->token << endl;
10-
shared_ptr<Packet> infoPtr = ptr->tokenInfo();
11-
sessionContext.SendPacket<shared_ptr<Packet>>(infoPtr.get(), move(infoPtr));
11+
ptr->writeTokenInfo(sessionContext.getPacketBuffer(TOKEN_LENGTH+8+4));
12+
//shared_ptr<StaticPacket> infoPtr = ptr->tokenInfo();
13+
//sessionContext.writeStaticPacket(*infoPtr.get());
1214
sessionContext.myWorker->pushRemoveTask(move(ptr));
1315
}

CommandHandlers/GetTokenCommandHandler.cpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include "../UnorderedTokenMap.h"
88
#include "../TcpSession.h"
99

10-
extern Packet tokenDoesNotExistPacket;
10+
extern StaticPacket tokenDoesNotExistPacket;
1111

1212
void GetTokenCommandHandler::Handle(char *bytes, size_t size, TcpSession &sessionContext) {
1313
//Deserialize
@@ -18,11 +18,9 @@ void GetTokenCommandHandler::Handle(char *bytes, size_t size, TcpSession &sessio
1818
auto ptr = tokenMap.try_get(token);
1919
if(ptr && ptr->timeToDie) {
2020
// cout << "token exists" << endl;
21-
sessionContext.SendPacket<decltype(ptr)>(ptr->data, move(ptr));
21+
sessionContext.writeStaticPacket(*ptr->dataPacket);
2222
}
2323
else {
24-
cout << token << endl;
25-
cout << "Unknown token" << endl;
26-
sessionContext.SendPacket(&tokenDoesNotExistPacket);
24+
sessionContext.writeStaticPacket(tokenDoesNotExistPacket);
2725
}
2826
}

Definitions.h

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#define MAP_SIZE_EXPONENT 26 //the number of the elements = 2^mapSizeExponent; range: 0 < MAP_SIZE_EXPONENT < 32
66
#define TOKEN_LENGTH 32 //the length of the entire token
77
#define TOKEN_TTL 20 //time to live of a token
8+
#define SEND_INTERVAL 1000
89
#define DATA_LENGTH 48 //the maximum length of the token content
10+
#define PACKET_BUFFER_SIZE 32768
911

1012
#endif //TOKENWIZARD_DEFINES_H

Int32Encoder.h

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
#define TOKENWIZARD_INT32ENCODER_H
33

44
#include "BaseHeader.h"
5-
#include "serialization/Packet.h"
65

76
class Int32Encoder {
87
static char encodeMap[];

RandomFiller.h

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
#include "BaseHeader.h"
55
#include "Definitions.h"
6-
#include "serialization/Packet.h"
76

87
extern const char alphabet[63];
98

TcpSession.cpp

+83-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
#include "TcpSession.h"
33

44
TcpSession::TcpSession(tcp::socket socket, Worker* myWorker):
5-
mSocket(std::move(socket)), myWorker(myWorker)
5+
mSocket(std::move(socket)), myWorker(myWorker), mSendInterval(SEND_INTERVAL)
66
{
7+
currentBuffer = new PBuff();
8+
timer = new deadline_timer(*myWorker->workerService, mSendInterval);
9+
timer->async_wait(std::bind(&TcpSession::sendDaemonTick, this));
710
writeOffset = mData;
811
readOffset = mData;
912
cout << "Session has been created!" << endl;
@@ -21,3 +24,82 @@ void TcpSession::BeginCommunication() {
2124
}
2225
);
2326
}
27+
28+
void TcpSession::handlePacket(char *bytes, size_t size) {
29+
auto handlerPtr = handlerSelector.getHandler(*bytes);
30+
assert(handlerPtr);
31+
if (handlerPtr)
32+
handlerPtr->Handle(bytes, size, *this);
33+
}
34+
35+
void TcpSession::readHandler(size_t written) {
36+
currSize += written;
37+
writeOffset += written;
38+
while (true) {
39+
uint32_t packetSize = fixEndianness(*reinterpret_cast<uint32_t *>(readOffset));
40+
if ((packetSize + 4) <= currSize) {
41+
handlePacket(readOffset + 4, packetSize);
42+
readOffset += 4 + packetSize;
43+
currSize -= 4 + packetSize;
44+
} else
45+
break;
46+
}
47+
if (currSize) {
48+
memmove(mData, readOffset, currSize);
49+
}
50+
readOffset = mData;
51+
writeOffset = readOffset + currSize;
52+
}
53+
54+
void TcpSession::read_loop() {
55+
auto self(shared_from_this());
56+
mSocket.async_read_some(boost::asio::buffer(writeOffset, maxLength - currSize),
57+
[this, self](boost::system::error_code ec, std::size_t length) {
58+
if (!ec) {
59+
readHandler(length);
60+
read_loop();
61+
}
62+
});
63+
}
64+
65+
void TcpSession::sendCurrPBuff() {
66+
auto self = shared_from_this();
67+
PBuff* pbuff = currentBuffer;
68+
boost::asio::async_write(mSocket, boost::asio::buffer(pbuff->getBuff(), pbuff->currSize),
69+
[self, pbuff](boost::system::error_code ec, std::size_t l)
70+
{
71+
if (!ec) {
72+
cout << "Sent! " << pbuff->currSize << endl;
73+
}
74+
delete pbuff;
75+
});
76+
}
77+
78+
Packet TcpSession::beginPacket(size_t size) {
79+
if(currentBuffer->bytesLeft() > size)
80+
replacePBuff();
81+
return currentBuffer->beginPacket();
82+
}
83+
84+
void TcpSession::endPacket(Packet &p) {
85+
currentBuffer->endPacket(p);
86+
tryStartDaemon();
87+
}
88+
89+
void TcpSession::replacePBuff() {
90+
sendCurrPBuff();
91+
currentBuffer = new PBuff();
92+
}
93+
94+
void TcpSession::writeStaticPacket(StaticPacket &p) {
95+
if(currentBuffer->bytesLeft() < p.size())
96+
replacePBuff();
97+
currentBuffer->writeStaticPacket(p);
98+
tryStartDaemon();
99+
}
100+
101+
PBuff* TcpSession::getPacketBuffer(size_t maxSize) {
102+
if(currentBuffer->bytesLeft() < maxSize)
103+
replacePBuff();
104+
return currentBuffer;
105+
}

TcpSession.h

+26-62
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
1-
//
2-
// Created by origin on 29.10.16.
3-
//
4-
51
#ifndef TOKENWIZARD_SESSION_H
62
#define TOKENWIZARD_SESSION_H
73

84
#include "BaseHeader.h"
9-
#include "serialization/Packet.h"
105
#include "HandlerSelector.h"
116
#include "Worker.h"
127

8+
139
extern HandlerSelector handlerSelector;
1410

1511
class TcpSession: public std::enable_shared_from_this<TcpSession> {
@@ -18,72 +14,40 @@ class TcpSession: public std::enable_shared_from_this<TcpSession> {
1814
maxLength = 65536
1915
};
2016
char mData[maxLength];
21-
22-
void handlePacket(char *bytes, size_t size) {
23-
auto handlerPtr = handlerSelector.getHandler(*bytes);
24-
assert(handlerPtr);
25-
if (handlerPtr)
26-
handlerPtr->Handle(bytes, size, *this);
27-
}
28-
17+
void handlePacket(char *bytes, size_t size);
2918
char *writeOffset;
3019
char *readOffset;
3120
size_t currSize = 0;
32-
33-
void readHandler(size_t written) {
34-
currSize += written;
35-
writeOffset += written;
36-
while (true) {
37-
uint32_t packetSize = fixEndianness(*reinterpret_cast<uint32_t *>(readOffset));
38-
if ((packetSize + 4) <= currSize) {
39-
handlePacket(readOffset + 4, packetSize);
40-
readOffset += 4 + packetSize;
41-
currSize -= 4 + packetSize;
42-
} else
43-
break;
44-
}
45-
if (currSize) {
46-
memmove(mData, readOffset, currSize);
47-
}
48-
readOffset = mData;
49-
writeOffset = readOffset + currSize;
21+
bool sendDaemonStarted = false;
22+
boost::posix_time::milliseconds mSendInterval;
23+
boost::asio::deadline_timer* timer;
24+
25+
void sendDaemonTick() {
26+
sendDaemonStarted = false;
27+
if(currentBuffer->currSize)
28+
replacePBuff();
29+
std::cout << "sd tick" << std::endl;
5030
}
5131

52-
void read_loop() {
53-
auto self(shared_from_this());
54-
mSocket.async_read_some(boost::asio::buffer(writeOffset, maxLength - currSize),
55-
[this, self](boost::system::error_code ec, std::size_t length) {
56-
if (!ec) {
57-
readHandler(length);
58-
read_loop();
59-
}
60-
});
32+
void tryStartDaemon() {
33+
if(!sendDaemonStarted) {
34+
sendDaemonStarted = true;
35+
timer->expires_from_now(boost::posix_time::seconds(5));
36+
//timer->async_wait(std::bind(&TcpSession::sendDaemonTick, this));
37+
}
6138
}
6239

63-
40+
void readHandler(size_t written);
41+
void read_loop();
6442
public:
6543
Worker* myWorker;
66-
template<class T>
67-
void SendPacket(Packet *packet, T &&smart_pointer) {
68-
auto self = shared_from_this();
69-
boost::asio::async_write(mSocket, boost::asio::buffer(packet -> data(), packet->size()),
70-
[self, packet, smart_pointer](boost::system::error_code ec, std::size_t l)
71-
{
72-
if (!ec) {
73-
//cout << "Sent!" << endl;
74-
}
75-
});
76-
}
77-
void SendPacket(Packet *packet) {
78-
auto self = shared_from_this();
79-
boost::asio::async_write(mSocket, boost::asio::buffer(packet -> data(), packet->size()),
80-
[self, packet](boost::system::error_code ec, std::size_t l)
81-
{
82-
if (!ec) {
83-
cout << "Sent!" << endl;
84-
}
85-
});
86-
}
44+
PBuff* currentBuffer;
45+
void sendCurrPBuff();
46+
void replacePBuff();
47+
Packet beginPacket(size_t size);
48+
void endPacket(Packet& p);
49+
void writeStaticPacket(StaticPacket& p);
50+
PBuff* getPacketBuffer(size_t maxSize);
8751
TcpSession(tcp::socket socket, Worker* myWorker);
8852
void BeginCommunication();
8953
~TcpSession();

Token.h

+22-6
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,47 @@
11
#ifndef TOKENWIZARD_TOKENRECORD_H
22
#define TOKENWIZARD_TOKENRECORD_H
33

4-
#include "BaseHeader.h"
4+
#include <boost/thread/pthread/once_atomic.hpp>
5+
#include "serialization/PacketBuffer.h"
6+
#include "serialization/StaticPacket.h"
57
#include "Int32Encoder.h"
68
#include "RandomFiller.h"
79
#include "ServerResponses.h"
10+
#include "BaseHeader.h"
811

912
template <size_t tokenLength>
1013
class Token {
14+
boost::once_flag decreaseOnce = BOOST_ONCE_INIT;
1115
public:
12-
Packet* data;
16+
void deleteOnce
17+
StaticPacket* dataPacket;
1318
char token[tokenLength+7];
1419
time_t timeToDie;
15-
Token(uint32_t val, Packet* _dataPacket): data(_dataPacket) {
20+
21+
Token(uint32_t val, StaticPacket* _dataPacket): dataPacket(_dataPacket) {
1622
timeToDie = time(0) + TOKEN_TTL;
1723
token[tokenLength+6] = '\0';
1824
Int32Encoder::encode64Based(val, token);
1925
RandomFiller<tokenLength>::fillString(token+6);
2026
}
21-
shared_ptr<Packet> tokenInfo() {
22-
shared_ptr<Packet> packetPtr = std::make_shared<Packet>((char)ServerResponse::tokenCreated, tokenLength+7);
27+
28+
shared_ptr<StaticPacket> tokenInfo() {
29+
shared_ptr<StaticPacket> packetPtr = std::make_shared<StaticPacket>((char)ServerResponse::tokenCreated, tokenLength+7);
2330
memcpy(packetPtr->data()+5, token, tokenLength + 7);
2431
packetPtr->serialize();
2532
return packetPtr;
2633
}
34+
35+
void writeTokenInfo(PBuff* buff) {
36+
char* dataPtr = buff->getPtr();
37+
writeCount(tokenLength+8, dataPtr);
38+
dataPtr[4] = (char)ServerResponse::tokenCreated;
39+
memcpy(dataPtr+5, token, tokenLength + 7);
40+
buff->currSize += tokenLength+8+4;
41+
}
42+
2743
~Token() {
28-
delete data;
44+
delete dataPacket;
2945
}
3046
};
3147

0 commit comments

Comments
 (0)