Skip to content

Commit 776ceb0

Browse files
authored
Fix S3/REST infrastructure: connection lifecycle and API compliance (#12447)
* Fix S3/REST infrastructure: connection lifecycle and API compliance
(First of three PRs to add backup and restore via s3/mocks3). Fix issues in S3 and REST client infrastructure that were causing simulation failures and API compliance problems.
PART 1: Sim2Conn Connection Lifecycle Fixes
Problem: Connection pools were destroying connections without explicitly closing them, violating Sim2Conn's assertion (!opened || closedByCaller) in simulation.
Fixes:
1. Add destructors to connection pool classes that close all pooled connections before destruction (RESTConnectionPool, S3BlobStoreEndpoint::ConnectionPoolData)
2. Close connections on error paths in doRequest_impl() before they go out of scope (both REST and S3)
3. Replace static globalConnectionPool with getGlobalConnectionPool() returning process-local pools in simulation
4. Add ReusableConnection copy control to detect cross-process copies
PART 2: S3 API Compliance and Error Handling
Problem: S3 API spec violations and incorrect error handling causing backup failures.
Fixes:
1. Strip leading slashes from S3 object keys (S3 spec: keys shouldn't start with '/') - constructResourcePath() now removes all leading '/' characters
2. URL decode object names from S3 XML responses (S3 returns "test%20file.txt") - listObjectsStream_impl(), BackupContainerS3BlobStore list functions
3. Fix guessRegionFromDomain() crash with localhost/127.0.0.1 (used by MockS3)
4. Only parse S3 error codes for 4xx/5xx responses, not 2xx success responses
5. Add connection validation ASSERT before handshake
6. Use flat listing in BackupContainerS3BlobStore to find all nested files
- Fixes all Sim2Conn assertion failures in S3/REST tests
- Enables S3 backups to work correctly with proper object key handling
- Fixes MockS3 testing with localhost addresses
- Simulation-only changes (g_network->isSimulated()) have no production impact
- S3 API compliance changes apply to both simulation and production
* Fix compile issue
* Formatting
* Fix rconn scope issue in S3BlobStore error handler
* Fix s3client_test to use full object path in list verification
The test was checking for 'ls_test/sub1/file2_1' but the actual S3 object key is 's3client/ls_test/sub1/file2_1' (the full path including the path_prefix). S3 list operations return the complete object key, not a path relative to the prefix. This broke after the object key normalization changes that strip leading slashes, which changed how paths were being constructed. Fix: Extract the full path prefix from the URL and use it when verifying the listing output, so we check for the correct full object paths.
* Formatting
* Address review improvement suggestion
1 parent 886c693 commit 776ceb0

File tree

7 files changed

+223
-40
lines changed

7 files changed

+223
-40
lines changed

fdbclient/BackupContainerS3BlobStore.actor.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "fdbclient/BackupContainerS3BlobStore.h"
2323
#include "fdbrpc/AsyncFileEncrypted.h"
2424
#include "fdbrpc/AsyncFileReadAhead.actor.h"
25+
#include "fdbrpc/HTTP.h"
2526
#include "flow/actorcompiler.h" // This must be the last #include.
2627

2728
class BackupContainerS3BlobStoreImpl {
@@ -38,8 +39,10 @@ class BackupContainerS3BlobStoreImpl {
3839
S3BlobStoreEndpoint::ListResult contents = wait(bstore->listObjects(bucket, basePath));
3940
std::vector<std::string> results;
4041
for (const auto& f : contents.objects) {
42+
// URL decode the object name since S3 XML responses contain URL-encoded names
43+
std::string decodedName = HTTP::urlDecode(f.name);
4144
results.push_back(
42-
bstore->getResourceURL(f.name.substr(basePath.size()), format("bucket=%s", bucket.c_str())));
45+
bstore->getResourceURL(decodedName.substr(basePath.size()), format("bucket=%s", bucket.c_str())));
4346
}
4447
return results;
4548
}
@@ -85,12 +88,15 @@ class BackupContainerS3BlobStoreImpl {
8588
return pathFilter(folderPath.substr(prefixTrim));
8689
};
8790

88-
state S3BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listObjects(
89-
bc->m_bucket, bc->dataPath(path), '/', std::numeric_limits<int>::max(), rawPathFilter));
91+
// Use flat listing for backup files to ensure all files are found regardless of directory structure
92+
state S3BlobStoreEndpoint::ListResult result =
93+
wait(bc->m_bstore->listObjects(bc->m_bucket, bc->dataPath(path), Optional<char>(), 0, rawPathFilter));
9094
BackupContainerFileSystem::FilesAndSizesT files;
9195
for (const auto& o : result.objects) {
9296
ASSERT(o.name.size() >= prefixTrim);
93-
files.push_back({ o.name.substr(prefixTrim), o.size });
97+
// URL decode the object name since S3 XML responses contain URL-encoded names
98+
std::string decodedName = HTTP::urlDecode(o.name);
99+
files.push_back({ decodedName.substr(prefixTrim), o.size });
94100
}
95101
return files;
96102
}

fdbclient/RESTClient.actor.cpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,21 +138,22 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<RESTCli
138138
state bool connectionEstablished = false;
139139

140140
state Reference<HTTP::IncomingResponse> r;
141+
state RESTConnectionPool::ReusableConnection rconn;
141142

142143
try {
143144
// Start connecting
144145
Future<RESTConnectionPool::ReusableConnection> frconn =
145146
client->conectionPool->connect(connectPoolKey, url.connType.secure, client->knobs.max_connection_life);
146147

147148
// Finish connecting, do request
148-
state RESTConnectionPool::ReusableConnection rconn =
149-
wait(timeoutError(frconn, client->knobs.connect_timeout));
149+
wait(store(rconn, timeoutError(frconn, client->knobs.connect_timeout)));
150150
connectionEstablished = true;
151151

152152
remoteAddress = rconn.conn->getPeerAddress();
153-
Reference<HTTP::IncomingResponse> _r = wait(timeoutError(
154-
HTTP::doRequest(rconn.conn, req, sendReceiveRate, &statsPtr->bytes_sent, sendReceiveRate), reqTimeout));
155-
r = _r;
153+
wait(store(
154+
r,
155+
timeoutError(HTTP::doRequest(rconn.conn, req, sendReceiveRate, &statsPtr->bytes_sent, sendReceiveRate),
156+
reqTimeout)));
156157

157158
// Since the response was parsed successfully (which is why we are here) reuse the connection unless we
158159
// received the "Connection: close" header.
@@ -161,6 +162,14 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<RESTCli
161162
}
162163
rconn.conn.clear();
163164
} catch (Error& e) {
165+
// Close the connection on error to satisfy Sim2Conn's assertion in simulation.
166+
// If rconn holds a connection that hasn't been returned to the pool,
167+
// we must close it before it's destroyed (when rconn goes out of scope).
168+
// This sets closedByCaller = true in Sim2Conn, preventing assertion failures.
169+
if (connectionEstablished && rconn.conn.isValid()) {
170+
rconn.conn->close();
171+
rconn.conn.clear();
172+
}
164173
if (e.code() == error_code_actor_cancelled) {
165174
throw;
166175
}

fdbclient/RESTUtils.actor.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,24 @@
3030

3131
#include "flow/actorcompiler.h" // always the last include
3232

33+
// RESTConnectionPool destructor implementation
34+
RESTConnectionPool::~RESTConnectionPool() {
35+
// In simulation, explicitly close all pooled connections before destruction.
36+
// This satisfies Sim2Conn's assertion: !opened || closedByCaller
37+
// Without this, connections would be destroyed without being closed, causing assertion failures.
38+
if (g_network && g_network->isSimulated()) {
39+
for (auto& kv : connectionPoolMap) {
40+
while (!kv.second.empty()) {
41+
ReusableConnection& rconn = kv.second.front();
42+
if (rconn.conn.isValid()) {
43+
rconn.conn->close();
44+
}
45+
kv.second.pop();
46+
}
47+
}
48+
}
49+
}
50+
3351
const std::unordered_map<std::string, RESTConnectionType> RESTConnectionType::supportedConnTypes = {
3452
{ "http", RESTConnectionType("http", RESTConnectionType::NOT_SECURE_CONNECTION) },
3553
{ "https", RESTConnectionType("https", RESTConnectionType::SECURE_CONNECTION) }

fdbclient/S3BlobStore.actor.cpp

Lines changed: 103 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,32 @@
2020

2121
#include "fdbclient/S3BlobStore.h"
2222

23+
#include <sstream>
24+
#include "fdbrpc/HTTP.h"
2325
#include "fdbclient/ClientKnobs.h"
2426
#include "fdbclient/Knobs.h"
2527
#include "flow/FastRef.h"
2628
#include "flow/IConnection.h"
2729
#include "flow/Trace.h"
2830
#include "flow/flow.h"
2931
#include "flow/genericactors.actor.h"
32+
33+
// S3BlobStoreEndpoint::ConnectionPoolData destructor implementation
34+
S3BlobStoreEndpoint::ConnectionPoolData::~ConnectionPoolData() {
35+
// In simulation, explicitly close all pooled connections before destruction.
36+
// This satisfies Sim2Conn's assertion: !opened || closedByCaller
37+
// Without this, connections would be destroyed without being closed, causing assertion failures.
38+
if (g_network && g_network->isSimulated()) {
39+
while (!pool.empty()) {
40+
ReusableConnection& rconn = pool.front();
41+
if (rconn.conn.isValid()) {
42+
rconn.conn->close();
43+
}
44+
pool.pop();
45+
}
46+
}
47+
}
48+
3049
#include "md5/md5.h"
3150
#include "libb64/encode.h"
3251
#include "fdbclient/sha1/SHA1.h"
@@ -76,9 +95,6 @@ S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::s_stats;
7695
std::unique_ptr<S3BlobStoreEndpoint::BlobStats> S3BlobStoreEndpoint::blobStats;
7796
Future<Void> S3BlobStoreEndpoint::statsLogger = Never();
7897

79-
std::unordered_map<BlobStoreConnectionPoolKey, Reference<S3BlobStoreEndpoint::ConnectionPoolData>>
80-
S3BlobStoreEndpoint::globalConnectionPool;
81-
8298
S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
8399
secure_connection = 1;
84100
connect_tries = CLIENT_KNOBS->BLOBSTORE_CONNECT_TRIES;
@@ -199,6 +215,11 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
199215
}
200216

201217
std::string guessRegionFromDomain(std::string domain) {
218+
// Special case for localhost/127.0.0.1 to prevent basic_string exception
219+
if (domain == "127.0.0.1" || domain == "localhost") {
220+
return "us-east-1";
221+
}
222+
202223
static const std::vector<const char*> knownServices = { "s3.", "cos.", "oss-", "obs." };
203224
boost::algorithm::to_lower(domain);
204225

@@ -446,11 +467,15 @@ std::string S3BlobStoreEndpoint::constructResourcePath(const std::string& bucket
446467
}
447468

448469
if (!object.empty()) {
449-
// Don't add a slash if the object starts with one
450-
if (!object.starts_with("/")) {
470+
// S3 object keys should not start with '/'. Strip any leading slashes.
471+
std::string cleanedObject = object;
472+
while (!cleanedObject.empty() && cleanedObject[0] == '/') {
473+
cleanedObject = cleanedObject.substr(1);
474+
}
475+
if (!cleanedObject.empty()) {
451476
resource += "/";
477+
resource += cleanedObject;
452478
}
453-
resource += object;
454479
}
455480

456481
return resource;
@@ -843,6 +868,10 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
843868
} else {
844869
wait(store(conn, INetworkConnections::net()->connect(host, service, isTLS)));
845870
}
871+
872+
// Ensure connection is valid before handshake
873+
ASSERT(conn.isValid());
874+
846875
wait(conn->connectHandshake());
847876

848877
TraceEvent("S3BlobStoreEndpointNewConnectionSuccess")
@@ -1074,6 +1103,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
10741103
state bool reusingConn = false;
10751104
state bool fastRetry = false;
10761105
state bool simulateS3TokenError = false;
1106+
state S3BlobStoreEndpoint::ReusableConnection rconn; // Moved outside try block for error handler access
10771107

10781108
try {
10791109
// Start connecting
@@ -1095,8 +1125,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
10951125
}
10961126

10971127
// Finish connecting, do request
1098-
state S3BlobStoreEndpoint::ReusableConnection rconn =
1099-
wait(timeoutError(frconn, bstore->knobs.connect_timeout));
1128+
wait(store(rconn, timeoutError(frconn, bstore->knobs.connect_timeout)));
11001129
connectionEstablished = true;
11011130
connID = rconn.conn->getDebugID();
11021131
reqStartTimer = g_network->timer();
@@ -1140,7 +1169,11 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
11401169
rconn.conn, dryrunRequest, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate);
11411170
Reference<HTTP::IncomingResponse> _dryrunR = wait(timeoutError(dryrunResponse, requestTimeout));
11421171
dryrunR = _dryrunR;
1143-
std::string s3Error = parseErrorCodeFromS3(dryrunR->data.content);
1172+
// Only parse S3 error code for error responses (4xx/5xx), not successful responses (2xx)
1173+
std::string s3Error;
1174+
if (dryrunR->code >= 400) {
1175+
s3Error = parseErrorCodeFromS3(dryrunR->data.content);
1176+
}
11441177
if (dryrunR->code == badRequestCode && isS3TokenError(s3Error)) {
11451178
// authentication fails and s3 token error persists, retry with a HEAD dryrun request
11461179
// to avoid sending duplicate data indefinitely to save network bandwidth
@@ -1206,9 +1239,16 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
12061239
.errorUnsuppressed(e)
12071240
.detail("Verb", verb)
12081241
.detail("Resource", resource);
1242+
// Close the connection on error to satisfy Sim2Conn's assertion in simulation.
1243+
// If rconn holds a connection that hasn't been returned to the pool,
1244+
// we must close it before it's destroyed (when rconn goes out of scope).
1245+
// This sets closedByCaller = true in Sim2Conn, preventing assertion failures.
1246+
if (connectionEstablished && rconn.conn.isValid()) {
1247+
rconn.conn->close();
1248+
rconn.conn.clear();
1249+
}
12091250
if (e.code() == error_code_actor_cancelled)
12101251
throw;
1211-
// TODO: should this also do rconn.conn.clear()? (would need to extend lifetime outside of try block)
12121252
err = e;
12131253
}
12141254

@@ -1263,7 +1303,12 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
12631303

12641304
if (!err.present()) {
12651305
event.detail("ResponseCode", r->code);
1266-
std::string s3Error = parseErrorCodeFromS3(r->data.content);
1306+
// Only parse S3 error code for real error responses (4xx/5xx), not successful responses (2xx)
1307+
// Skip parsing for simulated errors where response content is still binary data
1308+
std::string s3Error;
1309+
if (r->code >= 400 && !simulateS3TokenError) {
1310+
s3Error = parseErrorCodeFromS3(r->data.content);
1311+
}
12671312
event.detail("S3ErrorCode", s3Error);
12681313
if (r->code == badRequestCode) {
12691314
if (isS3TokenError(s3Error) || simulateS3TokenError) {
@@ -1466,7 +1511,8 @@ ACTOR Future<Void> listObjectsStream_impl(Reference<S3BlobStoreEndpoint> bstore,
14661511
if (key == nullptr) {
14671512
throw http_bad_response();
14681513
}
1469-
object.name = key->value();
1514+
// URL decode the object name since S3 XML responses contain URL-encoded names
1515+
object.name = HTTP::urlDecode(key->value());
14701516

14711517
xml_node<>* size = n->first_node("Size");
14721518
if (size == nullptr) {
@@ -2038,15 +2084,12 @@ ACTOR Future<int> readObject_impl(Reference<S3BlobStoreEndpoint> bstore,
20382084
throw io_error();
20392085
}
20402086

2041-
try {
2042-
// Copy the output bytes, server could have sent more or less bytes than requested so copy at most length
2043-
// bytes
2044-
memcpy(data, r->data.content.data(), std::min<int64_t>(r->data.contentLen, length));
2045-
return r->data.contentLen;
2046-
} catch (Error& e) {
2047-
TraceEvent(SevWarn, "S3BlobStoreReadObjectMemcpyError").detail("Error", e.what());
2048-
throw io_error();
2049-
}
2087+
// Copy the output bytes, server could have sent more or less bytes than requested so copy at most length
2088+
// bytes
2089+
int bytesToCopy = std::min<int64_t>(r->data.contentLen, length);
2090+
memcpy(data, r->data.content.data(), bytesToCopy);
2091+
// Return the number of bytes actually copied
2092+
return bytesToCopy;
20502093
} catch (Error& e) {
20512094
TraceEvent(SevWarn, "S3BlobStoreEndpoint_ReadError")
20522095
.error(e)
@@ -2490,4 +2533,42 @@ TEST_CASE("/backup/s3/virtual_hosting_list_resource_path") {
24902533
ASSERT(listResource == "/test-bucket"); // Bucket is part of path
24912534

24922535
return Void();
2493-
}
2536+
}
2537+
2538+
TEST_CASE("/backup/s3/constructResourcePath") {
2539+
// Test that leading slashes in object keys are properly stripped
2540+
// S3 object keys should not start with '/', but if passed, we normalize them
2541+
2542+
std::string url = "blobstore://s3.us-west-2.amazonaws.com?bucket=test-bucket";
2543+
std::string resource;
2544+
std::string error;
2545+
S3BlobStoreEndpoint::ParametersT parameters;
2546+
Reference<S3BlobStoreEndpoint> s3 = S3BlobStoreEndpoint::fromString(url, {}, &resource, &error, &parameters);
2547+
2548+
// Test normal object key (no leading slash)
2549+
ASSERT(s3->constructResourcePath("test-bucket", "normal/path/file.txt") == "/test-bucket/normal/path/file.txt");
2550+
2551+
// Test single leading slash - should be stripped
2552+
ASSERT(s3->constructResourcePath("test-bucket", "/leading/slash/file.txt") ==
2553+
"/test-bucket/leading/slash/file.txt");
2554+
2555+
// Test multiple leading slashes - all should be stripped
2556+
ASSERT(s3->constructResourcePath("test-bucket", "///multiple/slashes.txt") == "/test-bucket/multiple/slashes.txt");
2557+
2558+
// Test object key that is only slashes - should result in empty object path
2559+
ASSERT(s3->constructResourcePath("test-bucket", "///") == "/test-bucket");
2560+
2561+
// Test empty object key
2562+
ASSERT(s3->constructResourcePath("test-bucket", "") == "/test-bucket");
2563+
2564+
// Test virtual hosting mode (bucket in hostname)
2565+
url = "blobstore://test-bucket.s3.us-west-2.amazonaws.com";
2566+
s3 = S3BlobStoreEndpoint::fromString(url, {}, &resource, &error, &parameters);
2567+
2568+
// Virtual hosting mode doesn't include bucket in path
2569+
ASSERT(s3->constructResourcePath("test-bucket", "normal/file.txt") == "/normal/file.txt");
2570+
ASSERT(s3->constructResourcePath("test-bucket", "/leading/slash.txt") == "/leading/slash.txt");
2571+
ASSERT(s3->constructResourcePath("test-bucket", "") == "");
2572+
2573+
return Void();
2574+
}

fdbclient/include/fdbclient/RESTUtils.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ class RESTConnectionPool : public ReferenceCounted<RESTConnectionPool> {
4949
struct ReusableConnection {
5050
Reference<IConnection> conn;
5151
double expirationTime;
52+
53+
ReusableConnection() : expirationTime(0) {}
54+
ReusableConnection(Reference<IConnection> c, double exp) : conn(c), expirationTime(exp) {}
5255
};
5356

5457
// Maximum number of connections cached in the connection-pool.
@@ -58,6 +61,10 @@ class RESTConnectionPool : public ReferenceCounted<RESTConnectionPool> {
5861

5962
RESTConnectionPool(const int maxConnsPerKey) : maxConnPerConnectKey(maxConnsPerKey) {}
6063

64+
// Destructor implementation in RESTUtils.actor.cpp
65+
// In simulation, explicitly closes all pooled connections before destruction
66+
~RESTConnectionPool();
67+
6168
// Routine is responsible for providing a usable TCP connection object; it reuses an active connection from
6269
// connection-pool if available; otherwise, it establishes a new TCP connection
6370
Future<ReusableConnection> connect(RESTConnectionPoolKey connectKey, const bool isSecure, const int maxConnLife);

0 commit comments

Comments
 (0)