Skip to content

Commit 3a21e71

Browse files
committed
fix merge conflicts
2 parents 1ca860f + e0cd391 commit 3a21e71

16 files changed

+502
-205
lines changed

mm2src/coins/eth/eth_rpc.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ impl EthCoin {
1818
async fn try_rpc_send(&self, method: &str, params: Vec<jsonrpc_core::Value>) -> Result<Value, web3::Error> {
1919
let mut clients = self.web3_instances.lock().await;
2020

21+
let mut error = web3::Error::Unreachable;
2122
for (i, client) in clients.clone().into_iter().enumerate() {
2223
let execute_fut = match client.web3.transport() {
2324
Web3Transport::Http(http) => http.execute(method, params.clone()),
@@ -35,8 +36,9 @@ impl EthCoin {
3536
clients.rotate_left(i);
3637
return Ok(r);
3738
},
38-
Ok(Err(rpc_error)) => {
39-
debug!("Request on '{method}' failed. Error: {rpc_error}");
39+
Ok(Err(err)) => {
40+
debug!("Request on '{method}' failed. Error: {err}");
41+
error = err;
4042

4143
if let Web3Transport::Websocket(socket_transport) = client.web3.transport() {
4244
socket_transport.stop_connection_loop().await;
@@ -52,9 +54,7 @@ impl EthCoin {
5254
};
5355
}
5456

55-
Err(web3::Error::Transport(web3::error::TransportError::Message(format!(
56-
"Request '{method}' failed due to not being able to find a living RPC client"
57-
))))
57+
Err(error)
5858
}
5959
}
6060

mm2src/coins/my_tx_history_v2.rs

-1
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,6 @@ where
514514
})
515515
}
516516

517-
#[cfg(not(target_arch = "wasm32"))]
518517
pub async fn z_coin_tx_history_rpc(
519518
ctx: MmArc,
520519
request: MyTxHistoryRequestV2<i64>,

mm2src/coins/z_coin.rs

+91-167
Large diffs are not rendered by default.

mm2src/coins/z_coin/storage.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@ pub mod blockdb;
44
pub use blockdb::*;
55

66
pub mod walletdb;
7+
#[cfg(target_arch = "wasm32")] mod z_params;
8+
#[cfg(target_arch = "wasm32")]
9+
pub(crate) use z_params::ZcashParamsWasmImpl;
10+
711
pub use walletdb::*;
812

13+
use crate::z_coin::z_balance_streaming::ZBalanceEventSender;
914
use mm2_err_handle::mm_error::MmResult;
1015
#[cfg(target_arch = "wasm32")]
1116
use walletdb::wasm::storage::DataConnStmtCacheWasm;
@@ -55,7 +60,7 @@ pub struct CompactBlockRow {
5560
#[derive(Clone)]
5661
pub enum BlockProcessingMode {
5762
Validate,
58-
Scan(DataConnStmtCacheWrapper),
63+
Scan(DataConnStmtCacheWrapper, Option<ZBalanceEventSender>),
5964
}
6065

6166
/// Checks that the scanned blocks in the data database, when combined with the recent
@@ -114,7 +119,7 @@ pub async fn scan_cached_block(
114119
params: &ZcoinConsensusParams,
115120
block: &CompactBlock,
116121
last_height: &mut BlockHeight,
117-
) -> Result<(), ValidateBlocksError> {
122+
) -> Result<usize, ValidateBlocksError> {
118123
let mut data_guard = data.inner().clone();
119124
// Fetch the ExtendedFullViewingKeys we are tracking
120125
let extfvks = data_guard.get_extended_full_viewing_keys().await?;
@@ -201,5 +206,6 @@ pub async fn scan_cached_block(
201206

202207
*last_height = current_height;
203208

204-
Ok(())
209+
// If there are any transactions in the block, return the transaction count
210+
Ok(txs.len())
205211
}

mm2src/coins/z_coin/storage/blockdb/blockdb_idb_storage.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::z_coin::storage::{scan_cached_block, validate_chain, BlockDbImpl, Blo
33
use crate::z_coin::z_coin_errors::ZcoinStorageError;
44

55
use async_trait::async_trait;
6+
use futures_util::SinkExt;
67
use mm2_core::mm_ctx::MmArc;
78
use mm2_db::indexed_db::{BeBigUint, ConstructibleDb, DbIdentifier, DbInstance, DbLocked, DbUpgrader, IndexedDb,
89
IndexedDbBuilder, InitDbResult, MultiIndex, OnUpgradeResult, TableSignature};
@@ -123,7 +124,7 @@ impl BlockDbImpl {
123124
}
124125

125126
/// Asynchronously rewinds the storage to a specified block height, effectively
126-
/// removing data beyond the specified height from the storage.
127+
/// removing data beyond the specified height from the storage.
127128
pub async fn rewind_to_height(&self, height: BlockHeight) -> ZcoinStorageRes<usize> {
128129
let locked_db = self.lock_db().await?;
129130
let db_transaction = locked_db.get_inner().transaction().await?;
@@ -224,7 +225,7 @@ impl BlockDbImpl {
224225
BlockProcessingMode::Validate => validate_from
225226
.map(|(height, _)| height)
226227
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1),
227-
BlockProcessingMode::Scan(data) => data.inner().block_height_extrema().await.map(|opt| {
228+
BlockProcessingMode::Scan(data, _) => data.inner().block_height_extrema().await.map(|opt| {
228229
opt.map(|(_, max)| max)
229230
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1)
230231
})?,
@@ -250,8 +251,15 @@ impl BlockDbImpl {
250251
BlockProcessingMode::Validate => {
251252
validate_chain(block, &mut prev_height, &mut prev_hash).await?;
252253
},
253-
BlockProcessingMode::Scan(data) => {
254-
scan_cached_block(data, &params, &block, &mut from_height).await?;
254+
BlockProcessingMode::Scan(data, z_balance_change_sender) => {
255+
let tx_size = scan_cached_block(data, &params, &block, &mut from_height).await?;
256+
// If there is/are transactions present in the current scanned block(s),
257+
// we trigger a `Triggered` event to update the balance change.
258+
if tx_size > 0 {
259+
if let Some(mut sender) = z_balance_change_sender.clone() {
260+
sender.send(()).await.expect("No receiver is available/dropped");
261+
};
262+
};
255263
},
256264
}
257265
}

mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::z_coin::ZcoinConsensusParams;
66
use common::async_blocking;
77
use db_common::sqlite::rusqlite::{params, Connection};
88
use db_common::sqlite::{query_single_row, run_optimization_pragmas, rusqlite};
9+
use futures_util::SinkExt;
910
use itertools::Itertools;
1011
use mm2_core::mm_ctx::MmArc;
1112
use mm2_err_handle::prelude::*;
@@ -193,7 +194,7 @@ impl BlockDbImpl {
193194
BlockProcessingMode::Validate => validate_from
194195
.map(|(height, _)| height)
195196
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1),
196-
BlockProcessingMode::Scan(data) => {
197+
BlockProcessingMode::Scan(data, _) => {
197198
let data = data.inner();
198199
data.block_height_extrema().await.map(|opt| {
199200
opt.map(|(_, max)| max)
@@ -224,8 +225,15 @@ impl BlockDbImpl {
224225
BlockProcessingMode::Validate => {
225226
validate_chain(block, &mut prev_height, &mut prev_hash).await?;
226227
},
227-
BlockProcessingMode::Scan(data) => {
228-
scan_cached_block(data, &params, &block, &mut from_height).await?;
228+
BlockProcessingMode::Scan(data, z_balance_change_sender) => {
229+
let tx_size = scan_cached_block(data, &params, &block, &mut from_height).await?;
230+
// If there are transactions present in the current scanned block,
231+
// we send a `Triggered` event to update the balance change.
232+
if tx_size > 0 {
233+
if let Some(mut sender) = z_balance_change_sender.clone() {
234+
sender.send(()).await.expect("No receiver is available/dropped");
235+
};
236+
};
229237
},
230238
}
231239
}

mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl<'a> WalletIndexedDb {
146146
Ok(db)
147147
}
148148

149-
async fn lock_db(&self) -> ZcoinStorageRes<WalletDbInnerLocked<'_>> {
149+
pub(crate) async fn lock_db(&self) -> ZcoinStorageRes<WalletDbInnerLocked<'_>> {
150150
self.db
151151
.get_or_initialize()
152152
.await
+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use crate::common::Future01CompatExt;
2+
use crate::hd_wallet::AsyncMutex;
3+
use crate::z_coin::ZCoin;
4+
use crate::{MarketCoinOps, MmCoin};
5+
6+
use async_trait::async_trait;
7+
use common::executor::{AbortSettings, SpawnAbortable};
8+
use common::log::{error, info};
9+
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
10+
use futures::channel::oneshot;
11+
use futures::channel::oneshot::{Receiver, Sender};
12+
use futures_util::StreamExt;
13+
use mm2_core::mm_ctx::MmArc;
14+
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
15+
use mm2_event_stream::{Event, EventStreamConfiguration};
16+
use std::sync::Arc;
17+
18+
pub type ZBalanceEventSender = UnboundedSender<()>;
19+
pub type ZBalanceEventHandler = Arc<AsyncMutex<UnboundedReceiver<()>>>;
20+
21+
#[async_trait]
22+
impl EventBehaviour for ZCoin {
23+
const EVENT_NAME: &'static str = "COIN_BALANCE";
24+
const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR";
25+
26+
async fn handle(self, _interval: f64, tx: Sender<EventInitStatus>) {
27+
const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen.";
28+
29+
macro_rules! send_status_on_err {
30+
($match: expr, $sender: tt, $msg: literal) => {
31+
match $match {
32+
Some(t) => t,
33+
None => {
34+
$sender
35+
.send(EventInitStatus::Failed($msg.to_owned()))
36+
.expect(RECEIVER_DROPPED_MSG);
37+
panic!("{}", $msg);
38+
},
39+
}
40+
};
41+
}
42+
43+
let ctx = send_status_on_err!(
44+
MmArc::from_weak(&self.as_ref().ctx),
45+
tx,
46+
"MM context must have been initialized already."
47+
);
48+
let z_balance_change_handler = send_status_on_err!(
49+
self.z_fields.z_balance_event_handler.as_ref(),
50+
tx,
51+
"Z balance change receiver can not be empty."
52+
);
53+
54+
tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG);
55+
56+
// Locks the balance change handler, iterates through received events, and updates balance changes accordingly.
57+
let mut bal = z_balance_change_handler.lock().await;
58+
while (bal.next().await).is_some() {
59+
match self.my_balance().compat().await {
60+
Ok(balance) => {
61+
let payload = json!({
62+
"ticker": self.ticker(),
63+
"address": self.my_z_address_encoded(),
64+
"balance": { "spendable": balance.spendable, "unspendable": balance.unspendable }
65+
});
66+
67+
ctx.stream_channel_controller
68+
.broadcast(Event::new(Self::EVENT_NAME.to_string(), payload.to_string()))
69+
.await;
70+
},
71+
Err(err) => {
72+
let ticker = self.ticker();
73+
error!("Failed getting balance for '{ticker}'. Error: {err}");
74+
let e = serde_json::to_value(err).expect("Serialization should't fail.");
75+
return ctx
76+
.stream_channel_controller
77+
.broadcast(Event::new(
78+
format!("{}:{}", Self::ERROR_EVENT_NAME, ticker),
79+
e.to_string(),
80+
))
81+
.await;
82+
},
83+
};
84+
}
85+
}
86+
87+
async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus {
88+
if let Some(event) = config.get_event(Self::EVENT_NAME) {
89+
info!(
90+
"{} event is activated for {} address {}. `stream_interval_seconds`({}) has no effect on this.",
91+
Self::EVENT_NAME,
92+
self.ticker(),
93+
self.my_z_address_encoded(),
94+
event.stream_interval_seconds
95+
);
96+
97+
let (tx, rx): (Sender<EventInitStatus>, Receiver<EventInitStatus>) = oneshot::channel();
98+
let fut = self.clone().handle(event.stream_interval_seconds, tx);
99+
let settings =
100+
AbortSettings::info_on_abort(format!("{} event is stopped for {}.", Self::EVENT_NAME, self.ticker()));
101+
self.spawner().spawn_with_settings(fut, settings);
102+
103+
rx.await.unwrap_or_else(|e| {
104+
EventInitStatus::Failed(format!("Event initialization status must be received: {}", e))
105+
})
106+
} else {
107+
EventInitStatus::Inactive
108+
}
109+
}
110+
}

mm2src/coins/z_coin/z_coin_errors.rs

+22-14
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ pub enum ZCoinBuildError {
249249
ZCashParamsError(String),
250250
ZDerivationPathNotSet,
251251
SaplingParamsInvalidChecksum,
252+
FailedSpawningBalanceEvents(String),
252253
}
253254

254255
#[cfg(not(target_arch = "wasm32"))]
@@ -272,27 +273,32 @@ impl From<ZcoinClientInitError> for ZCoinBuildError {
272273
fn from(err: ZcoinClientInitError) -> Self { ZCoinBuildError::RpcClientInitErr(err) }
273274
}
274275

275-
#[cfg(not(target_arch = "wasm32"))]
276-
pub(super) enum SqlTxHistoryError {
276+
#[derive(Debug, Display)]
277+
pub(crate) enum ZTxHistoryError {
278+
#[cfg(not(target_arch = "wasm32"))]
277279
Sql(SqliteError),
280+
#[cfg(target_arch = "wasm32")]
281+
IndexedDbError(String),
278282
FromIdDoesNotExist(i64),
279283
}
280284

281-
#[cfg(not(target_arch = "wasm32"))]
282-
impl From<SqliteError> for SqlTxHistoryError {
283-
fn from(err: SqliteError) -> Self { SqlTxHistoryError::Sql(err) }
285+
impl From<ZTxHistoryError> for MyTxHistoryErrorV2 {
286+
fn from(err: ZTxHistoryError) -> Self { MyTxHistoryErrorV2::StorageError(err.to_string()) }
284287
}
285288

286289
#[cfg(not(target_arch = "wasm32"))]
287-
impl From<SqlTxHistoryError> for MyTxHistoryErrorV2 {
288-
fn from(err: SqlTxHistoryError) -> Self {
289-
match err {
290-
SqlTxHistoryError::Sql(sql) => MyTxHistoryErrorV2::StorageError(sql.to_string()),
291-
SqlTxHistoryError::FromIdDoesNotExist(id) => {
292-
MyTxHistoryErrorV2::StorageError(format!("from_id {} does not exist", id))
293-
},
294-
}
295-
}
290+
impl From<SqliteError> for ZTxHistoryError {
291+
fn from(err: SqliteError) -> Self { ZTxHistoryError::Sql(err) }
292+
}
293+
294+
#[cfg(target_arch = "wasm32")]
295+
impl From<DbTransactionError> for ZTxHistoryError {
296+
fn from(err: DbTransactionError) -> Self { ZTxHistoryError::IndexedDbError(err.to_string()) }
297+
}
298+
299+
#[cfg(target_arch = "wasm32")]
300+
impl From<CursorError> for ZTxHistoryError {
301+
fn from(err: CursorError) -> Self { ZTxHistoryError::IndexedDbError(err.to_string()) }
296302
}
297303

298304
pub(super) struct NoInfoAboutTx(pub(super) H256Json);
@@ -316,6 +322,7 @@ pub enum ZCoinBalanceError {
316322
impl From<ZcoinStorageError> for ZCoinBalanceError {
317323
fn from(value: ZcoinStorageError) -> Self { ZCoinBalanceError::BalanceError(value.to_string()) }
318324
}
325+
319326
/// The `ValidateBlocksError` enum encapsulates different types of errors that may occur
320327
/// during the validation and scanning process of zcoin blocks.
321328
#[derive(Debug, Display)]
@@ -342,6 +349,7 @@ pub enum ValidateBlocksError {
342349
impl From<ValidateBlocksError> for ZcoinStorageError {
343350
fn from(value: ValidateBlocksError) -> Self { Self::ValidateBlocksError(value) }
344351
}
352+
345353
impl From<MmError<ZcoinStorageError>> for ValidateBlocksError {
346354
fn from(value: MmError<ZcoinStorageError>) -> Self { Self::ZcoinStorageError(value.to_string()) }
347355
}

0 commit comments

Comments
 (0)