Skip to content

Commit b2dfbb3

Browse files
sander-willems-brukerlazear
authored and committed
feat: database splitting
FEAT: added option to iterate over fasta chunks FEAT: Added cloning to Search and QuantSettings CHORE: made reordering of target_decoys a separate function outside of digest CHORE: implemented build db from peptide list FIX: reorder_peptide function return FIX: ambiguous target/decoy peptides are now always target FEAT: added fasta chunking params to db CHORE: refactored process_chunk from runner FEAT: added default to IndexedDatabase FEAT: added quick_score option to quickly filter peptides FEAT: added parsing of the prefilter fasta by chunk parameters FEAT: added extra low memory option FEAT: using heap to retain best scores in low_mem mode FIX: merge issues FEAT: auto calculate prefilter chunk size chore: linearize history via rebase
1 parent 97263b1 commit b2dfbb3

File tree

6 files changed

+281
-28
lines changed

6 files changed

+281
-28
lines changed

crates/sage-cli/src/input.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use sage_core::{
1010
};
1111
use serde::{Deserialize, Serialize};
1212

13-
#[derive(Serialize)]
13+
#[derive(Serialize, Clone)]
1414
/// Actual search parameters - may include overrides or default values not set by user
1515
pub struct Search {
1616
pub version: String,
@@ -148,7 +148,7 @@ pub struct QuantOptions {
148148
pub lfq_options: Option<LfqOptions>,
149149
}
150150

151-
#[derive(Serialize, Default)]
151+
#[derive(Serialize, Default, Clone)]
152152
pub struct QuantSettings {
153153
pub tmt: Option<Isobaric>,
154154
pub tmt_settings: TmtSettings,

crates/sage-cli/src/main.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
use clap::{value_parser, Arg, Command, ValueHint};
22
use input::Input;
3-
4-
mod input;
5-
mod output;
6-
mod runner;
7-
mod telemetry;
3+
use runner::Runner;
84

95
fn main() -> anyhow::Result<()> {
106
env_logger::Builder::default()
@@ -107,9 +103,11 @@ fn main() -> anyhow::Result<()> {
107103

108104
let input = Input::from_arguments(matches)?;
109105

110-
let runner_ = input.build().and_then(runner::Runner::new)?;
106+
let runner = input
107+
.build()
108+
.and_then(|parameters| Runner::new(parameters, parallel))?;
111109

112-
let tel = runner_.run(parallel, parquet)?;
110+
let tel = runner.run(parallel, parquet)?;
113111

114112
if send_telemetry {
115113
tel.send();

crates/sage-cli/src/runner.rs

+155-6
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@ use csv::ByteRecord;
66
use log::info;
77
use rayon::prelude::*;
88
use sage_cloudpath::CloudPath;
9-
use sage_core::database::IndexedDatabase;
9+
use sage_core::database::{IndexedDatabase, Parameters, PeptideIx};
10+
use sage_core::fasta::Fasta;
1011
use sage_core::ion_series::Kind;
1112
use sage_core::lfq::{Peak, PrecursorId};
1213
use sage_core::mass::Tolerance;
14+
use sage_core::peptide::Peptide;
1315
use sage_core::scoring::Fragments;
1416
use sage_core::scoring::{Feature, Scorer};
1517
use sage_core::spectrum::{ProcessedSpectrum, SpectrumProcessor};
1618
use sage_core::tmt::TmtQuant;
1719
use std::collections::HashMap;
20+
use std::collections::HashSet;
1821
use std::time::Instant;
1922

2023
pub struct Runner {
@@ -24,7 +27,8 @@ pub struct Runner {
2427
}
2528

2629
impl Runner {
27-
pub fn new(parameters: Search) -> anyhow::Result<Self> {
30+
pub fn new(parameters: Search, parallel: usize) -> anyhow::Result<Self> {
31+
let mut parameters = parameters.clone();
2832
let start = Instant::now();
2933
let fasta = sage_cloudpath::util::read_fasta(
3034
&parameters.database.fasta,
@@ -38,7 +42,32 @@ impl Runner {
3842
)
3943
})?;
4044

41-
let database = parameters.database.clone().build(fasta);
45+
let database = match parameters.database.prefilter {
46+
false => parameters.database.clone().build(fasta),
47+
true => {
48+
parameters
49+
.database
50+
.auto_calculate_prefilter_chunk_size(&fasta);
51+
if parameters.database.prefilter_chunk_size >= fasta.targets.len() {
52+
parameters.database.clone().build(fasta)
53+
} else {
54+
info!(
55+
"using {} db chunks of size {}",
56+
(fasta.targets.len() + parameters.database.prefilter_chunk_size - 1)
57+
/ parameters.database.prefilter_chunk_size,
58+
parameters.database.prefilter_chunk_size,
59+
);
60+
let mini_runner = Self {
61+
database: IndexedDatabase::default(),
62+
parameters: parameters.clone(),
63+
start,
64+
};
65+
let peptides = mini_runner.prefilter_peptides(parallel, fasta);
66+
parameters.database.clone().build_from_peptides(peptides)
67+
}
68+
}
69+
};
70+
4271
info!(
4372
"generated {} fragments, {} peptides in {}ms",
4473
database.fragments.len(),
@@ -52,6 +81,108 @@ impl Runner {
5281
})
5382
}
5483

84+
pub fn prefilter_peptides(self, parallel: usize, fasta: Fasta) -> Vec<Peptide> {
85+
let spectra: Option<Vec<ProcessedSpectrum>> =
86+
match parallel >= self.parameters.mzml_paths.len() {
87+
true => Some(self.read_processed_spectra(&self.parameters.mzml_paths, 0, 0)),
88+
false => None,
89+
};
90+
let mut all_peptides: Vec<Peptide> = fasta
91+
.iter_chunks(self.parameters.database.prefilter_chunk_size)
92+
.enumerate()
93+
.flat_map(|(chunk_id, fasta_chunk)| {
94+
let start = Instant::now();
95+
info!("pre-filtering fasta chunk {}", chunk_id,);
96+
let db = &self.parameters.database.clone().build(fasta_chunk);
97+
info!(
98+
"generated {} fragments, {} peptides in {}ms",
99+
db.fragments.len(),
100+
db.peptides.len(),
101+
(Instant::now() - start).as_millis()
102+
);
103+
let scorer = Scorer {
104+
db,
105+
precursor_tol: self.parameters.precursor_tol,
106+
fragment_tol: self.parameters.fragment_tol,
107+
min_matched_peaks: self.parameters.min_matched_peaks,
108+
min_isotope_err: self.parameters.isotope_errors.0,
109+
max_isotope_err: self.parameters.isotope_errors.1,
110+
min_precursor_charge: self.parameters.precursor_charge.0,
111+
max_precursor_charge: self.parameters.precursor_charge.1,
112+
override_precursor_charge: self.parameters.override_precursor_charge,
113+
max_fragment_charge: self.parameters.max_fragment_charge,
114+
chimera: self.parameters.chimera,
115+
report_psms: self.parameters.report_psms + 1,
116+
wide_window: self.parameters.wide_window,
117+
annotate_matches: self.parameters.annotate_matches,
118+
score_type: self.parameters.score_type,
119+
};
120+
let peptide_idxs: HashSet<PeptideIx> = match &spectra {
121+
Some(spectra) => self.peptide_filter_processed_spectra(&scorer, spectra),
122+
None => self
123+
.parameters
124+
.mzml_paths
125+
.chunks(parallel)
126+
.enumerate()
127+
.flat_map(|(chunk_idx, chunk)| {
128+
let spectra_chunk =
129+
self.read_processed_spectra(chunk, chunk_idx, parallel);
130+
self.peptide_filter_processed_spectra(&scorer, &spectra_chunk)
131+
})
132+
.collect(),
133+
}
134+
.into_iter()
135+
.collect();
136+
let peptides: Vec<Peptide> = peptide_idxs
137+
.into_iter()
138+
.map(|idx| db[idx].clone())
139+
.collect();
140+
info!(
141+
"found {} pre-filtered peptides for fasta chunk {}",
142+
peptides.len(),
143+
chunk_id,
144+
);
145+
peptides
146+
})
147+
.collect();
148+
Parameters::reorder_peptides(&mut all_peptides);
149+
all_peptides
150+
}
151+
152+
fn peptide_filter_processed_spectra(
153+
&self,
154+
scorer: &Scorer,
155+
spectra: &Vec<ProcessedSpectrum>,
156+
) -> Vec<PeptideIx> {
157+
use std::sync::atomic::{AtomicUsize, Ordering};
158+
let counter = AtomicUsize::new(0);
159+
let start = Instant::now();
160+
161+
let peptide_idxs: Vec<_> = spectra
162+
.par_iter()
163+
.filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2)
164+
.map(|x| {
165+
let prev = counter.fetch_add(1, Ordering::Relaxed);
166+
if prev > 0 && prev % 10_000 == 0 {
167+
let duration = Instant::now().duration_since(start).as_millis() as usize;
168+
169+
let rate = prev * 1000 / (duration + 1);
170+
log::trace!("- searched {} spectra ({} spectra/s)", prev, rate);
171+
}
172+
x
173+
})
174+
.flat_map(|spec| {
175+
scorer.quick_score(spec, self.parameters.database.prefilter_low_memory)
176+
})
177+
.collect();
178+
179+
let duration = Instant::now().duration_since(start).as_millis() as usize;
180+
let prev = counter.load(Ordering::Relaxed);
181+
let rate = prev * 1000 / (duration + 1);
182+
log::info!("- search: {:8} ms ({} spectra/s)", duration, rate);
183+
peptide_idxs
184+
}
185+
55186
fn spectrum_fdr(&self, features: &mut [Feature]) -> usize {
56187
if sage_core::ml::linear_discriminant::score_psms(features, self.parameters.precursor_tol)
57188
.is_none()
@@ -76,8 +207,8 @@ impl Runner {
76207
fn search_processed_spectra(
77208
&self,
78209
scorer: &Scorer,
79-
spectra: Vec<ProcessedSpectrum>,
80-
) -> SageResults {
210+
spectra: &Vec<ProcessedSpectrum>,
211+
) -> Vec<Feature> {
81212
use std::sync::atomic::{AtomicUsize, Ordering};
82213
let counter = AtomicUsize::new(0);
83214
let start = Instant::now();
@@ -102,7 +233,14 @@ impl Runner {
102233
let prev = counter.load(Ordering::Relaxed);
103234
let rate = prev * 1000 / (duration + 1);
104235
log::info!("- search: {:8} ms ({} spectra/s)", duration, rate);
236+
features
237+
}
105238

239+
fn complete_features(
240+
&self,
241+
spectra: Vec<ProcessedSpectrum>,
242+
features: Vec<Feature>,
243+
) -> SageResults {
106244
let quant = self
107245
.parameters
108246
.quant
@@ -132,6 +270,17 @@ impl Runner {
132270
chunk_idx: usize,
133271
batch_size: usize,
134272
) -> SageResults {
273+
let spectra = self.read_processed_spectra(chunk, chunk_idx, batch_size);
274+
let features = self.search_processed_spectra(scorer, &spectra);
275+
self.complete_features(spectra, features)
276+
}
277+
278+
fn read_processed_spectra(
279+
&self,
280+
chunk: &[String],
281+
chunk_idx: usize,
282+
batch_size: usize,
283+
) -> Vec<ProcessedSpectrum> {
135284
// Read all of the spectra at once - this can help prevent memory over-consumption issues
136285
info!(
137286
"processing files {} .. {} ",
@@ -190,7 +339,7 @@ impl Runner {
190339
let io_time = Instant::now() - start;
191340
info!("- file IO: {:8} ms", io_time.as_millis());
192341

193-
self.search_processed_spectra(scorer, spectra)
342+
spectra
194343
}
195344

196345
pub fn batch_files(&self, scorer: &Scorer, batch_size: usize) -> SageResults {

crates/sage/src/database.rs

+49-2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ pub struct Builder {
8484
pub generate_decoys: Option<bool>,
8585
/// Path to fasta database
8686
pub fasta: Option<String>,
87+
/// Number of sequences to handle simultaneously when pre-filtering the db
88+
pub prefilter_chunk_size: Option<usize>,
89+
/// Pre-filter the database to minimize memory usage
90+
pub prefilter: Option<bool>,
91+
/// Pre-filter the database with a minimal amount of memory at the cost of speed
92+
pub prefilter_low_memory: Option<bool>,
8793
}
8894

8995
impl Builder {
@@ -102,6 +108,9 @@ impl Builder {
102108
max_variable_mods: self.max_variable_mods.map(|x| x.max(1)).unwrap_or(2),
103109
generate_decoys: self.generate_decoys.unwrap_or(true),
104110
fasta: self.fasta.expect("A fasta file must be provided!"),
111+
prefilter_chunk_size: self.prefilter_chunk_size.unwrap_or(0),
112+
prefilter: self.prefilter.unwrap_or(false),
113+
prefilter_low_memory: self.prefilter_low_memory.unwrap_or(true),
105114
}
106115
}
107116

@@ -124,9 +133,32 @@ pub struct Parameters {
124133
pub decoy_tag: String,
125134
pub generate_decoys: bool,
126135
pub fasta: String,
136+
pub prefilter_chunk_size: usize,
137+
pub prefilter: bool,
138+
pub prefilter_low_memory: bool,
127139
}
128140

129141
impl Parameters {
142+
pub fn auto_calculate_prefilter_chunk_size(&mut self, fasta: &Fasta) {
143+
const MAX_PEPS_PER_CHUNK: usize = 10_000_000;
144+
self.prefilter_chunk_size = match self.prefilter_chunk_size {
145+
0 => {
146+
let enzyme = self.enzyme.clone().into();
147+
let total_unmodified_pep_count: usize = fasta.digest(&enzyme).len();
148+
let mod_count_estimate =
149+
(self.variable_mods.len() + 1) * (1 << self.max_variable_mods);
150+
let chunk_count =
151+
mod_count_estimate * total_unmodified_pep_count / MAX_PEPS_PER_CHUNK;
152+
if chunk_count == 0 {
153+
fasta.targets.len()
154+
} else {
155+
fasta.targets.len() / chunk_count
156+
}
157+
}
158+
x => x,
159+
};
160+
}
161+
130162
pub fn digest(&self, fasta: &Fasta) -> Vec<Peptide> {
131163
log::trace!("digesting fasta");
132164
let enzyme = self.enzyme.clone().into();
@@ -172,6 +204,12 @@ impl Parameters {
172204
})
173205
.collect::<Vec<_>>();
174206

207+
Self::reorder_peptides(&mut target_decoys);
208+
209+
target_decoys
210+
}
211+
212+
pub fn reorder_peptides(target_decoys: &mut Vec<Peptide>) {
175213
log::trace!("sorting and deduplicating peptides");
176214

177215
// This is equivalent to a stable sort
@@ -187,6 +225,9 @@ impl Parameters {
187225
&& remove.cterm == keep.cterm
188226
{
189227
keep.proteins.extend(remove.proteins.iter().cloned());
228+
// When merging peptides from different Fastas,
229+
// decoys in one fasta might be targets in another
230+
keep.decoy &= remove.decoy;
190231
true
191232
} else {
192233
false
@@ -196,13 +237,15 @@ impl Parameters {
196237
target_decoys
197238
.par_iter_mut()
198239
.for_each(|peptide| peptide.proteins.sort_unstable());
199-
200-
target_decoys
201240
}
202241

203242
// pub fn build(self) -> Result<IndexedDatabase, Box<dyn std::error::Error + Send + Sync + 'static>> {
204243
pub fn build(self, fasta: Fasta) -> IndexedDatabase {
205244
let target_decoys = self.digest(&fasta);
245+
self.build_from_peptides(target_decoys)
246+
}
247+
248+
pub fn build_from_peptides(self, target_decoys: Vec<Peptide>) -> IndexedDatabase {
206249
log::trace!("generating fragments");
207250

208251
// Finally, perform in silico digest for our target sequences
@@ -321,6 +364,7 @@ pub struct Theoretical {
321364
pub fragment_mz: f32,
322365
}
323366

367+
#[derive(Default)]
324368
pub struct IndexedDatabase {
325369
pub peptides: Vec<Peptide>,
326370
pub fragments: Vec<Theoretical>,
@@ -598,6 +642,9 @@ mod test {
598642
decoy_tag: "rev_".into(),
599643
generate_decoys: false,
600644
fasta: "none".into(),
645+
prefilter: false,
646+
prefilter_chunk_size: 0,
647+
prefilter_low_memory: true,
601648
};
602649

603650
let peptides = params.digest(&fasta);

crates/sage/src/fasta.rs

+10
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,14 @@ impl Fasta {
7777
})
7878
.collect()
7979
}
80+
81+
pub fn iter_chunks(&self, chunk_size: usize) -> impl Iterator<Item = Self> + '_ {
82+
self.targets
83+
.chunks(chunk_size)
84+
.map(move |target_chunk| Self {
85+
targets: target_chunk.to_vec(),
86+
decoy_tag: self.decoy_tag.clone(),
87+
generate_decoys: self.generate_decoys,
88+
})
89+
}
8090
}

0 commit comments

Comments
 (0)