Skip to content

Commit a8fc34b

Browse files
committed
Refactor.
1 parent c9d22bc commit a8fc34b

File tree

1 file changed

+122
-116
lines changed

1 file changed

+122
-116
lines changed

pycistopic-lib/src/pseudobulk.rs

Lines changed: 122 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use flate2::write::GzEncoder;
1313
use std::thread;
1414

1515

16-
#[derive(Eq, PartialEq)]
16+
#[derive(Eq, PartialEq, Clone)]
1717
struct GenomicRange {
1818
chromosome: String,
1919
start: usize,
@@ -38,7 +38,7 @@ impl PartialOrd for GenomicRange {
3838
}
3939

4040
impl GenomicRange {
41-
fn new(line: String, file_index: usize, filename: &str) -> Result<GenomicRange, custom_errors::InvalidFragmentFileError> {
41+
fn new(line: &str, file_index: usize, filename: &str) -> Result<GenomicRange, custom_errors::InvalidFragmentFileError> {
4242
let fields: Vec<&str> = line.split('\t').collect();
4343
if fields.len() < 4 {
4444
return Err(custom_errors::InvalidFragmentFileError::new(filename));
@@ -79,6 +79,102 @@ impl fmt::Display for GenomicRange {
7979
}
8080

8181

82+
struct FragmentFileReader {
83+
reader: std::io::BufReader<MultiGzDecoder<File>>,
84+
fragment: Result<GenomicRange, custom_errors::InvalidFragmentFileError>,
85+
buffer: String,
86+
valid_cell_barcodes: Vec<String>,
87+
target_chrom: String,
88+
file_index: usize,
89+
file_name: String,
90+
at_end_of_file: bool,
91+
}
92+
93+
impl FragmentFileReader {
94+
fn new(fragment_file_path: &str, valid_cell_barcodes: Vec<String>, target_chrom: String, file_index: usize) -> Result<FragmentFileReader, std::io::Error> {
95+
let file = File::open(Path::new(fragment_file_path))?;
96+
let d_file = MultiGzDecoder::new(file);
97+
let reader = std::io::BufReader::new(d_file);
98+
Ok(FragmentFileReader{
99+
reader,
100+
fragment: GenomicRange::new("", file_index, fragment_file_path),
101+
buffer: String::new(),
102+
valid_cell_barcodes,
103+
target_chrom,
104+
file_index,
105+
file_name: fragment_file_path.to_string(),
106+
at_end_of_file: false
107+
})
108+
}
109+
110+
fn at_chrom(&self) -> bool {
111+
if let Ok(fragment) = &self.fragment{
112+
fragment.chromosome == self.target_chrom
113+
} else {
114+
false
115+
}
116+
}
117+
118+
fn read_next(&mut self) -> Result<(), custom_errors::InvalidFragmentFileError>{
119+
self.buffer.clear();
120+
match self.reader.read_line(&mut self.buffer) {
121+
Ok(bytes_read) => {
122+
if bytes_read == 0 {
123+
self.at_end_of_file = true
124+
}
125+
},
126+
Err(_) => {
127+
return Err(custom_errors::InvalidFragmentFileError::new(&self.file_name))
128+
}
129+
}
130+
self.buffer = self.buffer.trim().to_string();
131+
self.fragment = GenomicRange::new(&self.buffer, self.file_index, &self.file_name);
132+
Ok(())
133+
}
134+
135+
fn skip_header(&mut self, pattern: &str) -> Result<(), custom_errors::InvalidFragmentFileError> {
136+
self.buffer = pattern.to_string();
137+
while self.buffer.starts_with(pattern) && !self.at_end_of_file {
138+
self.read_next()?;
139+
}
140+
Ok(())
141+
}
142+
143+
fn skip_to_chromosome(&mut self, chromosome: &str) -> Result<(), custom_errors::InvalidFragmentFileError> {
144+
while !self.at_end_of_file {
145+
if let Ok(fragment) = &self.fragment {
146+
if fragment.chromosome == chromosome {
147+
return Ok(());
148+
}
149+
};
150+
self.read_next()?;
151+
}
152+
Ok(())
153+
}
154+
155+
fn get_next_valid_fragment(&mut self) -> Result<Option<GenomicRange>, custom_errors::InvalidFragmentFileError> {
156+
while !self.at_end_of_file {
157+
// first check current fragment
158+
// it could be that after skip_header and skip_to_chromosome we are already at a
159+
// valid fragment.
160+
if let Ok(fragment) = &self.fragment {
161+
if self.valid_cell_barcodes.contains(&fragment.cell_barcode) {
162+
let fragment = fragment.clone();
163+
if fragment.chromosome != self.target_chrom {
164+
return Ok(None)
165+
}
166+
// read next fragment so next time we enter this function we have a new frag,et
167+
self.read_next()?;
168+
return Ok(Some(fragment));
169+
}
170+
}
171+
self.read_next()?;
172+
}
173+
Ok(None)
174+
}
175+
}
176+
177+
82178
fn split_fragments_by_cell_barcodes_for_chromosome(
83179
fragment_file_paths: &[&str],
84180
fragment_file_to_cell_barcode: &HashMap<String, Vec<String>>,
@@ -87,130 +183,40 @@ fn split_fragments_by_cell_barcodes_for_chromosome(
87183
) -> PyResult<()>{
88184

89185
// Open fragment files which are gzipped, and pos-sorted.
90-
let mut readers: Vec<_> = fragment_file_paths
91-
.iter()
92-
.map(|&path| {
93-
let file = File::open(Path::new(path))?;
94-
let d_file = MultiGzDecoder::new(file);
95-
Ok(std::io::BufReader::new(d_file))
96-
})
97-
.collect::<Result<_, std::io::Error>>()?;
186+
let mut readers: Vec<FragmentFileReader> = Vec::new();
187+
for (file_index, fragment_file_path) in fragment_file_paths.iter().enumerate() {
188+
if let Some(cell_barcodes) = fragment_file_to_cell_barcode.get(&fragment_file_path.to_string()) {
189+
readers.push(
190+
FragmentFileReader::new(
191+
fragment_file_path,
192+
cell_barcodes.to_vec(),
193+
chromosome.to_string(),
194+
file_index)?
195+
);
196+
}
197+
}
98198

99199
// A binary heap will be used to write fragments in order from different files
100200
let mut heap = BinaryHeap::new();
101201

102-
let mut line = "#".to_string();
103-
let mut bytes_read;
104-
105-
// Push the first fragment from each file to the heap
106-
for (
107-
fragment_file,
108-
(index, reader)
109-
) in fragment_file_paths.iter().zip(readers.iter_mut().enumerate()) {
110-
// skip header lines
111-
while line.starts_with("#") {
112-
line.clear();
113-
reader.read_line(&mut line)?;
114-
line = line.trim().to_string();
115-
}
116-
// Loop until a fragment with cell barcode, on the correct chromosome, in fragment_file_to_cell_barcode is found
117-
let mut fragment_found = false;
118-
// is the barcode found for the first (non-header) line?
119-
let fragment = GenomicRange::new(
120-
line.to_string(), index, fragment_file
121-
)?;
122-
match fragment_file_to_cell_barcode.get(&fragment_file.to_string()) {
123-
Some(cell_barcodes) => {
124-
if cell_barcodes.contains(&fragment.cell_barcode) && fragment.chromosome == chromosome {
125-
// Reverse so that "smaller" fragments (i.e. lower genomic location
126-
// are written first later on (heap will pop large elements first).
127-
heap.push(Reverse(fragment));
128-
fragment_found = true;
129-
}
130-
},
131-
None => {
132-
return Err(
133-
custom_errors::ValueError::new(
134-
format!(
135-
"fragment_file_to_cell_barcode does not contain entry for {}",
136-
fragment_file)
137-
).into());
138-
}
139-
}
140-
141-
// barcode not in first line.
142-
// keep reading until a fragment with correct barcode, on correct chromosome, is found.
143-
while !fragment_found {
144-
line.clear();
145-
bytes_read = reader.read_line(&mut line)?;
146-
if bytes_read == 0 {
147-
// end of file
148-
break;
149-
}
150-
line = line.trim().to_string();
151-
let fragment = GenomicRange::new(
152-
line.to_string(), index, fragment_file
153-
)?;
154-
match fragment_file_to_cell_barcode.get(&fragment_file.to_string()) {
155-
Some(cell_barcodes) => {
156-
if cell_barcodes.contains(&fragment.cell_barcode) && fragment.chromosome == chromosome {
157-
// Reverse so that "smaller" fragments (i.e. lower genomic location
158-
// are written first later on (heap will pop large elements first).
159-
heap.push(Reverse(fragment));
160-
fragment_found = true;
161-
}
162-
},
163-
None => {
164-
return Err(
165-
custom_errors::ValueError::new(
166-
format!(
167-
"fragment_file_to_cell_barcode does not contain entry for {}",
168-
fragment_file)
169-
).into());
170-
}
202+
// skip header lines, go to correct chromosome, and push first valid fragment to binary heap
203+
for reader in readers.iter_mut(){
204+
reader.skip_header("#")?;
205+
reader.skip_to_chromosome(chromosome)?;
206+
if !reader.at_end_of_file && reader.at_chrom() {
207+
if let Some(fragment) = reader.get_next_valid_fragment()? {
208+
heap.push(Reverse(fragment));
171209
}
172210
}
173211
}
174212

175213
while let Some(Reverse(fragment)) = heap.pop() {
176214
gz_output_file.write_all(format!("{}\n", fragment).as_bytes())?;
177-
// Loop until a fragment with cell barcode in fragment_file_to_cell_barcode is found
178-
let mut fragment_found = false;
179-
while !fragment_found {
180-
// Read next range from file that had the smallest range and add this to the heap.
181-
line.clear();
182-
bytes_read = readers[fragment.file_index].read_line(&mut line)?;
183-
if bytes_read == 0 {
184-
// end of file
185-
break;
186-
}
187-
line = line.trim().to_string();
188-
let next_fragment = GenomicRange::new(
189-
line.to_string(), fragment.file_index, &fragment.file_name
190-
)?;
191-
// Assuming that the fragment files are sorted.
192-
// Using the previous while loop, for each file, we should be at the correct location of the file
193-
// (i.e. where the current chromosomes are located).
194-
// if the next fragment file has a different chromosome, we should be done with this file and we can skip it.
195-
if next_fragment.chromosome != chromosome {
196-
break;
197-
}
198-
match fragment_file_to_cell_barcode.get(&next_fragment.file_name) {
199-
Some(cell_barcodes) => {
200-
if cell_barcodes.contains(&next_fragment.cell_barcode) {
201-
heap.push(Reverse(next_fragment));
202-
fragment_found = true;
203-
}
204-
},
205-
None => {
206-
return Err(
207-
custom_errors::ValueError::new(
208-
format!(
209-
"fragment_file_to_cell_barcode does not contain entry for {}",
210-
next_fragment.file_name)
211-
).into()
212-
);
213-
}
215+
// read from file that currently has the smallest genomic range
216+
let reader = &mut readers[fragment.file_index];
217+
if !reader.at_end_of_file && reader.at_chrom() {
218+
if let Some(fragment) = reader.get_next_valid_fragment()? {
219+
heap.push(Reverse(fragment));
214220
}
215221
}
216222
}

0 commit comments

Comments
 (0)