Skip to content

Commit 25a242b

Browse files
committed
Merge branch 'master' into plugins_support
+ Apply changes from master to code introduced in plugins. Conflicts: application/apps/indexer/Cargo.lock application/apps/indexer/indexer_cli/Cargo.toml application/apps/indexer/indexer_cli/src/interactive.rs application/apps/indexer/indexer_cli/src/main.rs application/apps/indexer/sources/benches/macros/mock_producer_multi.rs application/apps/indexer/sources/benches/macros/mock_producer_multi_parallel.rs application/apps/indexer/sources/benches/macros/mock_producer_once.rs application/apps/indexer/sources/benches/macros/mock_producer_once_parallel.rs application/apps/indexer/stypes/Cargo.toml application/apps/rustcore/rs-bindings/Cargo.lock
2 parents 1025880 + 2361937 commit 25a242b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1068
-641
lines changed

application/apps/indexer/Cargo.lock

Lines changed: 4 additions & 40 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

application/apps/indexer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ thiserror = "2.0"
2525
lazy_static = "1.5"
2626
tokio = { version = "1", features = ["full"] }
2727
tokio-stream = "0.1"
28-
dlt-core = { version = "0.19.3" , features = ["fibex_parser"] }
28+
dlt-core = "0.20.0"
2929
crossbeam-channel = "0.5"
3030
futures = "0.3"
3131
tokio-util = "0.7"

application/apps/indexer/addons/dlt-tools/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub async fn scan_dlt_ft(
4242
None,
4343
with_storage_header,
4444
);
45-
let mut producer = MessageProducer::new(parser, source, None);
45+
let mut producer = MessageProducer::new(parser, source);
4646

4747
let mut canceled = false;
4848

application/apps/indexer/session/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ edition = "2021"
88
blake3.workspace = true
99
crossbeam-channel.workspace = true
1010
dirs.workspace = true
11-
dlt-core = { workspace = true, features = ["statistics", "serde-support"] }
11+
dlt-core = { workspace = true, features = ["statistics", "serialization"] }
1212
envvars = { workspace = true }
1313
file-tools = { path = "../addons/file-tools" }
1414
futures.workspace = true

application/apps/indexer/session/src/handlers/export_raw.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ async fn export<S: ByteSource>(
144144
settings.plugin_configs.clone(),
145145
)
146146
.await?;
147-
let producer = MessageProducer::new(parser, source, None);
147+
let producer = MessageProducer::new(parser, source);
148148
export_runner(producer, dest, sections, read_to_end, false, cancel).await
149149
}
150150
stypes::ParserType::SomeIp(settings) => {
@@ -153,7 +153,7 @@ async fn export<S: ByteSource>(
153153
} else {
154154
SomeipParser::new()
155155
};
156-
let producer = MessageProducer::new(parser, source, None);
156+
let producer = MessageProducer::new(parser, source);
157157
export_runner(producer, dest, sections, read_to_end, false, cancel).await
158158
}
159159
stypes::ParserType::Dlt(settings) => {
@@ -165,11 +165,11 @@ async fn export<S: ByteSource>(
165165
None,
166166
settings.with_storage_header,
167167
);
168-
let producer = MessageProducer::new(parser, source, None);
168+
let producer = MessageProducer::new(parser, source);
169169
export_runner(producer, dest, sections, read_to_end, false, cancel).await
170170
}
171171
stypes::ParserType::Text(()) => {
172-
let producer = MessageProducer::new(StringTokenizer {}, source, None);
172+
let producer = MessageProducer::new(StringTokenizer {}, source);
173173
export_runner(producer, dest, sections, read_to_end, true, cancel).await
174174
}
175175
}

application/apps/indexer/session/src/handlers/observe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
state::SessionStateAPI,
55
};
66
use log::error;
7-
use sources::producer::SdeReceiver;
7+
use sources::sde::SdeReceiver;
88

99
pub async fn start_observing(
1010
operation_api: OperationAPI,

application/apps/indexer/session/src/handlers/observing/mod.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use parsers::{
1414
};
1515
use plugins_host::PluginsParser;
1616
use sources::{
17-
producer::{MessageProducer, SdeReceiver},
17+
producer::MessageProducer,
18+
sde::{SdeMsg, SdeReceiver},
1819
ByteSource,
1920
};
2021
use tokio::{
@@ -27,6 +28,7 @@ enum Next<'a, T: LogMessage> {
2728
Items(&'a mut Vec<(usize, MessageStreamItem<T>)>),
2829
Timeout,
2930
Waiting,
31+
Sde(SdeMsg),
3032
}
3133

3234
pub mod concat;
@@ -84,8 +86,8 @@ async fn run_source_intern<S: ByteSource>(
8486
settings.plugin_configs.clone(),
8587
)
8688
.await?;
87-
let producer = MessageProducer::new(parser, source, rx_sde);
88-
run_producer(operation_api, state, source_id, producer, rx_tail).await
89+
let producer = MessageProducer::new(parser, source);
90+
run_producer(operation_api, state, source_id, producer, rx_tail, rx_sde).await
8991
}
9092
stypes::ParserType::SomeIp(settings) => {
9193
let someip_parser = match &settings.fibex_file_paths {
@@ -94,12 +96,12 @@ async fn run_source_intern<S: ByteSource>(
9496
}
9597
None => SomeipParser::new(),
9698
};
97-
let producer = MessageProducer::new(someip_parser, source, rx_sde);
98-
run_producer(operation_api, state, source_id, producer, rx_tail).await
99+
let producer = MessageProducer::new(someip_parser, source);
100+
run_producer(operation_api, state, source_id, producer, rx_tail, rx_sde).await
99101
}
100102
stypes::ParserType::Text(()) => {
101-
let producer = MessageProducer::new(StringTokenizer {}, source, rx_sde);
102-
run_producer(operation_api, state, source_id, producer, rx_tail).await
103+
let producer = MessageProducer::new(StringTokenizer {}, source);
104+
run_producer(operation_api, state, source_id, producer, rx_tail, rx_sde).await
103105
}
104106
stypes::ParserType::Dlt(settings) => {
105107
let fmt_options = Some(FormatOptions::from(settings.tz.as_ref()));
@@ -113,8 +115,8 @@ async fn run_source_intern<S: ByteSource>(
113115
someip_metadata.as_ref(),
114116
settings.with_storage_header,
115117
);
116-
let producer = MessageProducer::new(dlt_parser, source, rx_sde);
117-
run_producer(operation_api, state, source_id, producer, rx_tail).await
118+
let producer = MessageProducer::new(dlt_parser, source);
119+
run_producer(operation_api, state, source_id, producer, rx_tail, rx_sde).await
118120
}
119121
}
120122
}
@@ -125,6 +127,7 @@ async fn run_producer<T: LogMessage, P: Parser<T>, S: ByteSource>(
125127
source_id: u16,
126128
mut producer: MessageProducer<T, P, S>,
127129
mut rx_tail: Option<Receiver<Result<(), tail::Error>>>,
130+
mut rx_sde: Option<SdeReceiver>,
128131
) -> OperationResult<()> {
129132
use log::debug;
130133
state.set_session_file(None).await?;
@@ -145,6 +148,14 @@ async fn run_producer<T: LogMessage, P: Parser<T>, S: ByteSource>(
145148
Err(_) => Some(Next::Timeout),
146149
}
147150
} => next_from_stream,
151+
Some(sde_msg) = async {
152+
if let Some(rx_sde) = rx_sde.as_mut() {
153+
rx_sde.recv().await
154+
} else {
155+
None
156+
}
157+
} => Some(Next::Sde(sde_msg)),
158+
148159
_ = cancel.cancelled() => None,
149160
} {
150161
match next {
@@ -226,6 +237,12 @@ async fn run_producer<T: LogMessage, P: Parser<T>, S: ByteSource>(
226237
break;
227238
}
228239
}
240+
Next::Sde((msg, tx_response)) => {
241+
let sde_res = producer.sde_income(msg).await.map_err(|e| e.to_string());
242+
if tx_response.send(sde_res).is_err() {
243+
log::warn!("Fail to send back message from source");
244+
}
245+
}
229246
}
230247
}
231248
debug!("listen done");

application/apps/indexer/session/src/handlers/observing/stream.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ use crate::{
55
};
66
use sources::{
77
command::process::ProcessSource,
8-
producer::SdeReceiver,
98
serial::serialport::SerialSource,
109
socket::{tcp::TcpSource, udp::UdpSource},
1110
};
1211

12+
use super::SdeReceiver;
13+
1314
pub async fn observe_stream(
1415
operation_api: OperationAPI,
1516
state: SessionStateAPI,
@@ -40,7 +41,7 @@ pub async fn observe_stream(
4041
.await
4142
}
4243
stypes::Transport::TCP(settings) => {
43-
let tcp_source = TcpSource::new(settings.bind_addr.clone(), None)
44+
let tcp_source = TcpSource::new(&settings.bind_addr, None, None)
4445
.await
4546
.map_err(|e| stypes::NativeError {
4647
severity: stypes::Severity::ERROR,

application/apps/indexer/session/src/operations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use log::{debug, error, warn};
33
use merging::merger::FileMergeOptions;
44
use processor::search::filter::SearchFilter;
55
use serde::Serialize;
6-
use sources::producer::{SdeReceiver, SdeSender};
6+
use sources::sde::{SdeReceiver, SdeSender};
77
use std::{
88
ops::RangeInclusive,
99
path::PathBuf,

application/apps/indexer/session/src/tracker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{operations::OperationStat, progress::ProgressProviderAPI, state::SessionStateAPI};
22
use log::{debug, error};
3-
use sources::producer::SdeSender;
3+
use sources::sde::SdeSender;
44
use std::collections::{hash_map::Entry, HashMap};
55
use tokio::{
66
sync::{

application/apps/indexer/session/src/unbound/commands/dlt.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,36 @@
11
use crate::unbound::signal::Signal;
2-
use dlt_core::statistics::{collect_dlt_stats, StatisticInfo};
3-
use std::path::Path;
2+
use dlt_core::{
3+
read::DltMessageReader,
4+
statistics::{
5+
collect_statistics,
6+
common::{StatisticInfo, StatisticInfoCollector},
7+
},
8+
};
9+
use std::fs::File;
410

511
pub fn stats(
6-
files: Vec<String>,
12+
file_paths: Vec<String>,
713
_signal: Signal,
814
) -> Result<stypes::CommandOutcome<stypes::DltStatisticInfo>, stypes::ComputationError> {
915
let mut stat = StatisticInfo::new();
1016
let mut error: Option<String> = None;
11-
files.iter().for_each(|file| {
17+
file_paths.iter().for_each(|file_path| {
1218
if error.is_some() {
1319
return;
1420
}
15-
match collect_dlt_stats(Path::new(&file)) {
16-
Ok(res) => {
17-
stat.merge(res);
21+
match File::open(file_path) {
22+
Ok(file) => {
23+
let mut reader = DltMessageReader::new(file, true);
24+
let mut collector = StatisticInfoCollector::default();
25+
26+
match collect_statistics(&mut reader, &mut collector) {
27+
Ok(()) => {
28+
stat.merge(collector.collect());
29+
}
30+
Err(err) => {
31+
error = Some(err.to_string());
32+
}
33+
}
1834
}
1935
Err(err) => {
2036
error = Some(err.to_string());

application/apps/indexer/sources/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ regex.workspace = true
2525
lazy_static.workspace = true
2626
shellexpand = "3.1"
2727
stypes = { path = "../stypes", features=["rustcore"] }
28+
socket2 = "0.5.8"
2829

2930
[dev-dependencies]
3031
env_logger.workspace = true

application/apps/indexer/sources/benches/dlt_producer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
2-
use std::{hint::black_box, path::PathBuf};
2+
use std::path::PathBuf;
33

44
use bench_utls::{
55
bench_standrad_config, create_binary_bytesource, get_config, read_binary, run_producer,
@@ -36,7 +36,7 @@ fn dlt_producer(c: &mut Criterion) {
3636
|| {
3737
let parser = DltParser::new(None, fibex.as_ref(), None, None, true);
3838
let source = create_binary_bytesource(data);
39-
MessageProducer::new(parser, source, black_box(None))
39+
MessageProducer::new(parser, source)
4040
},
4141
run_producer,
4242
BatchSize::SmallInput,

application/apps/indexer/sources/benches/mocks_multi_parallel.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ fn mocks_multi_parallel(c: &mut Criterion) {
4040
for _ in 0..tasks_count {
4141
let parser = MockParser::new_multi(max);
4242
let byte_source = MockByteSource::new();
43-
let producer =
44-
MessageProducer::new(parser, byte_source, black_box(None));
43+
let producer = MessageProducer::new(parser, byte_source);
4544
producers.push(producer);
4645
}
4746
producers

application/apps/indexer/sources/benches/mocks_multi_producer.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use bench_utls::{bench_standrad_config, run_producer};
77
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
88
use mocks::{mock_parser::MockParser, mock_source::MockByteSource};
99
use sources::producer::MessageProducer;
10-
use std::hint::black_box;
1110
mod bench_utls;
1211
mod mocks;
1312

@@ -33,7 +32,7 @@ fn mocks_multi_producer(c: &mut Criterion) {
3332
|| {
3433
let parser = MockParser::new_multi(max);
3534
let byte_source = MockByteSource::new();
36-
let producer = MessageProducer::new(parser, byte_source, black_box(None));
35+
let producer = MessageProducer::new(parser, byte_source);
3736
producer
3837
},
3938
|producer| run_producer(producer),

application/apps/indexer/sources/benches/mocks_once_parallel.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ fn mocks_once_parallel(c: &mut Criterion) {
3737
for _ in 0..tasks_count {
3838
let parser = MockParser::new_once(max);
3939
let byte_source = MockByteSource::new();
40-
let producer =
41-
MessageProducer::new(parser, byte_source, black_box(None));
40+
let producer = MessageProducer::new(parser, byte_source);
4241
producers.push(producer);
4342
}
4443
producers

0 commit comments

Comments
 (0)