Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/azdls): list start from and sas token support #5242

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,11 @@ impl<A: Access> CompleteAccessor<A> {
(true, false) => {
// Forward path that ends with /
if path.ends_with('/') {
let p = FlatLister::new(self.inner.clone(), path);
let p = FlatLister::new(self.inner.clone(), path, args);
Ok((RpList::default(), CompleteLister::Two(p)))
} else {
let parent = get_parent(path);
let p = FlatLister::new(self.inner.clone(), parent);
let p = FlatLister::new(self.inner.clone(), parent, args);
let p = PrefixLister::new(p, path);
Ok((RpList::default(), CompleteLister::Four(p)))
}
Expand Down Expand Up @@ -336,11 +336,11 @@ impl<A: Access> CompleteAccessor<A> {
(true, false) => {
// Forward path that ends with /
if path.ends_with('/') {
let p = FlatLister::new(self.inner.clone(), path);
let p = FlatLister::new(self.inner.clone(), path, args);
Ok((RpList::default(), CompleteLister::Two(p)))
} else {
let parent = get_parent(path);
let p = FlatLister::new(self.inner.clone(), parent);
let p = FlatLister::new(self.inner.clone(), parent, args);
let p = PrefixLister::new(p, path);
Ok((RpList::default(), CompleteLister::Four(p)))
}
Expand Down
8 changes: 5 additions & 3 deletions core/src/raw/oio/list/flat_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct FlatLister<A: Access, L> {

next_dir: Option<oio::Entry>,
active_lister: Vec<(Option<oio::Entry>, L)>,
args: OpList,
}

/// # Safety
Expand All @@ -75,11 +76,12 @@ where
A: Access,
{
/// Create a new flat lister
pub fn new(acc: A, path: &str) -> FlatLister<A, L> {
pub fn new(acc: A, path: &str, args: OpList) -> FlatLister<A, L> {
FlatLister {
acc,
next_dir: Some(oio::Entry::new(path, Metadata::new(EntryMode::DIR))),
active_lister: vec![],
args,
}
}
}
Expand All @@ -92,7 +94,7 @@ where
async fn next(&mut self) -> Result<Option<oio::Entry>> {
loop {
if let Some(de) = self.next_dir.take() {
let (_, l) = self.acc.list(de.path(), OpList::new()).await?;
let (_, l) = self.acc.list(de.path(), self.args.clone()).await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit tricky since FlatLister is used to simulate recursive listing on unsupported services. Therefore, we can't forward OpList directly. Instead, let's introduce a start_after here, and FlatLister should ensure the OpList passed to undering are constructed correctly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meaning OpList::new() with start_after set to a self.start_after value? or am I misinterpreting?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meaning OpList::new() with start_after set to a self.start_after value?

Yep.

self.active_lister.push((Some(de), l));
}

Expand Down Expand Up @@ -242,7 +244,7 @@ mod tests {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();

let acc = MockService::new();
let mut lister = FlatLister::new(acc, "x/");
let mut lister = FlatLister::new(acc, "x/", OpList::new());

let mut entries = Vec::default();

Expand Down
21 changes: 19 additions & 2 deletions core/src/services/azdls/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ impl AzdlsBuilder {
self
}

/// Set sas_token of this backend.
pub fn sas_token(mut self, sas_token: &str) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reqsign has sas_token support; we can use it here: https://docs.rs/reqsign/0.16.0/reqsign/struct.AzureStorageConfig.html#structfield.sas_token

Maybe it's worth a separate PR?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting we use reqsign::AzureStorageConfig on AzdlsBuilder or AzdlsConfig or something else?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can construct like s3 does:

// This is our current config.
let mut cfg = AwsConfig::default();
if !self.config.disable_config_load {
#[cfg(not(target_arch = "wasm32"))]
{
cfg = cfg.from_profile();
cfg = cfg.from_env();
}
}
if let Some(v) = self.config.region.clone() {
cfg.region = Some(v);
}
if cfg.region.is_none() {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"region is missing. Please find it by S3::detect_region() or set them in env.",
)
.with_operation("Builder::build")
.with_context("service", Scheme::S3));
}
let region = cfg.region.to_owned().unwrap();
debug!("backend use region: {region}");
// Building endpoint.
let endpoint = self.build_endpoint(&region);
debug!("backend use endpoint: {endpoint}");
// Setting all value from user input if available.
if let Some(v) = self.config.access_key_id {
cfg.access_key_id = Some(v)
}
if let Some(v) = self.config.secret_access_key {
cfg.secret_access_key = Some(v)
}
if let Some(v) = self.config.session_token {
cfg.session_token = Some(v)
}

if !sas_token.is_empty() {
self.config.sas_token = Some(sas_token.to_string());
}

self
}

/// Specify the http client that used by this service.
///
/// # Notes
Expand Down Expand Up @@ -188,7 +197,7 @@ impl Builder for AzdlsBuilder {
.clone()
.or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())),
account_key: self.config.account_key.clone(),
sas_token: None,
sas_token: self.config.sas_token.clone(),
..Default::default()
};

Expand All @@ -203,6 +212,7 @@ impl Builder for AzdlsBuilder {
loader: cred_loader,
signer,
}),
has_sas_token: self.config.sas_token.is_some(),
})
}
}
Expand All @@ -211,6 +221,7 @@ impl Builder for AzdlsBuilder {
#[derive(Debug, Clone)]
pub struct AzdlsBackend {
core: Arc<AzdlsCore>,
has_sas_token: bool,
}

impl Access for AzdlsBackend {
Expand Down Expand Up @@ -238,6 +249,12 @@ impl Access for AzdlsBackend {
rename: true,

list: true,
list_with_start_after: true,

presign: self.has_sas_token,
presign_stat: self.has_sas_token,
presign_read: self.has_sas_token,
presign_write: self.has_sas_token,

..Default::default()
});
Expand Down Expand Up @@ -347,7 +364,7 @@ impl Access for AzdlsBackend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit());
let l = AzdlsLister::new(self.core.clone(), path.to_string(), args);

Ok((RpList::default(), oio::PageLister::new(l)))
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/services/azdls/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub struct AzdlsConfig {
pub account_name: Option<String>,
/// Account key of this backend.
pub account_key: Option<String>,
/// SAS token of this backend.
pub sas_token: Option<String>,
}

impl Debug for AzdlsConfig {
Expand All @@ -50,6 +52,9 @@ impl Debug for AzdlsConfig {
if self.account_key.is_some() {
ds.field("account_key", &"<redacted>");
}
if self.sas_token.is_some() {
ds.field("sas_token", &"<redacted>");
}

ds.finish()
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/services/azdls/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use reqsign::AzureStorageSigner;
use crate::raw::*;
use crate::*;

use super::lister::generate_continuation_from_start_after;

const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
const X_MS_VERSION: &str = "x-ms-version";

Expand Down Expand Up @@ -258,6 +260,7 @@ impl AzdlsCore {
pub async fn azdls_list(
&self,
path: &str,
start_after: Option<&str>,
continuation: &str,
limit: Option<usize>,
) -> Result<Response<Buffer>> {
Expand All @@ -276,9 +279,18 @@ impl AzdlsCore {
if let Some(limit) = limit {
write!(url, "&maxResults={limit}").expect("write into string must succeed");
}

if !continuation.is_empty() {
write!(url, "&continuation={}", percent_encode_path(continuation))
.expect("write into string must succeed");
} else if let Some(start_after) = start_after {
println!("start_after: {}", start_after);
write!(
url,
"&continuation={}",
generate_continuation_from_start_after(start_after)
)
.expect("write into string must succeed");
}

let mut req = Request::get(&url)
Expand Down
Loading
Loading