Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/generated/cloud/compute/v1/src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use crate::model::Operation;
use gax::error::rpc::Status;

impl lro::internal::DiscoveryOperation for Operation {
fn name(&self) -> Option<&String> {
Expand All @@ -22,7 +21,4 @@ impl lro::internal::DiscoveryOperation for Operation {
fn done(&self) -> bool {
self.status == Some(crate::model::operation::Status::Done)
}
fn status(&self) -> Option<Status> {
None
}
}
135 changes: 7 additions & 128 deletions src/lro/src/internal/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
//! we can name directly.
//!

use crate::{Error, Poller, PollingBackoffPolicy, PollingErrorPolicy, PollingResult, Result};
use gax::error::rpc::Status;
use crate::{Poller, PollingBackoffPolicy, PollingErrorPolicy, PollingResult, Result};
use gax::polling_state::PollingState;
use gax::retry_result::RetryResult;
use std::sync::Arc;
Expand All @@ -49,13 +48,6 @@ pub trait DiscoveryOperation {
///
/// It may be `None` in which case the polling loop stops.
fn name(&self) -> Option<&String>;

/// Determines if the operation completed with an error.
///
/// If the operation completed successfully this returns `None`. On an error
/// the trait must convert the error details to `gax::error::rpc::Status` as
/// it always indicates a service error.
fn status(&self) -> Option<Status>;
}

pub fn new_discovery_poller<S, SF, Q, QF, O>(
Expand Down Expand Up @@ -170,7 +162,7 @@ where
{
match result {
Err(ref _e) => (None, PollingResult::Completed(result)),
Ok(o) if o.done() => (None, handle_polling_done(o)),
Ok(o) if o.done() => (None, PollingResult::Completed(Ok(o))),
Ok(o) => handle_polling_success(o),
}
}
Expand All @@ -189,7 +181,7 @@ where
let state = error_policy.on_error(state, e);
self::handle_polling_error(state, operation_name)
}
Ok(o) if o.done() => (None, handle_polling_done(o)),
Ok(o) if o.done() => (None, PollingResult::Completed(Ok(o))),
Ok(o) => handle_polling_success(o),
}
}
Expand All @@ -209,34 +201,22 @@ where
}
}

fn handle_polling_done<O>(o: O) -> PollingResult<O, O>
where
O: DiscoveryOperation,
{
match o.status() {
None => PollingResult::Completed(Ok(o)),
Some(s) => PollingResult::Completed(Err(Error::service(s))),
}
}

fn handle_polling_success<O>(o: O) -> (Option<String>, PollingResult<O, O>)
where
O: DiscoveryOperation,
{
match o.status() {
Some(s) => (None, PollingResult::Completed(Err(Error::service(s)))),
None => (o.name().cloned(), PollingResult::InProgress(Some(o))),
}
(o.name().cloned(), PollingResult::InProgress(Some(o)))
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::Error;
use gax::error::rpc::Code;
use gax::error::rpc::Status;
use gax::exponential_backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
use gax::polling_error_policy::{Aip194Strict, AlwaysContinue};
use std::time::Duration;

#[tokio::test]
async fn poller_until_done_success() {
Expand Down Expand Up @@ -322,40 +302,6 @@ mod tests {
);
}

#[tokio::test]
async fn poller_until_done_error_on_done() {
let start = || async move {
let op = TestOperation {
name: Some("start-name".into()),
..TestOperation::default()
};
Ok(op)
};
let query = |_name| async move {
let op = TestOperation {
done: true,
status: Some(permanent_status()),
..TestOperation::default()
};
Ok(op)
};
let got = new_discovery_poller(
Arc::new(AlwaysContinue),
Arc::new(test_backoff()),
start,
query,
)
.until_done()
.await;
assert!(
matches!(
got,
Err(ref e) if e.status() == Some(&permanent_status())
),
"{got:?}"
);
}

#[tokio::test]
async fn poller_until_done_error_on_start() {
let start = || async move { Err(Error::service(permanent_status())) };
Expand Down Expand Up @@ -446,21 +392,6 @@ mod tests {
assert!(matches!(&got.1, PollingResult::Completed(Ok(_))), "{got:?}");
}

#[test]
fn start_done_with_error() {
let input = TestOperation {
done: true,
status: Some(permanent_status()),
..TestOperation::default()
};
let got = handle_start(Ok(input));
assert!(got.0.is_none(), "{got:?}");
assert!(
matches!(&got.1, PollingResult::Completed(Err(_))),
"{got:?}"
);
}

#[test]
fn start_in_progress() {
let input = TestOperation {
Expand Down Expand Up @@ -497,37 +428,20 @@ mod tests {
let input = TestOperation {
done: true,
name: Some("in-progress".into()),
status: None,
..TestOperation::default()
};
let got = handle_poll(Arc::new(policy), &state, "started".to_string(), Ok(input));
assert!(got.0.is_none(), "{got:?}");
assert!(matches!(got.1, PollingResult::Completed(Ok(_))), "{got:?}");
}

#[test]
fn poll_done_error() {
let policy = Aip194Strict;
let state = PollingState::default();
let input = TestOperation {
done: true,
name: Some("in-progress".into()),
status: Some(permanent_status()),
..TestOperation::default()
};
let got = handle_poll(Arc::new(policy), &state, "started".to_string(), Ok(input));
assert!(got.0.is_none(), "{got:?}");
assert!(matches!(got.1, PollingResult::Completed(Err(_))), "{got:?}");
}

#[test]
fn poll_in_progress() {
let policy = Aip194Strict;
let state = PollingState::default();
let input = TestOperation {
done: false,
name: Some("in-progress".into()),
status: None,
..TestOperation::default()
};
let got = handle_poll(Arc::new(policy), &state, "started".to_string(), Ok(input));
Expand Down Expand Up @@ -568,39 +482,8 @@ mod tests {
);
}

#[test]
fn polling_done() {
let input = TestOperation {
status: Some(transient_status()),
..TestOperation::default()
};
let got = handle_polling_done(input);
assert!(
matches!(&got, PollingResult::Completed(Err(e)) if e.status() == Some(&transient_status())),
"{got:?}"
);

let input = TestOperation {
status: None,
..TestOperation::default()
};
let got = handle_polling_done(input);
assert!(matches!(&got, PollingResult::Completed(Ok(_))), "{got:?}");
}

#[test]
fn polling_success() {
let input = TestOperation {
status: Some(transient_status()),
..TestOperation::default()
};
let got = handle_polling_success(input);
assert!(got.0.is_none(), "{got:?}");
assert!(
matches!(&got.1, PollingResult::Completed(Err(e)) if e.status() == Some(&transient_status())),
"{got:?}"
);

let input = TestOperation {
name: Some("in-progress".to_string()),
..TestOperation::default()
Expand Down Expand Up @@ -645,7 +528,6 @@ mod tests {
struct TestOperation {
done: bool,
name: Option<String>,
status: Option<gax::error::rpc::Status>,
value: Option<i32>,
}

Expand All @@ -656,8 +538,5 @@ mod tests {
fn name(&self) -> Option<&String> {
self.name.as_ref()
}
fn status(&self) -> Option<Status> {
self.status.clone()
}
}
}
Loading