Skip to content

Commit

Permalink
feat: volo-grpc example googleapis https lb rpc endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ii64 committed Mar 9, 2025
1 parent bcc7cf8 commit 5e97f3d
Show file tree
Hide file tree
Showing 10 changed files with 511 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ license.workspace = true
authors.workspace = true
publish = false

# googleapis_beyondcorp_appgateways_v1
[[bin]]
name = "googleapis_beyondcorp_appgateways_v1"
path = "src/grpc/googleapis_beyondcorp_appgateways_v1/client.rs"
# hello
[[bin]]
name = "hello-grpc-server"
Expand Down Expand Up @@ -138,6 +142,8 @@ volo-http = { path = "../volo-http", features = [
] }

volo-gen = { path = "./volo-gen" }
url.workspace = true
async-broadcast.workspace = true

[features]
__tls = []
Expand Down
61 changes: 61 additions & 0 deletions examples/proto/googleapis_beyondcorp_appgateways_v1.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
syntax = "proto3";

// Source:
// https://github.com/googleapis/googleapis/blob/df58085901d8fb80c2c021e405923bb2351a6f29/google/cloud/beyondcorp/appgateways/v1/app_gateways_service.proto#L48C9-L48C27
// With modification

package google.cloud.beyondcorp.appgateways.v1;

service AppGatewaysService {
// Lists AppGateways in a given project and location.
rpc ListAppGateways(ListAppGatewaysRequest)
returns (ListAppGatewaysResponse) {
}
}

// Request message for BeyondCorp.ListAppGateways.
message ListAppGatewaysRequest {
// Required. The resource name of the AppGateway location using the form:
// `projects/{project_id}/locations/{location_id}`
string parent = 1;

// Optional. The maximum number of items to return.
// If not specified, a default value of 50 will be used by the service.
// Regardless of the page_size value, the response may include a partial list
// and a caller should only rely on response's
// [next_page_token][BeyondCorp.ListAppGatewaysResponse.next_page_token] to
// determine if there are more instances left to be queried.
int32 page_size = 2;

// Optional. The next_page_token value returned from a previous
// ListAppGatewaysRequest, if any.
string page_token = 3;

// Optional. A filter specifying constraints of a list operation.
string filter = 4;

// Optional. Specifies the ordering of results. See
// [Sorting
// order](https://cloud.google.com/apis/design/design_patterns#sorting_order)
// for more information.
string order_by = 5;
}

// Response message for BeyondCorp.ListAppGateways.
message ListAppGatewaysResponse {
// A list of BeyondCorp AppGateways in the project.
repeated AppGateway app_gateways = 1;

// A token to retrieve the next page of results, or empty if there are no more
// results in the list.
string next_page_token = 2;

// A list of locations that could not be reached.
repeated string unreachable = 3;
}

message AppGateway {
// Required. Unique resource name of the AppGateway.
// The name is ignored when creating an AppGateway.
string name = 1;
}
26 changes: 26 additions & 0 deletions examples/src/grpc/googleapis_beyondcorp_appgateways_v1/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use rpc_provider::RpcProvider;
use volo_gen::proto_gen::google::cloud::beyondcorp::appgateways::v1;

mod header;
mod endpoint;
mod discover;
mod rpc_provider;

#[volo::main]
async fn main() {
let provider = RpcProvider::new();

let endpoint = "https://beyondcorp.googleapis.com:443/".parse().unwrap();
let client = provider.app_gateway_service(endpoint);

let req = volo_grpc::Request::new(v1::ListAppGatewaysRequest {
parent: "".into(),
page_size: 20,
filter: "".into(),
order_by: "".into(),
page_token: "".into(),
});

let resp = client.list_app_gateways(req).await;
println!("resp = {:#?}", resp);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::sync::Arc;

use anyhow::anyhow;
use async_broadcast::Receiver;
use volo::{
context::Endpoint,
discovery::{Discover, Instance},
loadbalance::error::LoadBalanceError,
};
use volo_http::client::dns::DnsResolver;

#[derive(Clone)]
pub struct NoCacheDiscover<T>(T);

impl<T> NoCacheDiscover<T> {
pub fn new(inner: T) -> Self {
Self(inner)
}
}

impl<T> Discover for NoCacheDiscover<T>
where
T: Discover,
{
type Key = ();
type Error = T::Error;

async fn discover(
&self,
endpoint: &Endpoint,
) -> Result<Vec<Arc<Instance>>, Self::Error> {
self.0.discover(endpoint).await
}

fn key(&self, endpoint: &Endpoint) -> Self::Key {}

fn watch(
&self,
keys: Option<&[Self::Key]>,
) -> Option<Receiver<volo::discovery::Change<Self::Key>>> {
None
}
}

#[derive(Clone)]
pub struct ConstantDnsDiscover {
resolver: DnsResolver,
service_name: String,
host: String,
port: u16,
}

impl ConstantDnsDiscover {
pub fn new(
resolver: DnsResolver,
service_name: String,
host: String,
port: u16,
) -> Self {
Self { resolver, service_name, host, port }
}
}

impl Discover for ConstantDnsDiscover {
type Key = ();
type Error = LoadBalanceError;

async fn discover<'s>(
&'s self,
endpoint: &'s Endpoint,
) -> Result<Vec<Arc<Instance>>, Self::Error> {
let mut endpoint = Endpoint::new(self.service_name.clone().into());
let addr =
self.resolver.resolve(&self.host, self.port).await.ok_or_else(
|| {
LoadBalanceError::Discover(
anyhow!("unable to resolve: {}", &self.host).into(),
)
},
)?;
endpoint.set_address(addr);
self.resolver.discover(&endpoint).await
}

fn key(&self, endpoint: &Endpoint) -> Self::Key {}

fn watch(
&self,
keys: Option<&[Self::Key]>,
) -> Option<Receiver<volo::discovery::Change<Self::Key>>> {
None
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::{
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
sync::Arc,
};

use constant_dns::ConstantDnsDiscover;
use url::Host;
use volo::{
context::Endpoint,
discovery::{Discover, Instance, StaticDiscover},
loadbalance::error::LoadBalanceError,
net::Address,
};
use volo_http::client::dns::DnsResolver;

use super::endpoint::RpcEndpoint;

pub mod constant_dns;

struct LazyDiscoverInternal {
endpoint: RpcEndpoint,
resolver: DnsResolver,
}

#[derive(Clone)]
pub struct LazyDiscover {
inner: Arc<LazyDiscoverInternal>,
}

impl LazyDiscover {
pub fn new(endpoint: RpcEndpoint) -> Self {
let resolver = DnsResolver::default();
Self { inner: Arc::new(LazyDiscoverInternal { endpoint, resolver }) }
}
}

impl Discover for LazyDiscover {
type Key = ();
type Error = LoadBalanceError;

async fn discover<'s>(
&'s self,
endpoint: &'s Endpoint,
) -> Result<Vec<Arc<Instance>>, Self::Error> {
let ep = self.inner.endpoint.clone();
match ep.host {
Host::Domain(domain) => {
ConstantDnsDiscover::new(
self.inner.resolver.clone(),
domain.clone(),
domain,
ep.port,
)
.discover(endpoint)
.await
}
Host::Ipv4(ip) => StaticDiscover::new(vec![Arc::new(Instance {
address: Address::Ip(SocketAddr::V4(SocketAddrV4::new(
ip, ep.port,
))),
weight: 1,
tags: Default::default(),
})])
.discover(endpoint)
.await
.map_err(|_e| LoadBalanceError::Retry),
Host::Ipv6(ip) => StaticDiscover::new(vec![Arc::new(Instance {
address: Address::Ip(SocketAddr::V6(SocketAddrV6::new(
ip, ep.port, 0, 0,
))),
weight: 1,
tags: Default::default(),
})])
.discover(endpoint)
.await
.map_err(|_e| LoadBalanceError::Retry),
}
}

fn key(&self, _endpoint: &volo::context::Endpoint) -> Self::Key {}

fn watch(
&self,
_keys: Option<&[Self::Key]>,
) -> Option<async_broadcast::Receiver<volo::discovery::Change<Self::Key>>>
{
None
}
}
59 changes: 59 additions & 0 deletions examples/src/grpc/googleapis_beyondcorp_appgateways_v1/endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::str::FromStr;

use anyhow::anyhow;
use url::{Host, Url};


#[derive(Clone, PartialEq, Eq, Debug)]
pub struct RpcEndpoint {
pub host: Host,
pub port: u16,
pub server_name: Option<String>,
pub tls: bool,
}

impl RpcEndpoint {
pub fn parse(s: &str) -> Result<RpcEndpoint, anyhow::Error> {
let u = Url::parse(s)?;
let host = match u.host().ok_or_else(|| anyhow!("missing host"))? {
Host::Domain(domain) => Host::Domain(domain.to_string()),
Host::Ipv4(ip) => Host::Ipv4(ip),
Host::Ipv6(ip) => Host::Ipv6(ip),
};

let port = u
.port_or_known_default()
.ok_or_else(|| anyhow!("unknown schema for port"))?;
let server_name = if let Host::Domain(ref server_name) = host {
Some(server_name.clone())
} else {
None
};

let tls = ["https", "tls", "xds"].contains(&u.scheme());

Ok(RpcEndpoint { host, port, server_name, tls })
}

pub fn uri(&self) -> http::Uri {
let scheme = if self.tls { "https" } else { "http" };
let authority = match (scheme, self.port) {
("https", 443) | ("http", 80) => self.host.to_string(),
_ => format!("{}:{}", self.host.to_string(), self.port),
};
http::Uri::builder()
.scheme(scheme)
.authority(authority)
.path_and_query("/")
.build()
.expect("rpc endpoint uri build")
}
}

impl FromStr for RpcEndpoint {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::parse(s)
}
}
Loading

0 comments on commit 5e97f3d

Please sign in to comment.