1
1
mod config;
2
2
3
3
use std:: {
4
- path :: PathBuf ,
5
- sync :: { atomic :: AtomicUsize , Arc } ,
4
+ sync :: Arc ,
5
+ time :: { SystemTime , UNIX_EPOCH } ,
6
6
} ;
7
7
8
8
use edr_eth:: remote:: jsonrpc;
9
9
use edr_provider:: { InvalidRequestReason , ProviderRequest } ;
10
10
use napi:: { tokio:: runtime, Env , JsFunction , JsObject , Status } ;
11
11
use napi_derive:: napi;
12
12
use serde:: Serialize ;
13
- use uuid:: Uuid ;
14
13
15
14
use self :: config:: ProviderConfig ;
16
15
use crate :: {
@@ -20,14 +19,12 @@ use crate::{
20
19
trace:: RawTrace ,
21
20
} ;
22
21
23
- const SCENARIO_DIR_ENV : & str = "EDR_SCENARIO_DIR" ;
24
-
25
22
/// A JSON-RPC provider for Ethereum.
26
23
#[ napi]
27
24
pub struct Provider {
28
25
provider : Arc < edr_provider:: Provider < LoggerError > > ,
29
- id : Uuid ,
30
- request_counter : AtomicUsize ,
26
+ # [ cfg ( feature = "scenarios" ) ]
27
+ scenario_file : Option < std :: sync :: Mutex < scenarios :: ScenarioFile > > ,
31
28
}
32
29
33
30
#[ napi]
@@ -45,37 +42,32 @@ impl Provider {
45
42
let config = edr_provider:: ProviderConfig :: try_from ( config) ?;
46
43
let runtime = runtime:: Handle :: current ( ) ;
47
44
48
- let id = Uuid :: new_v4 ( ) ;
49
- if let Some ( scenario_dir) = scenario_directory ( & id) {
50
- std:: fs:: create_dir_all ( & scenario_dir) ?;
51
- let config = ScenarioConfig {
52
- provider_config : & config,
53
- logger_enabled : logger_config. enable ,
54
- } ;
55
- // Save provider config to scenario dir
56
- let config_path = scenario_dir. join ( "config.json" ) ;
57
- std:: fs:: write ( config_path, serde_json:: to_string_pretty ( & config) ?) ?;
58
- }
59
-
60
45
let logger = Box :: new ( Logger :: new ( & env, logger_config) ?) ;
61
46
let subscriber_callback = SubscriberCallback :: new ( & env, subscriber_callback) ?;
62
47
let subscriber_callback = Box :: new ( move |event| subscriber_callback. call ( event) ) ;
63
48
64
49
let ( deferred, promise) = env. create_deferred ( ) ?;
65
50
runtime. clone ( ) . spawn_blocking ( move || {
51
+ #[ cfg( feature = "scenarios" ) ]
52
+ let scenario_file = runtime:: Handle :: current ( ) . block_on ( scenarios:: scenario_file (
53
+ & config,
54
+ edr_provider:: Logger :: is_enabled ( & * logger) ,
55
+ ) ) ?;
56
+
66
57
let result = edr_provider:: Provider :: new ( runtime, logger, subscriber_callback, config)
67
58
. map_or_else (
68
59
|error| Err ( napi:: Error :: new ( Status :: GenericFailure , error. to_string ( ) ) ) ,
69
60
|provider| {
70
61
Ok ( Provider {
71
62
provider : Arc :: new ( provider) ,
72
- id ,
73
- request_counter : AtomicUsize :: new ( 0 ) ,
63
+ # [ cfg ( feature = "scenarios" ) ]
64
+ scenario_file ,
74
65
} )
75
66
} ,
76
67
) ;
77
68
78
69
deferred. resolve ( |_env| result) ;
70
+ Ok :: < _ , napi:: Error > ( ( ) )
79
71
} ) ;
80
72
81
73
Ok ( promise)
@@ -130,16 +122,8 @@ impl Provider {
130
122
}
131
123
} ;
132
124
133
- if let Some ( scenario_dir) = scenario_directory ( & self . id ) {
134
- let count = self
135
- . request_counter
136
- . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
137
- let name = match & request {
138
- ProviderRequest :: Single ( r) => r. method_name ( ) . into ( ) ,
139
- ProviderRequest :: Batch ( reqs) => format ! ( "batch_request_len_{}" , reqs. len( ) ) ,
140
- } ;
141
- let request_path = scenario_dir. join ( format ! ( "{count:08}_{name}.json" ) ) ;
142
- std:: fs:: write ( request_path, serde_json:: to_string_pretty ( & request) ?) ?;
125
+ if let Some ( scenario_file) = & self . scenario_file {
126
+ scenarios:: write_request ( scenario_file, & request) . await ?;
143
127
}
144
128
145
129
let mut response = runtime:: Handle :: current ( )
@@ -186,17 +170,6 @@ impl Provider {
186
170
}
187
171
}
188
172
189
- fn scenario_directory ( id : & Uuid ) -> Option < PathBuf > {
190
- let scenario_dir = std:: path:: PathBuf :: from ( std:: env:: var ( SCENARIO_DIR_ENV ) . ok ( ) ?) ;
191
- Some ( scenario_dir. join ( id. to_string ( ) ) )
192
- }
193
-
194
- #[ derive( Clone , Debug , Serialize ) ]
195
- struct ScenarioConfig < ' a > {
196
- provider_config : & ' a edr_provider:: ProviderConfig ,
197
- logger_enabled : bool ,
198
- }
199
-
200
173
#[ napi]
201
174
pub struct Response {
202
175
json : String ,
@@ -229,3 +202,114 @@ impl Response {
229
202
. collect ( )
230
203
}
231
204
}
205
+
206
+ #[ cfg( feature = "scenarios" ) ]
207
+ mod scenarios {
208
+ use std:: {
209
+ fs:: File ,
210
+ io:: { BufReader , Seek , Write } ,
211
+ sync:: Mutex ,
212
+ } ;
213
+
214
+ use flate2:: { write:: GzEncoder , Compression } ;
215
+ use napi:: tokio:: task:: { spawn_blocking, JoinError } ;
216
+ use rand:: { distributions:: Alphanumeric , Rng } ;
217
+ use serde:: Serialize ;
218
+ use tempfile:: tempfile;
219
+
220
+ use super :: * ;
221
+
222
+ const SCENARIO_FILE_PREFIX : & str = "EDR_SCENARIO_PREFIX" ;
223
+
224
+ impl Drop for Provider {
225
+ fn drop ( & mut self ) {
226
+ if let Some ( scenario_file) = self . scenario_file . take ( ) {
227
+ napi:: tokio:: task:: block_in_place ( move || {
228
+ let mut scenario_file =
229
+ scenario_file. lock ( ) . expect ( "Failed to lock scenario file" ) ;
230
+ scenario_file
231
+ . tempfile
232
+ . seek ( std:: io:: SeekFrom :: Start ( 0 ) )
233
+ . expect ( "Seek failed" ) ;
234
+ let mut input = BufReader :: new ( & mut scenario_file. tempfile ) ;
235
+
236
+ let output = File :: create ( format ! ( "{}.gz" , scenario_file. result_name) )
237
+ . expect ( "Failed to create gzipped file" ) ;
238
+ let mut encoder = GzEncoder :: new ( output, Compression :: default ( ) ) ;
239
+ encoder. finish ( ) . expect ( "Failed to finish Gzip" ) ;
240
+ } )
241
+ }
242
+ }
243
+ }
244
+
245
+ #[ derive( Debug ) ]
246
+ pub ( super ) struct ScenarioFile {
247
+ tempfile : File ,
248
+ result_name : String ,
249
+ }
250
+
251
+ #[ derive( Clone , Debug , Serialize ) ]
252
+ struct ScenarioConfig < ' a > {
253
+ provider_config : & ' a edr_provider:: ProviderConfig ,
254
+ logger_enabled : bool ,
255
+ }
256
+
257
+ pub ( super ) async fn scenario_file (
258
+ provider_config : & edr_provider:: ProviderConfig ,
259
+ logger_enabled : bool ,
260
+ ) -> Result < Option < Mutex < ScenarioFile > > , napi:: Error > {
261
+ if let Some ( scenario_prefix) = std:: env:: var ( SCENARIO_FILE_PREFIX ) . ok ( ) {
262
+ let timestamp = SystemTime :: now ( )
263
+ . duration_since ( UNIX_EPOCH )
264
+ . expect ( "Time went backwards" )
265
+ . as_secs ( ) ;
266
+ let suffix = rand:: thread_rng ( )
267
+ . sample_iter ( & Alphanumeric )
268
+ . take ( 4 )
269
+ . map ( char:: from)
270
+ . collect :: < String > ( ) ;
271
+
272
+ let mut scenario_file = spawn_blocking ( || tempfile ( ) )
273
+ . await
274
+ . map_err ( handle_join_error) ??;
275
+
276
+ let config = ScenarioConfig {
277
+ provider_config,
278
+ logger_enabled,
279
+ } ;
280
+ let mut line = serde_json:: to_string ( & config) ?;
281
+ line. push ( '\n' ) ;
282
+ spawn_blocking ( move || {
283
+ scenario_file. write_all ( line. as_bytes ( ) ) ?;
284
+
285
+ Ok ( Some ( Mutex :: new ( ScenarioFile {
286
+ tempfile : scenario_file,
287
+ result_name : format ! ( "{}_{}_{}.json" , scenario_prefix, timestamp, suffix) ,
288
+ } ) ) )
289
+ } )
290
+ . await
291
+ . map_err ( handle_join_error) ?
292
+ } else {
293
+ Ok ( None )
294
+ }
295
+ }
296
+
297
+ fn handle_join_error ( error : JoinError ) -> napi:: Error {
298
+ napi:: Error :: new ( Status :: GenericFailure , error. to_string ( ) )
299
+ }
300
+
301
+ pub ( super ) async fn write_request (
302
+ scenario_file : & Mutex < ScenarioFile > ,
303
+ request : & ProviderRequest ,
304
+ ) -> napi:: Result < ( ) > {
305
+ let mut line = serde_json:: to_string ( request) ?;
306
+ line. push ( '\n' ) ;
307
+ {
308
+ let mut scenario_file = scenario_file
309
+ . lock ( )
310
+ . map_err ( |err| napi:: Error :: new ( Status :: GenericFailure , err. to_string ( ) ) ) ?;
311
+ scenario_file. tempfile . write_all ( line. as_bytes ( ) ) ?;
312
+ }
313
+ Ok ( ( ) )
314
+ }
315
+ }
0 commit comments