Skip to content

Commit 1beb8c0

Browse files
authored
add new options to support reading files directly. (#163)
* port threadpool from s3fs.
* add new options to support reading files directly without using local disk.
1 parent b2b38eb commit 1beb8c0

25 files changed

+1737
-34
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ test/chaos-http-proxy-*
8585
test/junk_data
8686
test/s3proxy-*
8787
test/write_multiblock
88-
88+
test/direct_read_test
8989
#
9090
# Windows ports
9191
#

Makefile.am

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ release : dist ../utils/release.sh
3434
cppcheck:
3535
cppcheck --quiet --error-exitcode=1 \
3636
--inline-suppr \
37-
--std=c++03 \
37+
--std=c++11 \
3838
--xml \
3939
-D HAVE_ATTR_XATTR_H \
4040
-D HAVE_SYS_EXTATTR_H \

configure.ac

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ AC_CHECK_HEADERS([attr/xattr.h])
3434
AC_CHECK_HEADERS([sys/extattr.h])
3535
AC_CHECK_FUNCS([fallocate])
3636

37-
CXXFLAGS="$CXXFLAGS -Wall -fno-exceptions -D_FILE_OFFSET_BITS=64 -D_FORTIFY_SOURCE=2"
37+
CXXFLAGS="$CXXFLAGS -Wall -fno-exceptions -D_FILE_OFFSET_BITS=64 -D_FORTIFY_SOURCE=2 -std=c++11"
3838

3939
dnl ----------------------------------------------
4040
dnl For macOS

src/Makefile.am

+3-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ ossfs_SOURCES = \
5757
addhead.cpp \
5858
sighandlers.cpp \
5959
autolock.cpp \
60-
common_auth.cpp
60+
common_auth.cpp \
61+
threadpoolman.cpp \
62+
direct_reader.cpp
6163
if USE_SSL_OPENSSL
6264
ossfs_SOURCES += openssl_auth.cpp
6365
endif

src/common.h

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ extern bool nomultipart;
3939
extern bool pathrequeststyle;
4040
extern bool complement_stat;
4141
extern bool noxmlns;
42+
extern bool direct_read;
43+
44+
extern int direct_read_max_prefetch_thread_count;
45+
4246
extern std::string program_name;
4347
extern std::string service_path;
4448
extern std::string s3host;

src/curl.cpp

+162-1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ bool S3fsCurl::InitS3fsCurl()
161161
if(sCurlPoolSize < std::max(GetMaxParallelCount(), GetMaxMultiRequest())){
162162
sCurlPoolSize = std::max(GetMaxParallelCount(), GetMaxMultiRequest());
163163
}
164+
165+
if(direct_read && sCurlPoolSize < direct_read_max_prefetch_thread_count) {
166+
sCurlPoolSize = direct_read_max_prefetch_thread_count;
167+
}
168+
164169
sCurlPool = new CurlHandlerPool(sCurlPoolSize);
165170
if (!sCurlPool->Init()) {
166171
return false;
@@ -704,6 +709,35 @@ size_t S3fsCurl::DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, voi
704709
return totalwrite;
705710
}
706711

712+
// libcurl write callback used for REQTYPE_GET_STREAM requests: copies the
// received body bytes into the caller-supplied memory buffer
// (partdata.streambuffer) instead of writing them to a file descriptor.
//
// ptr    - received data chunk.
// size   - element size; multiplied with nmemb to get the chunk length.
// nmemb  - element count.
// userp  - the S3fsCurl object registered through CURLOPT_WRITEDATA.
//
// Returns the number of bytes copied into the stream buffer. Returns 0 when
// the chunk is empty, when no stream buffer is set, or when partdata.size
// (the bytes still expected) is not positive.
//
// NOTE(review): when the chunk is larger than the remaining partdata.size the
// return value is smaller than size * nmemb. The libcurl documentation says a
// differing return value is reported as a write error - confirm that aborting
// the transfer in that case is intended.
size_t S3fsCurl::DownloadWriteStreamCallback(void* ptr, size_t size, size_t nmemb, void* userp)
713+
{
714+
S3fsCurl* pCurl = static_cast<S3fsCurl*>(userp);
715+
716+
// Nothing to do for an empty chunk.
if(1 > (size * nmemb)){
717+
return 0;
718+
}
719+
// No destination buffer, or the requested window is already filled.
if(!pCurl-> partdata.streambuffer || 0 >= pCurl->partdata.size){
720+
return 0;
721+
}
722+
723+
// Buffer initial bytes in case it is an XML error response.
// At most GET_OBJECT_RESPONSE_LIMIT bytes in total are kept in bodydata.
724+
if(pCurl->bodydata.size() < GET_OBJECT_RESPONSE_LIMIT){
725+
pCurl->bodydata.Append(ptr, std::min(size * nmemb, GET_OBJECT_RESPONSE_LIMIT - pCurl->bodydata.size()));
726+
}
727+
728+
// write size
// The smaller of the chunk length and the bytes still expected, so the copy
// below never writes more than partdata.size bytes.
729+
ssize_t copysize = (size * nmemb) < (size_t)pCurl->partdata.size ? (size * nmemb) : (size_t)pCurl->partdata.size;
730+
731+
// write
// Destination is the stream buffer at the current write offset (streampos).
732+
memcpy(&((char*)pCurl->partdata.streambuffer)[pCurl->partdata.streampos], ptr, copysize);
733+
734+
// Advance the object offset and the buffer offset, and shrink the remaining
// byte count by what was just copied.
pCurl->partdata.startpos += copysize;
735+
pCurl->partdata.size -= copysize;
736+
pCurl->partdata.streampos += copysize;
737+
738+
return copysize;
739+
}
740+
707741
bool S3fsCurl::SetCheckCertificate(bool isCertCheck)
708742
{
709743
bool old = S3fsCurl::is_cert_check;
@@ -1628,6 +1662,31 @@ bool S3fsCurl::PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl)
16281662
return true;
16291663
}
16301664

1665+
// Lazy curl setup routine for a streaming GET object request. It is the
// function PreGetObjectStreamRequest() stores in fpLazySetup.
//
// s3fscurl - request object whose curl handle is created and configured.
//
// Returns false when s3fscurl is NULL, when the curl handle cannot be created,
// or when any option cannot be applied; returns true otherwise.
bool S3fsCurl::PreGetObjectStreamRequestSetCurlOpts(S3fsCurl* s3fscurl)
1666+
{
1667+
if(!s3fscurl) {
1668+
return false;
1669+
}
1670+
if(!s3fscurl->CreateCurlHandle()){
1671+
return false;
1672+
}
1673+
1674+
// Target URL prepared earlier on the request object.
if (CURLE_OK != curl_easy_setopt(s3fscurl->hCurl, CURLOPT_URL, s3fscurl->url.c_str())){
1675+
return false;
1676+
}
1677+
// Route the response body through the in-memory stream callback ...
if(CURLE_OK != curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEFUNCTION, DownloadWriteStreamCallback)){
1678+
return false;
1679+
}
1680+
// ... and hand the request object itself to that callback as its userp.
if(CURLE_OK != curl_easy_setopt(s3fscurl->hCurl, CURLOPT_WRITEDATA, (void*)s3fscurl)){
1681+
return false;
1682+
}
1683+
if(!S3fsCurl::AddUserAgent(s3fscurl->hCurl)){ // put User-Agent
1684+
return false;
1685+
}
1686+
1687+
return true;
1688+
}
1689+
16311690
bool S3fsCurl::PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl)
16321691
{
16331692
if(!s3fscurl){
@@ -1761,6 +1820,7 @@ S3fsCurl::S3fsCurl(bool ahbe) :
17611820
hCurl(NULL), type(REQTYPE_UNSET), requestHeaders(NULL),
17621821
LastResponseCode(S3FSCURL_RESPONSECODE_NOTSET), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
17631822
retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
1823+
b_partdata_streambuff(NULL), b_partdata_streampos(0),
17641824
b_ssekey_pos(-1), b_ssetype(sse_type_t::SSE_DISABLE),
17651825
sem(NULL), completed_tids_lock(NULL), completed_tids(NULL), fpLazySetup(NULL), curlCode(CURLE_OK)
17661826
{
@@ -2015,7 +2075,8 @@ bool S3fsCurl::RemakeHandle()
20152075
postdata_remaining = b_postdata_remaining;
20162076
partdata.startpos = b_partdata_startpos;
20172077
partdata.size = b_partdata_size;
2018-
2078+
partdata.streambuffer = b_partdata_streambuff;
2079+
partdata.streampos = b_partdata_streampos;
20192080
// reset handle
20202081
ResetHandle();
20212082

@@ -2285,6 +2346,18 @@ bool S3fsCurl::RemakeHandle()
22852346
}
22862347
break;
22872348

2349+
case REQTYPE_GET_STREAM:
2350+
if (CURLE_OK != curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str())){
2351+
return false;
2352+
}
2353+
if (CURLE_OK != curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, S3fsCurl::DownloadWriteStreamCallback)){
2354+
return false;
2355+
}
2356+
if(CURLE_OK != curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)this)){
2357+
return false;
2358+
}
2359+
break;
2360+
22882361
default:
22892362
S3FS_PRN_ERR("request type is unknown(%d)", type);
22902363
return false;
@@ -3502,6 +3575,94 @@ int S3fsCurl::GetObjectRequest(const char* tpath, int fd, off_t start, off_t siz
35023575
return result;
35033576
}
35043577

3578+
// Prepares (but does not perform) a GET request that downloads a byte range of
// an object into a memory buffer.
//
// tpath    - object path; must not be NULL.
// buf      - destination buffer; must not be NULL.
// start    - first byte offset to read; must not be negative.
// size     - number of bytes to read; must not be negative. A Range header is
//            only sent when size is greater than 0.
// ssetype  - SSE type passed to AddSseRequestHead().
// ssevalue - SSE value passed to AddSseRequestHead().
//
// Returns 0 on success and -EINVAL when an argument is invalid.
//
// NOTE(review): buf is presumably expected to hold at least size bytes, since
// the write callback copies up to partdata.size bytes into it - verify against
// callers.
int S3fsCurl::PreGetObjectStreamRequest(const char* tpath, char* buf, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue)
3579+
{
3580+
S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size));
3581+
3582+
if (!tpath || !buf || 0 > start || 0 > size){
3583+
return -EINVAL;
3584+
}
3585+
3586+
std::string resource;
3587+
std::string turl;
3588+
// NOTE(review): get_realpath(tpath) is evaluated twice (here and for path
// below); a single call could be reused.
MakeUrlResource(get_realpath(tpath).c_str(), resource, turl);
3589+
url = prepare_url(turl.c_str());
3590+
path = get_realpath(tpath);
3591+
// NOTE(review): requestHeaders is reset without being released here -
// presumably it is always empty at this point; confirm.
requestHeaders = NULL;
3592+
responseHeaders.clear();
3593+
3594+
// Request the inclusive byte range [start, start + size - 1].
if(0 < size) {
3595+
std::string range = "bytes=";
3596+
range += str(start);
3597+
range += "-";
3598+
range += str(start + size - 1);
3599+
requestHeaders = curl_slist_sort_insert(requestHeaders, "Range", range.c_str());
3600+
requestHeaders = curl_slist_sort_insert(requestHeaders, "x-oss-range-behavior", "standard");
3601+
}
3602+
3603+
// SSE
// A failure to add the SSE header is only logged; the request is still prepared.
3604+
if(!AddSseRequestHead(ssetype, ssevalue, true, false)){
3605+
S3FS_PRN_WARN("Failed to set SSE header, but continue...");
3606+
}
3607+
3608+
// Send the traffic limit header only when a non-zero limit is configured.
// NOTE(review): sprintf() is unbounded and "%ld" assumes
// download_traffic_limit is a long; snprintf(buff, sizeof(buff), ...) with a
// matching cast would be safer - confirm the variable's type.
if(S3fsCurl::download_traffic_limit != 0){
3609+
char buff[64];
3610+
sprintf(buff, "%ld", S3fsCurl::download_traffic_limit);
3611+
requestHeaders = curl_slist_sort_insert(requestHeaders, "x-oss-traffic-limit", buff);
3612+
}
3613+
3614+
op = "GET";
3615+
type = REQTYPE_GET_STREAM;
3616+
3617+
// set lazy function
3618+
fpLazySetup = PreGetObjectStreamRequestSetCurlOpts;
3619+
3620+
// set info for callback func.
// partdata is what DownloadWriteStreamCallback() reads and updates; the b_*
// members keep the same initial values so RemakeHandle() can restore them
// when the request is retried.
3621+
partdata.clear();
3622+
partdata.startpos = start;
3623+
partdata.size = size;
3624+
partdata.streambuffer = buf;
3625+
partdata.streampos = 0;
3626+
b_partdata_startpos = start;
3627+
b_partdata_size = size;
3628+
b_partdata_streambuff = buf;
3629+
b_partdata_streampos = 0;
3630+
b_ssetype = ssetype;
3631+
b_ssevalue = ssevalue;
3632+
b_ssekey_pos = -1;
3633+
3634+
return 0;
3635+
}
3636+
3637+
// Downloads a byte range of an object directly into a memory buffer.
//
// tpath - object path; must not be NULL.
// buf   - destination buffer (validated by PreGetObjectStreamRequest()).
// start - first byte offset to read.
// size  - number of bytes to read.
// rsize - output: set to partdata.streampos after the request was performed,
//         i.e. the number of bytes the write callback copied into buf.
//
// Returns -EINVAL when tpath is NULL, the error from
// PreGetObjectStreamRequest() when preparation fails, -EIO when the lazy curl
// setup fails, and otherwise the result of RequestPerform().
//
// NOTE(review): rsize is not assigned on any of the early-return paths, so
// callers should initialize it themselves or ignore it on error.
int S3fsCurl::GetObjectStreamRequest(const char* tpath, char* buf, off_t start, off_t size, ssize_t& rsize)
3638+
{
3639+
int result;
3640+
S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld]", SAFESTRPTR(tpath), static_cast<long long>(start), static_cast<long long>(size));
3641+
3642+
if(!tpath) {
3643+
return -EINVAL;
3644+
}
3645+
// Look up the object's SSE settings; a lookup failure is only logged and the
// request continues with whatever ssetype/ssevalue hold afterwards.
sse_type_t ssetype = sse_type_t::SSE_DISABLE;
3646+
std::string ssevalue;
3647+
if(!get_object_sse_type(tpath, ssetype, ssevalue)){
3648+
S3FS_PRN_WARN("Failed to get SSE type for file(%s).", SAFESTRPTR(tpath));
3649+
}
3650+
3651+
if(0 != (result = PreGetObjectStreamRequest(tpath, buf, start, size, ssetype, ssevalue))){
3652+
return result;
3653+
}
3654+
// Run the lazy setup stored by the prepare step to create and configure the
// curl handle before performing the request.
if(!fpLazySetup ||!fpLazySetup(this)){
3655+
S3FS_PRN_INFO3("Failed to lazy setup in single get object request.");
3656+
return -EIO;
3657+
}
3658+
3659+
result = RequestPerform();
3660+
// Report the copied byte count before partdata is reset.
rsize = (ssize_t)partdata.streampos;
3661+
partdata.clear();
3662+
3663+
return result;
3664+
}
3665+
35053666
int S3fsCurl::CheckBucket(const char* check_path)
35063667
{
35073668
S3FS_PRN_INFO3("check a bucket.");

src/curl.h

+8-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ class S3fsCurl
110110
REQTYPE_MULTILIST,
111111
REQTYPE_IAMCRED,
112112
REQTYPE_ABORTMULTIUPLOAD,
113-
REQTYPE_IAMROLE
113+
REQTYPE_IAMROLE,
114+
REQTYPE_GET_STREAM
114115
};
115116

116117
// class variables
@@ -182,6 +183,8 @@ class S3fsCurl
182183
off_t b_postdata_remaining; // backup for retrying
183184
off_t b_partdata_startpos; // backup for retrying
184185
off_t b_partdata_size; // backup for retrying
186+
char* b_partdata_streambuff;// backup for retrying
187+
off_t b_partdata_streampos; // backup for retrying
185188
size_t b_ssekey_pos; // backup for retrying
186189
std::string b_ssevalue; // backup for retrying
187190
sse_type_t b_ssetype; // backup for retrying
@@ -223,6 +226,7 @@ class S3fsCurl
223226
static size_t ReadCallback(void *ptr, size_t size, size_t nmemb, void *userp);
224227
static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp);
225228
static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp);
229+
static size_t DownloadWriteStreamCallback(void* ptr, size_t size, size_t nmenb, void* userp);
226230

227231
static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl);
228232
static bool CopyMultipartPostCallback(S3fsCurl* s3fscurl);
@@ -237,6 +241,7 @@ class S3fsCurl
237241
static bool CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl);
238242
static bool PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl);
239243
static bool PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl);
244+
static bool PreGetObjectStreamRequestSetCurlOpts(S3fsCurl* s3fscurl);
240245

241246
static bool LoadEnvSseCKeys();
242247
static bool LoadEnvSseKmsid();
@@ -372,6 +377,8 @@ class S3fsCurl
372377
int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy);
373378
int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair);
374379
int MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size);
380+
int PreGetObjectStreamRequest(const char* tpath, char* buf, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue);
381+
int GetObjectStreamRequest(const char* tpath, char* buf, off_t start, off_t size, ssize_t& rsize);
375382

376383
// methods(variables)
377384
CURL* GetCurlHandle() const { return hCurl; }

0 commit comments

Comments
 (0)