Skip to content

Commit 95e15da

Browse files
authored
Merge pull request #203 from apkar/mem-leak
Resolves #202: Fixes memory leak on `ExtConnection`
2 parents 74e46c4 + 6c2c0b9 commit 95e15da

File tree

8 files changed

+47
-70
lines changed

8 files changed

+47
-70
lines changed

src/BufferedConnection.actor.cpp

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ struct BCBlock : FastAllocated<BCBlock> /*See below, NonCopyable */ {
4040
struct BufferedConnectionData {
4141
explicit BufferedConnectionData(Reference<IConnection> connection);
4242
~BufferedConnectionData() {
43+
// Free left over buffers.
44+
for (auto blk : deadlist) {
45+
delete blk;
46+
}
47+
for (auto blk : buffer) {
48+
delete blk;
49+
}
50+
4351
conn.cancel();
4452
connection->close();
4553
}
@@ -139,25 +147,6 @@ BufferedConnection::~BufferedConnection() {
139147
delete self;
140148
}
141149

142-
StringRef BufferedConnection::peekSome(int count, int offset) {
143-
ASSERT(count + offset <= self->total_bytes.get());
144-
145-
auto it = self->buffer.begin();
146-
int block_offset = self->buffer_begin_offset;
147-
148-
while (offset) {
149-
int advance = std::min(offset, BCBlock::DATA_SIZE - block_offset);
150-
offset -= advance;
151-
block_offset += advance;
152-
if (block_offset == BCBlock::DATA_SIZE) {
153-
block_offset = 0;
154-
++it;
155-
}
156-
}
157-
158-
return {(*it)->data + block_offset, std::min(count, BCBlock::DATA_SIZE - block_offset)};
159-
}
160-
161150
void BufferedConnectionData::copyInto(uint8_t* buf, int count) {
162151
uint8_t* ptr = buf;
163152
int offset = buffer_begin_offset;
@@ -307,10 +296,6 @@ Future<Void> BufferedConnection::onWritable() {
307296
return self->connection->onWritable();
308297
}
309298

310-
int BufferedConnection::bytesAvailable() {
311-
return self->total_bytes.get();
312-
}
313-
314299
NetworkAddress BufferedConnection::getPeerAddress() {
315300
return self->connection->getPeerAddress();
316301
}

src/BufferedConnection.h

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,8 @@ struct BufferedConnection : ReferenceCounted<BufferedConnection> {
3737
Future<Void> onBytesAvailable(int count);
3838

3939
/**
40-
* Returns the number of bytes in the buffer.
41-
*/
42-
int bytesAvailable();
43-
44-
/**
45-
* Returns up to count bytes starting at offset. The returned memory is
46-
* guaranteed to be valid until the next call to pop() or read().
47-
*
48-
* NOTE: count + offset MUST BE no greater than bytesAvailable().
49-
*/
50-
StringRef peekSome(int count, int offset = 0);
51-
52-
/**
53-
* Returns exactly count bytes. The returned memory is guaranteed to be
40+
* Returns exactly count bytes. The returned memory
41+
* is guaranteed to be
5442
* valid until the next call to pop() or read().
5543
*
5644
* NOTE: count MUST BE no greater than the number of bytes currently in the buffer

src/Cursor.actor.cpp

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,27 @@
2323
#include "Knobs.h"
2424
#include "flow/actorcompiler.h" // This must be the last #include.
2525

26-
int32_t Cursor::prune(std::map<int64_t, Reference<Cursor>>& cursors) {
26+
int32_t Cursor::prune(std::map<int64_t, Reference<Cursor>>& cursors, bool pruneAll) {
2727
time_t now = time(nullptr);
2828
int32_t pruned = 0;
2929
std::vector<Reference<Cursor>> to_be_pruned;
3030

31-
for (auto it = cursors.begin(); it != cursors.end();) {
32-
if (it->second && now >= it->second->expiry) {
33-
to_be_pruned.push_back(it->second);
31+
try {
32+
for (auto it = cursors.begin(); it != cursors.end();) {
33+
if (it->second && (pruneAll || now >= it->second->expiry)) {
34+
to_be_pruned.push_back(it->second);
35+
}
36+
++it;
3437
}
35-
++it;
36-
}
3738

38-
for (const auto& i : to_be_pruned) {
39-
(void)pluck(i);
40-
pruned++;
39+
for (const auto& i : to_be_pruned) {
40+
(void)pluck(i);
41+
pruned++;
42+
}
43+
} catch (Error& e) {
44+
TraceEvent(SevError, "BD_cursor_prune_failed").error(e);
45+
// Ignoring error just to keep the code consistent with previous behaviour.
46+
// Cursor design could be made a lot better than this.
4147
}
4248

4349
return pruned;

src/Cursor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ struct Cursor : ReferenceCounted<Cursor>, NonCopyable {
4343

4444
void refresh() { expiry = time(nullptr) + DOCLAYER_KNOBS->CURSOR_EXPIRY; }
4545

46-
static int32_t prune(std::map<int64_t, Reference<Cursor>>& cursors);
46+
static int32_t prune(std::map<int64_t, Reference<Cursor>>& cursors, bool pruneAll);
4747

4848
static void pluck(Reference<Cursor> cursor);
4949
static Reference<Cursor> add(std::map<int64_t, Reference<Cursor>>& siblings, Reference<Cursor> cursor);

src/DocLayer.actor.cpp

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ Future<Void> processRequest(Reference<ExtConnection> ec,
143143
try {
144144
Reference<ExtMsg> msg = ExtMsg::create(header, body, finished);
145145
if (verboseLogging)
146-
TraceEvent("BD_processRequest").detail("Message", msg->toString());
146+
TraceEvent("BD_processRequest").detail("Message", msg->toString()).detail("connId", ec->connectionId);
147147
if (verboseConsoleOutput)
148148
fprintf(stderr, "C -> S: %s\n\n", msg->toString().c_str());
149149
return msg->run(ec);
@@ -165,29 +165,44 @@ ACTOR Future<Void> popDisposedMessages(Reference<BufferedConnection> bc,
165165
}
166166
}
167167

168+
ACTOR Future<Void> housekeeping(Reference<ExtConnection> ec) {
169+
try {
170+
loop {
171+
wait(delay(DOCLAYER_KNOBS->CURSOR_EXPIRY));
172+
Cursor::prune(ec->cursors, false);
173+
}
174+
} catch (Error& e) {
175+
// This is the only actor responsible for all the cursors created
176+
// through this connection. Prune all the cursors before cancelling
177+
// this actor.
178+
if (e.code() == error_code_actor_cancelled)
179+
Cursor::prune(ec->cursors, true);
180+
throw;
181+
}
182+
}
183+
168184
ACTOR Future<Void> extServerConnection(Reference<DocumentLayer> docLayer,
169185
Reference<BufferedConnection> bc,
170186
int64_t connectionId) {
171187
if (verboseLogging)
172-
TraceEvent("BD_serverNewConnection");
188+
TraceEvent("BD_serverNewConnection").detail("connId", connectionId);
173189

174190
state Reference<ExtConnection> ec = Reference<ExtConnection>(new ExtConnection(docLayer, bc, connectionId));
175191
state PromiseStream<std::pair<int, Future<Void>>> msg_size_inuse;
176192
state Future<Void> onError = ec->bc->onClosed() || popDisposedMessages(bc, msg_size_inuse.getFuture());
193+
state Future<Void> connHousekeeping = housekeeping(ec);
177194

178195
DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CONNECTIONS,
179196
++docLayer->nrConnections);
180197
try {
181-
ec->startHousekeeping();
182-
183198
loop {
184199
// Will be broken (or set or whatever) only when the memory we are passing to processRequest is no longer
185200
// needed and can be popped
186201
state Promise<Void> finished;
187202
choose {
188203
when(wait(onError)) {
189204
if (verboseLogging)
190-
TraceEvent("BD_serverClosedConnection");
205+
TraceEvent("BD_serverClosedConnection").detail("connId", connectionId);
191206
throw connection_failed();
192207
}
193208
when(wait(ec->bc->onBytesAvailable(sizeof(ExtMsgHeader)))) {

src/ExtMsg.actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ void ExtMsgReply::write(Reference<ExtConnection> nmc) {
591591
}
592592

593593
if (verboseLogging)
594-
TraceEvent("BD_msgReply").detail("Message", toString());
594+
TraceEvent("BD_msgReply").detail("Message", toString()).detail("connId", nmc->connectionId);
595595
if (verboseConsoleOutput)
596596
fprintf(stderr, "S -> C: %s\n\n", toString().c_str());
597597

src/ExtStructs.actor.cpp

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,6 @@ Reference<Plan> ExtConnection::isolatedWrapOperationPlan(Reference<Plan> plan, i
4040
return Reference<Plan>(new RetryPlan(plan, timeout, retryLimit, docLayer->database));
4141
}
4242

43-
ACTOR Future<Void> housekeeping_impl(Reference<ExtConnection> ec) {
44-
loop {
45-
wait(delay(DOCLAYER_KNOBS->CURSOR_EXPIRY));
46-
try {
47-
Cursor::prune(ec->cursors);
48-
} catch (Error& e) {
49-
TraceEvent(SevError, "BD_Cursor_housekeeping").error(e);
50-
}
51-
}
52-
}
53-
54-
void ExtConnection::startHousekeeping() {
55-
housekeeping = housekeeping_impl(Reference<ExtConnection>::addRef(this));
56-
}
57-
5843
ACTOR Future<WriteResult> lastErrorOrLastResult(Future<WriteResult> previous,
5944
Future<WriteResult> next,
6045
FlowLock* lock,

src/ExtStructs.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ struct ExtConnection : ReferenceCounted<ExtConnection>, NonCopyable {
113113
Reference<Plan> wrapOperationPlan(Reference<Plan> plan, bool isReadOnly, Reference<UnboundCollectionContext> cx);
114114
Reference<Plan> isolatedWrapOperationPlan(Reference<Plan> plan);
115115
Reference<Plan> isolatedWrapOperationPlan(Reference<Plan> plan, int64_t timeout, int64_t retryLimit);
116-
void startHousekeeping();
117116
Future<Void> beforeWrite(int desiredPermits = 1);
118117
Future<Void> afterWrite(Future<WriteResult> result, int releasePermits = 1);
119118

@@ -152,7 +151,6 @@ struct ExtConnection : ReferenceCounted<ExtConnection>, NonCopyable {
152151
private:
153152
Future<Void> currentWriteLocked;
154153
Reference<FlowLock> lock;
155-
Future<Void> housekeeping;
156154
int32_t maxReceivedRequestID;
157155
int32_t nextServerGeneratedRequestID;
158156
};

0 commit comments

Comments
 (0)