Skip to content

Commit 0f28a5a

Browse files
committed
stake distribution dolos data source
1 parent 5beac89 commit 0f28a5a

File tree

1 file changed

+65
-7
lines changed

1 file changed

+65
-7
lines changed
Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,79 @@
1+
use crate::{
2+
Result,
3+
client::{MiniBFClient, api::MiniBFApi},
4+
};
5+
use blockfrost_openapi::models::epoch_stake_pool_content_inner::EpochStakePoolContentInner;
6+
use futures::StreamExt;
17
use sidechain_domain::*;
28
use sp_block_participation::inherent_data::BlockParticipationDataSource;
39

4-
pub struct StakeDistributionDataSourceImpl;
10+
pub struct StakeDistributionDataSourceImpl {
11+
client: MiniBFClient,
12+
}
513

614
impl StakeDistributionDataSourceImpl {
7-
pub fn new() -> Self {
8-
Self {}
15+
pub fn new(client: MiniBFClient) -> Self {
16+
Self { client }
917
}
1018
}
1119

1220
#[async_trait::async_trait]
1321
impl BlockParticipationDataSource for StakeDistributionDataSourceImpl {
1422
async fn get_stake_pool_delegation_distribution_for_pools(
1523
&self,
16-
_epoch: McEpochNumber,
17-
_pool_hashes: &[MainchainKeyHash],
18-
) -> Result<StakeDistribution, Box<dyn std::error::Error + Send + Sync>> {
19-
Err("not implemented".into())
24+
epoch_number: McEpochNumber,
25+
pool_hashes: &[MainchainKeyHash],
26+
) -> Result<StakeDistribution> {
27+
let pool_futures = futures::stream::iter(pool_hashes)
28+
.map(|pool_id| async {
29+
self.client
30+
.epochs_stakes_by_pool(epoch_number, *pool_id)
31+
.await
32+
.map(|ss| ss.iter().map(|s| (pool_id.clone(), s.clone())).collect::<Vec<_>>())
33+
})
34+
.collect::<Vec<_>>()
35+
.await;
36+
let pools = futures::future::try_join_all(pool_futures)
37+
.await?
38+
.into_iter()
39+
.flatten()
40+
.collect::<Vec<_>>();
41+
Ok(rows_to_distribution(pools))
42+
}
43+
}
44+
45+
fn rows_to_distribution(
46+
rows: Vec<(sidechain_domain::MainchainKeyHash, EpochStakePoolContentInner)>,
47+
) -> StakeDistribution {
48+
let mut res = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
49+
for (pool_id, stake) in rows {
50+
match get_delegator_key(&stake) {
51+
Ok(delegator_key) => {
52+
let pool = res.entry(pool_id).or_default();
53+
let stake_amount = stake.amount.parse().expect("valid stake amount");
54+
pool.delegators
55+
.entry(delegator_key)
56+
.or_insert(DelegatorStakeAmount(stake_amount));
57+
pool.total_stake.0 += stake_amount;
58+
},
59+
Err(e) => {
60+
log::warn!("Failed to parse EpochStakePoolContentInner: {}", e)
61+
},
62+
}
63+
}
64+
StakeDistribution(res)
65+
}
66+
67+
fn get_delegator_key(row: &EpochStakePoolContentInner) -> Result<DelegatorKey> {
68+
let (_, stake_address_hash_raw) = bech32::decode(&row.stake_address)?;
69+
match &stake_address_hash_raw[..] {
70+
[0xe0 | 0xe1, rest @ ..] => Ok(DelegatorKey::StakeKeyHash(
71+
rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
72+
)),
73+
[0xf0 | 0xf1, rest @ ..] => Ok(DelegatorKey::ScriptKeyHash {
74+
hash_raw: rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
75+
script_hash: [0; 28], // TODO how to get this?
76+
}),
77+
_ => Err(format!("invalid stake address hash: {}", row.stake_address).into()),
2078
}
2179
}

0 commit comments

Comments
 (0)