Skip to content

Commit 1542262

Browse files
committed
Merge #98: Implement retryable calls
b96270e feat(async,blocking)!: implement retryable calls (valued mammal) Pull request description: Based on #93 the PR implements retryable calls for request failure due to too many requests (429) or service unavailable (503). Inspired by #71 h/t @e1a0a0ea ### Notes I've added the field `max_retries` to the `Builder`. `max_retries` is also added to each of `AsyncClient`, `BlockingClient`. I added the dependency `async-std` in order to have async `sleep`. Instead of implementing a trait on the `Request` type as in #71, the approach is to add a method on the client that sends a get request to a url and returns the response after allowing for retries. I tested on the bdk `wallet_esplora_*` example crates against https://blockstream.info/testnet/api and it seemed to resolve the 429 issue. ACKs for top commit: oleonardolima: ACK b96270e notmandatory: tACK b96270e Tree-SHA512: 78124106959ba9a5cce58e343bbf30c29b4bc7e1ac434ba71eeb2e774d73ea003d0520139781062947ed27563748925c24998b2a5be450bc511bb2c7090a6682
2 parents ce6a635 + b96270e commit 1542262

File tree

4 files changed

+116
-25
lines changed

4 files changed

+116
-25
lines changed

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ hex = { version = "0.2", package = "hex-conservative" }
2323
log = "^0.4"
2424
minreq = { version = "2.11.0", features = ["json-using-serde"], optional = true }
2525
reqwest = { version = "0.11", features = ["json"], default-features = false, optional = true }
26+
async-std = { version = "1.13.0", optional = true }
2627

2728
[dev-dependencies]
2829
serde_json = "1.0"
@@ -37,7 +38,7 @@ blocking-https = ["blocking", "minreq/https"]
3738
blocking-https-rustls = ["blocking", "minreq/https-rustls"]
3839
blocking-https-native = ["blocking", "minreq/https-native"]
3940
blocking-https-bundled = ["blocking", "minreq/https-bundled"]
40-
async = ["reqwest", "reqwest/socks"]
41+
async = ["async-std", "reqwest", "reqwest/socks"]
4142
async-https = ["async", "reqwest/default-tls"]
4243
async-https-native = ["async", "reqwest/native-tls"]
4344
async-https-rustls = ["async", "reqwest/rustls-tls"]

src/async.rs

+44-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
//! Esplora by way of `reqwest` HTTP client.
1313
14+
use async_std::task;
1415
use std::collections::HashMap;
1516
use std::str::FromStr;
1617

@@ -24,16 +25,21 @@ use bitcoin::{
2425
#[allow(unused_imports)]
2526
use log::{debug, error, info, trace};
2627

27-
use reqwest::{header, Client};
28+
use reqwest::{header, Client, Response};
2829

29-
use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus};
30+
use crate::{
31+
BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus,
32+
BASE_BACKOFF_MILLIS, RETRYABLE_ERROR_CODES,
33+
};
3034

3135
#[derive(Debug, Clone)]
3236
pub struct AsyncClient {
3337
/// The URL of the Esplora Server.
3438
url: String,
3539
/// The inner [`reqwest::Client`] to make HTTP requests.
3640
client: Client,
41+
/// Number of times to retry a request
42+
max_retries: usize,
3743
}
3844

3945
impl AsyncClient {
@@ -63,12 +69,20 @@ impl AsyncClient {
6369
client_builder = client_builder.default_headers(headers);
6470
}
6571

66-
Ok(Self::from_client(builder.base_url, client_builder.build()?))
72+
Ok(AsyncClient {
73+
url: builder.base_url,
74+
client: client_builder.build()?,
75+
max_retries: builder.max_retries,
76+
})
6777
}
6878

6979
/// Build an async client from the base url and [`Client`]
7080
pub fn from_client(url: String, client: Client) -> Self {
71-
AsyncClient { url, client }
81+
AsyncClient {
82+
url,
83+
client,
84+
max_retries: crate::DEFAULT_MAX_RETRIES,
85+
}
7286
}
7387

7488
/// Make an HTTP GET request to given URL, deserializing to any `T` that
@@ -84,7 +98,7 @@ impl AsyncClient {
8498
/// [`bitcoin::consensus::Decodable`] deserialization.
8599
async fn get_response<T: Decodable>(&self, path: &str) -> Result<T, Error> {
86100
let url = format!("{}{}", self.url, path);
87-
let response = self.client.get(url).send().await?;
101+
let response = self.get_with_retry(&url).await?;
88102

89103
if !response.status().is_success() {
90104
return Err(Error::HttpResponse {
@@ -124,7 +138,7 @@ impl AsyncClient {
124138
path: &str,
125139
) -> Result<T, Error> {
126140
let url = format!("{}{}", self.url, path);
127-
let response = self.client.get(url).send().await?;
141+
let response = self.get_with_retry(&url).await?;
128142

129143
if !response.status().is_success() {
130144
return Err(Error::HttpResponse {
@@ -166,7 +180,7 @@ impl AsyncClient {
166180
/// [`bitcoin::consensus::Decodable`] deserialization.
167181
async fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
168182
let url = format!("{}{}", self.url, path);
169-
let response = self.client.get(url).send().await?;
183+
let response = self.get_with_retry(&url).await?;
170184

171185
if !response.status().is_success() {
172186
return Err(Error::HttpResponse {
@@ -203,7 +217,7 @@ impl AsyncClient {
203217
/// This function will return an error either from the HTTP client.
204218
async fn get_response_text(&self, path: &str) -> Result<String, Error> {
205219
let url = format!("{}{}", self.url, path);
206-
let response = self.client.get(url).send().await?;
220+
let response = self.get_with_retry(&url).await?;
207221

208222
if !response.status().is_success() {
209223
return Err(Error::HttpResponse {
@@ -410,4 +424,26 @@ impl AsyncClient {
410424
pub fn client(&self) -> &Client {
411425
&self.client
412426
}
427+
428+
/// Sends a GET request to the given `url`, retrying failed attempts
429+
/// for retryable error codes until max retries hit.
430+
async fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
431+
let mut delay = BASE_BACKOFF_MILLIS;
432+
let mut attempts = 0;
433+
434+
loop {
435+
match self.client.get(url).send().await? {
436+
resp if attempts < self.max_retries && is_status_retryable(resp.status()) => {
437+
task::sleep(delay).await;
438+
attempts += 1;
439+
delay *= 2;
440+
}
441+
resp => return Ok(resp),
442+
}
443+
}
444+
}
445+
}
446+
447+
fn is_status_retryable(status: reqwest::StatusCode) -> bool {
448+
RETRYABLE_ERROR_CODES.contains(&status.as_u16())
413449
}

src/blocking.rs

+46-16
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414
use std::collections::HashMap;
1515
use std::convert::TryFrom;
1616
use std::str::FromStr;
17+
use std::thread;
1718

1819
#[allow(unused_imports)]
1920
use log::{debug, error, info, trace};
2021

21-
use minreq::{Proxy, Request};
22+
use minreq::{Proxy, Request, Response};
2223

2324
use bitcoin::consensus::{deserialize, serialize, Decodable};
2425
use bitcoin::hashes::{sha256, Hash};
@@ -27,7 +28,10 @@ use bitcoin::{
2728
block::Header as BlockHeader, Block, BlockHash, MerkleBlock, Script, Transaction, Txid,
2829
};
2930

30-
use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus};
31+
use crate::{
32+
BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus,
33+
BASE_BACKOFF_MILLIS, RETRYABLE_ERROR_CODES,
34+
};
3135

3236
#[derive(Debug, Clone)]
3337
pub struct BlockingClient {
@@ -39,6 +43,8 @@ pub struct BlockingClient {
3943
pub timeout: Option<u64>,
4044
/// HTTP headers to set on every request made to Esplora server
4145
pub headers: HashMap<String, String>,
46+
/// Number of times to retry a request
47+
pub max_retries: usize,
4248
}
4349

4450
impl BlockingClient {
@@ -49,6 +55,7 @@ impl BlockingClient {
4955
proxy: builder.proxy,
5056
timeout: builder.timeout,
5157
headers: builder.headers,
58+
max_retries: builder.max_retries,
5259
}
5360
}
5461

@@ -80,20 +87,20 @@ impl BlockingClient {
8087
}
8188

8289
fn get_opt_response<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
83-
match self.get_request(path)?.send() {
90+
match self.get_with_retry(path) {
8491
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
8592
Ok(resp) if !is_status_ok(resp.status_code) => {
8693
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
8794
let message = resp.as_str().unwrap_or_default().to_string();
8895
Err(Error::HttpResponse { status, message })
8996
}
9097
Ok(resp) => Ok(Some(deserialize::<T>(resp.as_bytes())?)),
91-
Err(e) => Err(Error::Minreq(e)),
98+
Err(e) => Err(e),
9299
}
93100
}
94101

95102
fn get_opt_response_txid(&self, path: &str) -> Result<Option<Txid>, Error> {
96-
match self.get_request(path)?.send() {
103+
match self.get_with_retry(path) {
97104
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
98105
Ok(resp) if !is_status_ok(resp.status_code) => {
99106
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
@@ -103,12 +110,12 @@ impl BlockingClient {
103110
Ok(resp) => Ok(Some(
104111
Txid::from_str(resp.as_str().map_err(Error::Minreq)?).map_err(Error::HexToArray)?,
105112
)),
106-
Err(e) => Err(Error::Minreq(e)),
113+
Err(e) => Err(e),
107114
}
108115
}
109116

110117
fn get_opt_response_hex<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
111-
match self.get_request(path)?.send() {
118+
match self.get_with_retry(path) {
112119
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
113120
Ok(resp) if !is_status_ok(resp.status_code) => {
114121
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
@@ -122,12 +129,12 @@ impl BlockingClient {
122129
.map_err(Error::BitcoinEncoding)
123130
.map(|r| Some(r))
124131
}
125-
Err(e) => Err(Error::Minreq(e)),
132+
Err(e) => Err(e),
126133
}
127134
}
128135

129136
fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
130-
match self.get_request(path)?.send() {
137+
match self.get_with_retry(path) {
131138
Ok(resp) if !is_status_ok(resp.status_code) => {
132139
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
133140
let message = resp.as_str().unwrap_or_default().to_string();
@@ -138,51 +145,51 @@ impl BlockingClient {
138145
let hex_vec = Vec::from_hex(hex_str).unwrap();
139146
deserialize::<T>(&hex_vec).map_err(Error::BitcoinEncoding)
140147
}
141-
Err(e) => Err(Error::Minreq(e)),
148+
Err(e) => Err(e),
142149
}
143150
}
144151

145152
fn get_response_json<'a, T: serde::de::DeserializeOwned>(
146153
&'a self,
147154
path: &'a str,
148155
) -> Result<T, Error> {
149-
let response = self.get_request(path)?.send();
156+
let response = self.get_with_retry(path);
150157
match response {
151158
Ok(resp) if !is_status_ok(resp.status_code) => {
152159
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
153160
let message = resp.as_str().unwrap_or_default().to_string();
154161
Err(Error::HttpResponse { status, message })
155162
}
156163
Ok(resp) => Ok(resp.json::<T>().map_err(Error::Minreq)?),
157-
Err(e) => Err(Error::Minreq(e)),
164+
Err(e) => Err(e),
158165
}
159166
}
160167

161168
fn get_opt_response_json<T: serde::de::DeserializeOwned>(
162169
&self,
163170
path: &str,
164171
) -> Result<Option<T>, Error> {
165-
match self.get_request(path)?.send() {
172+
match self.get_with_retry(path) {
166173
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
167174
Ok(resp) if !is_status_ok(resp.status_code) => {
168175
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
169176
let message = resp.as_str().unwrap_or_default().to_string();
170177
Err(Error::HttpResponse { status, message })
171178
}
172179
Ok(resp) => Ok(Some(resp.json::<T>()?)),
173-
Err(e) => Err(Error::Minreq(e)),
180+
Err(e) => Err(e),
174181
}
175182
}
176183

177184
fn get_response_str(&self, path: &str) -> Result<String, Error> {
178-
match self.get_request(path)?.send() {
185+
match self.get_with_retry(path) {
179186
Ok(resp) if !is_status_ok(resp.status_code) => {
180187
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
181188
let message = resp.as_str().unwrap_or_default().to_string();
182189
Err(Error::HttpResponse { status, message })
183190
}
184191
Ok(resp) => Ok(resp.as_str()?.to_string()),
185-
Err(e) => Err(Error::Minreq(e)),
192+
Err(e) => Err(e),
186193
}
187194
}
188195

@@ -339,6 +346,24 @@ impl BlockingClient {
339346
};
340347
self.get_response_json(&path)
341348
}
349+
350+
/// Sends a GET request to the given `url`, retrying failed attempts
351+
/// for retryable error codes until max retries hit.
352+
pub fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
353+
let mut delay = BASE_BACKOFF_MILLIS;
354+
let mut attempts = 0;
355+
356+
loop {
357+
match self.get_request(url)?.send()? {
358+
resp if attempts < self.max_retries && is_status_retryable(resp.status_code) => {
359+
thread::sleep(delay);
360+
attempts += 1;
361+
delay *= 2;
362+
}
363+
resp => return Ok(resp),
364+
}
365+
}
366+
}
342367
}
343368

344369
fn is_status_ok(status: i32) -> bool {
@@ -348,3 +373,8 @@ fn is_status_ok(status: i32) -> bool {
348373
fn is_status_not_found(status: i32) -> bool {
349374
status == 404
350375
}
376+
377+
fn is_status_retryable(status: i32) -> bool {
378+
let status = status as u16;
379+
RETRYABLE_ERROR_CODES.contains(&status)
380+
}

src/lib.rs

+24
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
use std::collections::HashMap;
7070
use std::fmt;
7171
use std::num::TryFromIntError;
72+
use std::time::Duration;
7273

7374
pub mod api;
7475

@@ -83,6 +84,19 @@ pub use blocking::BlockingClient;
8384
#[cfg(feature = "async")]
8485
pub use r#async::AsyncClient;
8586

87+
/// Response status codes for which the request may be retried.
88+
const RETRYABLE_ERROR_CODES: [u16; 3] = [
89+
429, // TOO_MANY_REQUESTS
90+
500, // INTERNAL_SERVER_ERROR
91+
503, // SERVICE_UNAVAILABLE
92+
];
93+
94+
/// Base backoff in milliseconds.
95+
const BASE_BACKOFF_MILLIS: Duration = Duration::from_millis(256);
96+
97+
/// Default max retries.
98+
const DEFAULT_MAX_RETRIES: usize = 6;
99+
86100
/// Get a fee value in sats/vbytes from the estimates
87101
/// that matches the confirmation target set as parameter.
88102
///
@@ -117,6 +131,8 @@ pub struct Builder {
117131
pub timeout: Option<u64>,
118132
/// HTTP headers to set on every request made to Esplora server.
119133
pub headers: HashMap<String, String>,
134+
/// Max retries
135+
pub max_retries: usize,
120136
}
121137

122138
impl Builder {
@@ -127,6 +143,7 @@ impl Builder {
127143
proxy: None,
128144
timeout: None,
129145
headers: HashMap::new(),
146+
max_retries: DEFAULT_MAX_RETRIES,
130147
}
131148
}
132149

@@ -148,6 +165,13 @@ impl Builder {
148165
self
149166
}
150167

168+
/// Set the maximum number of times to retry a request if the response status
169+
/// is one of [`RETRYABLE_ERROR_CODES`].
170+
pub fn max_retries(mut self, count: usize) -> Self {
171+
self.max_retries = count;
172+
self
173+
}
174+
151175
/// Build a blocking client from builder
152176
#[cfg(feature = "blocking")]
153177
pub fn build_blocking(self) -> BlockingClient {

0 commit comments

Comments
 (0)