-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathTcpSession.cpp
116 lines (102 loc) · 3.51 KB
/
TcpSession.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include <thread>
#include "TcpSession.h"
TcpSession::TcpSession(tcp::socket socket, Worker* myWorker):
mSocket(std::move(socket)), myWorker(myWorker), mSendInterval(SEND_INTERVAL)
{
currentBuffer = new PBuff();
timer = new deadline_timer(*myWorker->workerService, mSendInterval);
writeOffset = mData;
readOffset = mData;
cout << "Session has been created!" << endl;
}
void TcpSession::BeginCommunication() {
auto self = shared_from_this();
myWorker->workerService->post([this, self]()
{
read_loop();
}
);
}
void TcpSession::handlePacket(char *bytes, size_t size) {
auto handlerPtr = handlerSelector.getHandler(*bytes);
assert(handlerPtr);
if (handlerPtr)
handlerPtr->Handle(bytes, size, *this);
}
void TcpSession::readHandler(size_t written) {
currSize += written;
writeOffset += written;
while (true) {
uint32_t packetSize = fixEndianness(*reinterpret_cast<uint32_t *>(readOffset));
if ((packetSize + 4) <= currSize) {
handlePacket(readOffset + 4, packetSize);
readOffset += 4 + packetSize;
currSize -= 4 + packetSize;
} else
break;
}
if (currSize) {
memmove(mData, readOffset, currSize);
}
readOffset = mData;
writeOffset = readOffset + currSize;
}
void TcpSession::read_loop() {
auto self(shared_from_this());
mSocket.async_read_some(boost::asio::buffer(writeOffset, bufferLength - currSize),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
readHandler(length);
read_loop();
}
else {
int a = 2;
}
});
}
void TcpSession::sendCurrPBuff() {
auto self = shared_from_this();
PBuff* pbuff = currentBuffer;
boost::asio::async_write(mSocket, boost::asio::buffer(pbuff->getBuff(), pbuff->currSize),
[self, pbuff](boost::system::error_code ec, std::size_t l)
{
if (!ec) {
//cout << "Sent! " << pbuff->currSize << endl;
}
delete pbuff;
});
}
void TcpSession::replacePBuff() {
sendCurrPBuff();
currentBuffer = new PBuff();
}
void TcpSession::writeStaticPacket(StaticPacket &p) {
if(currentBuffer->bytesLeft() < p.size())
replacePBuff();
currentBuffer->writeStaticPacket(p);
startSendDaemon();
}
PBuff* TcpSession::getPacketBuffer(size_t maxSize) {
if(currentBuffer->bytesLeft() < maxSize)
replacePBuff();
return currentBuffer;
}
void TcpSession::sendDaemonTick() {
//std::cout << "sd tick" << std::endl;
if(currentBuffer->currSize)
replacePBuff();
sendDaemonProtector = nullptr;
sendDaemonStarted = false;
}
void TcpSession::startSendDaemon() {
if(!sendDaemonStarted) {
sendDaemonProtector = shared_from_this();
sendDaemonStarted = true;
timer->expires_from_now(boost::posix_time::milliseconds(SEND_INTERVAL));
timer->async_wait(std::bind(&TcpSession::sendDaemonTick, this));
}
}
TcpSession::~TcpSession() {
delete timer;
cout << "Session has been destroyed!" << endl;
}